Make previous pointer atomic in storage v2

Reviewers: mtomic, teon.banek

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2191
This commit is contained in:
Matej Ferencevic 2019-07-09 15:40:59 +02:00
parent 4d94aa740a
commit 18e2f9eeda
3 changed files with 39 additions and 34 deletions

View File

@ -36,54 +36,56 @@ class PreviousPtr {
EDGE, EDGE,
}; };
void Set(Delta *delta) { struct Pointer {
uintptr_t value = reinterpret_cast<uintptr_t>(delta); explicit Pointer(Delta *delta) : type(Type::DELTA), delta(delta) {}
CHECK((value & kMask) == 0) << "Invalid pointer!"; explicit Pointer(Vertex *vertex) : type(Type::VERTEX), vertex(vertex) {}
storage_ = value | kDelta; explicit Pointer(Edge *edge) : type(Type::EDGE), edge(edge) {}
}
void Set(Vertex *vertex) { Type type;
uintptr_t value = reinterpret_cast<uintptr_t>(vertex); Delta *delta{nullptr};
CHECK((value & kMask) == 0) << "Invalid pointer!"; Vertex *vertex{nullptr};
storage_ = value | kVertex; Edge *edge{nullptr};
} };
void Set(Edge *edge) { PreviousPtr() : storage_(0) {}
uintptr_t value = reinterpret_cast<uintptr_t>(edge);
CHECK((value & kMask) == 0) << "Invalid pointer!";
storage_ = value | kEdge;
}
Type GetType() const { PreviousPtr(const PreviousPtr &other) noexcept
uintptr_t type = storage_ & kMask; : storage_(other.storage_.load(std::memory_order_acquire)) {}
Pointer Get() const {
uintptr_t value = storage_.load(std::memory_order_acquire);
uintptr_t type = value & kMask;
if (type == kDelta) { if (type == kDelta) {
return Type::DELTA; return Pointer{reinterpret_cast<Delta *>(value & ~kMask)};
} else if (type == kVertex) { } else if (type == kVertex) {
return Type::VERTEX; return Pointer{reinterpret_cast<Vertex *>(value & ~kMask)};
} else if (type == kEdge) { } else if (type == kEdge) {
return Type::EDGE; return Pointer{reinterpret_cast<Edge *>(value & ~kMask)};
} else { } else {
LOG(FATAL) << "Invalid pointer type!"; LOG(FATAL) << "Invalid pointer type!";
} }
} }
Delta *GetDelta() const { void Set(Delta *delta) {
CHECK((storage_ & kMask) == kDelta) << "Can't convert pointer to delta!"; uintptr_t value = reinterpret_cast<uintptr_t>(delta);
return reinterpret_cast<Delta *>(storage_ & ~kMask); CHECK((value & kMask) == 0) << "Invalid pointer!";
storage_.store(value | kDelta, std::memory_order_release);
} }
Vertex *GetVertex() const { void Set(Vertex *vertex) {
CHECK((storage_ & kMask) == kVertex) << "Can't convert pointer to vertex!"; uintptr_t value = reinterpret_cast<uintptr_t>(vertex);
return reinterpret_cast<Vertex *>(storage_ & ~kMask); CHECK((value & kMask) == 0) << "Invalid pointer!";
storage_.store(value | kVertex, std::memory_order_release);
} }
Edge *GetEdge() const { void Set(Edge *edge) {
CHECK((storage_ & kMask) == kEdge) << "Can't convert pointer to edge!"; uintptr_t value = reinterpret_cast<uintptr_t>(edge);
return reinterpret_cast<Edge *>(storage_ & ~kMask); CHECK((value & kMask) == 0) << "Invalid pointer!";
storage_.store(value | kEdge, std::memory_order_release);
} }
private: private:
uintptr_t storage_{0}; std::atomic<uintptr_t> storage_;
}; };
struct Delta { struct Delta {

View File

@ -85,7 +85,9 @@ inline void CreateAndLinkDelta(Transaction *transaction, TObj *object,
transaction->command_id); transaction->command_id);
// The operations are written in such order so that both `next` and `prev` // The operations are written in such order so that both `next` and `prev`
// chains are valid at all times. // chains are valid at all times. The chains must be valid at all times
// because garbage collection (which traverses the chains) is done
// concurrently (as well as other execution threads).
delta->prev.Set(object); delta->prev.Set(object);
if (object->delta) { if (object->delta) {
object->delta->prev.Set(delta); object->delta->prev.Set(delta);

View File

@ -307,9 +307,10 @@ void Storage::Accessor::Commit() {
void Storage::Accessor::Abort() { void Storage::Accessor::Abort() {
CHECK(transaction_->is_active) << "The transaction is already terminated!"; CHECK(transaction_->is_active) << "The transaction is already terminated!";
for (const auto &delta : transaction_->deltas) { for (const auto &delta : transaction_->deltas) {
switch (delta.prev.GetType()) { auto prev = delta.prev.Get();
switch (prev.type) {
case PreviousPtr::Type::VERTEX: { case PreviousPtr::Type::VERTEX: {
auto vertex = delta.prev.GetVertex(); auto vertex = prev.vertex;
std::lock_guard<utils::SpinLock> guard(vertex->lock); std::lock_guard<utils::SpinLock> guard(vertex->lock);
Delta *current = vertex->delta; Delta *current = vertex->delta;
while (current != nullptr && while (current != nullptr &&
@ -409,7 +410,7 @@ void Storage::Accessor::Abort() {
break; break;
} }
case PreviousPtr::Type::EDGE: { case PreviousPtr::Type::EDGE: {
auto edge = delta.prev.GetEdge(); auto edge = prev.edge;
std::lock_guard<utils::SpinLock> guard(edge->lock); std::lock_guard<utils::SpinLock> guard(edge->lock);
Delta *current = edge->delta; Delta *current = edge->delta;
while (current != nullptr && while (current != nullptr &&