From 3ad5c8069cf45963db8b839e942ab00eac8daffc Mon Sep 17 00:00:00 2001 From: Vinko Kasljevic <vinko.kasljevic@memgraph.io> Date: Fri, 15 Feb 2019 11:18:44 +0100 Subject: [PATCH] Add CachedDataLock throughout the code Summary: CachedDataLock is necessary for lru cache as remote data is no longer persistent. Most methods internally handle this, but for methods that return pointers or references to remote data, we need to manually lock data. Reviewers: msantl, ipaljak Reviewed By: msantl Subscribers: teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1869 --- src/storage/distributed/cached_data_lock.hpp | 38 ++++++++++++ src/storage/distributed/record_accessor.cpp | 62 +++++++++++++++++++ src/storage/distributed/record_accessor.hpp | 25 +++++++- src/storage/distributed/rpc/serialization.cpp | 1 + src/storage/distributed/vertex_accessor.cpp | 10 ++- src/storage/distributed/vertex_accessor.hpp | 20 +++++- tests/unit/distributed_data_exchange.cpp | 2 + 7 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 src/storage/distributed/cached_data_lock.hpp diff --git a/src/storage/distributed/cached_data_lock.hpp b/src/storage/distributed/cached_data_lock.hpp new file mode 100644 index 000000000..617c705f2 --- /dev/null +++ b/src/storage/distributed/cached_data_lock.hpp @@ -0,0 +1,38 @@ +/// @file + +#pragma once + +namespace storage { +/// CachedDataLock wrapps RecordAccessors HoldCachedData and ReleaseCachedData +/// methods and provides a RAII style mechanism for releasing the lock at the +/// end of the scope. +template <typename TRecordAccessor> +class CachedDataLock { + explicit CachedDataLock(const TRecordAccessor &accessor) + : accessor_(&accessor) { + accessor_->HoldCachedData(); + } + + template <typename URecordAccessor> + friend CachedDataLock<URecordAccessor> GetDataLock(const URecordAccessor &); + + public: + ~CachedDataLock() { Release(); } + + void Release() { + if (accessor_) { + accessor_->ReleaseCachedData(); + accessor_ = nullptr; + } + } + + private: + const TRecordAccessor *accessor_; +}; + +/// Constructs CachedDataLock and then lockes given accessor. +template <typename TRecordAccessor> +CachedDataLock<TRecordAccessor> GetDataLock(const TRecordAccessor &accessor) { + return CachedDataLock<TRecordAccessor>(accessor); +} +} // namespace storage diff --git a/src/storage/distributed/record_accessor.cpp b/src/storage/distributed/record_accessor.cpp index 739f2b5f8..2dcb52782 100644 --- a/src/storage/distributed/record_accessor.cpp +++ b/src/storage/distributed/record_accessor.cpp @@ -6,6 +6,7 @@ #include "distributed/data_manager.hpp" #include "distributed/updates_rpc_clients.hpp" #include "durability/distributed/state_delta.hpp" +#include "storage/distributed/cached_data_lock.hpp" #include "storage/distributed/edge.hpp" #include "storage/distributed/vertex.hpp" @@ -20,6 +21,7 @@ RecordAccessor<TRecord>::RecordAccessor(AddressT address, template <typename TRecord> PropertyValue RecordAccessor<TRecord>::PropsAt(storage::Property key) const { + auto guard = storage::GetDataLock(*this); return current().properties_.at(key); } @@ -29,6 +31,7 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key, auto &dba = db_accessor(); auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), value); + auto guard = storage::GetDataLock(*this); update().properties_.set(key, value); if (is_local()) { dba.UpdatePropertyIndex(key, *this, &update()); @@ -43,6 +46,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key, auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), value); + auto guard = storage::GetDataLock(*this); update().properties_.set(key, value); ProcessDelta(delta); } @@ -53,6 +57,7 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) { auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); + auto guard = storage::GetDataLock(*this); update().properties_.set(key, PropertyValue::Null); ProcessDelta(delta); } @@ -63,6 +68,8 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) { auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); + + auto guard = storage::GetDataLock(*this); update().properties_.set(key, PropertyValue::Null); ProcessDelta(delta); } @@ -70,6 +77,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) { template <typename TRecord> void RecordAccessor<TRecord>::PropsClear() { std::vector<storage::Property> to_remove; + auto guard = storage::GetDataLock(*this); for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first); for (const auto &prop : to_remove) { PropsErase(prop); @@ -78,6 +86,7 @@ void RecordAccessor<TRecord>::PropsClear() { template <typename TRecord> const PropertyValueStore &RecordAccessor<TRecord>::Properties() const { + auto guard = storage::GetDataLock(*this); return current().properties_; } @@ -114,6 +123,7 @@ RecordAccessor<TRecord>::GlobalAddress() const { template <typename TRecord> RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() { + auto guard = storage::GetDataLock(*this); if (is_local()) { if (!new_) { // if new_ is not set yet, look for it @@ -132,15 +142,34 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() { return *this; } +template <typename TRecord> +TRecord *RecordAccessor<TRecord>::GetNew() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Remote data is missing"; + } + + return new_; +} + template <typename TRecord> RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() { current_ = old_ ? old_ : new_; return *this; } +template <typename TRecord> +TRecord *RecordAccessor<TRecord>::GetOld() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Remote data is missing"; + } + + return old_; +} + template <typename TRecord> bool RecordAccessor<TRecord>::Reconstruct() const { auto &dba = db_accessor(); + auto guard = storage::GetDataLock(*this); if (is_local()) { address().local()->find_set_old_new(dba.transaction(), &old_, &new_); } else { @@ -159,6 +188,7 @@ bool RecordAccessor<TRecord>::Reconstruct() const { template <typename TRecord> TRecord &RecordAccessor<TRecord>::update() const { auto &dba = db_accessor(); + auto guard = storage::GetDataLock(*this); // Edges have lazily initialize mutable, versioned data (properties). if (std::is_same<TRecord, Edge>::value && current_ == nullptr) { bool reconstructed = Reconstruct(); @@ -203,8 +233,40 @@ int64_t RecordAccessor<TRecord>::CypherId() const { .cypher_id; } +template <typename TRecord> +void RecordAccessor<TRecord>::HoldCachedData() const { + if (!is_local()) { + if (remote_.lock_counter == 0) { + // TODO (vkasljevic) uncomment once Remote has beed implemented + // remote_.data = db_accessor_->data_manager().template Find<TRecord>( + // db_accessor_->transaction().id_, Address().worker_id(), + // Address().gid(), remote_.has_updated); + } + + ++remote_.lock_counter; + DCHECK(remote_.lock_counter <= 10000) + << "Something wrong with lock counter"; + } +} + +template <typename TRecord> +void RecordAccessor<TRecord>::ReleaseCachedData() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Lock should exist at this point"; + --remote_.lock_counter; + if (remote_.lock_counter == 0) { + // TODO (vkasljevic) uncomment once Remote has beed implemented + // remote_.data = nullptr; + } + } +} + template <typename TRecord> const TRecord &RecordAccessor<TRecord>::current() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Remote data is missing"; + } + // Edges have lazily initialize mutable, versioned data (properties). if (std::is_same<TRecord, Edge>::value && current_ == nullptr) { bool reconstructed = Reconstruct(); diff --git a/src/storage/distributed/record_accessor.hpp b/src/storage/distributed/record_accessor.hpp index fb41eee10..e4aaedc8d 100644 --- a/src/storage/distributed/record_accessor.hpp +++ b/src/storage/distributed/record_accessor.hpp @@ -98,7 +98,7 @@ class RecordAccessor { RecordAccessor<TRecord> &SwitchNew(); /** Returns the new record pointer. */ - TRecord *GetNew() const { return new_; } + TRecord *GetNew() const; /** * Attempts to switch this accessor to use the latest version not updated by @@ -111,7 +111,7 @@ class RecordAccessor { RecordAccessor<TRecord> &SwitchOld(); /** Returns the old record pointer. */ - TRecord *GetOld() const { return old_; } + TRecord *GetOld() const; /** * Reconstructs the internal state of the record accessor so it uses the @@ -158,6 +158,19 @@ class RecordAccessor { */ int64_t CypherId() const; + /** + * If accessor holds remote record, this method will hold remote data until + * released. This is needed for methods that return pointers. + * This method can be called multiple times. + */ + void HoldCachedData() const; + + /** + * If accessor holds remote record, this method will release remote data. + * This is needed for methods that return pointers. + */ + void ReleaseCachedData() const; + protected: /** * The database::GraphDbAccessor is friend to this accessor so it can @@ -183,6 +196,7 @@ class RecordAccessor { * This pointer can be null if created by an accessor which lazily reads from * mvcc. */ + // TODO (vkasljevic) remove this mutable TRecord *current_{nullptr}; /** Returns the current version (either new_ or old_) set on this @@ -197,6 +211,13 @@ class RecordAccessor { AddressT address_; + struct Remote { + /* Keeps track of how many times HoldRemoteData was called. */ + mutable unsigned short lock_counter{0}; + }; + + Remote remote_; + /** * Latest version which is visible to the current transaction+command * but has not been created nor modified by the current transaction+command. diff --git a/src/storage/distributed/rpc/serialization.cpp b/src/storage/distributed/rpc/serialization.cpp index cf0b56060..5eab2364d 100644 --- a/src/storage/distributed/rpc/serialization.cpp +++ b/src/storage/distributed/rpc/serialization.cpp @@ -241,6 +241,7 @@ void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor, builder->setAddress(accessor.GlobalAddress().raw()); bool reconstructed = false; + auto guard = storage::GetDataLock(accessor); if (!accessor.GetOld() && !accessor.GetNew()) { reconstructed = true; bool result = accessor.Reconstruct(); diff --git a/src/storage/distributed/vertex_accessor.cpp b/src/storage/distributed/vertex_accessor.cpp index 88e23c770..8029dd744 100644 --- a/src/storage/distributed/vertex_accessor.cpp +++ b/src/storage/distributed/vertex_accessor.cpp @@ -20,6 +20,7 @@ void VertexAccessor::add_label(storage::Label label) { auto &dba = db_accessor(); auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(), label, dba.LabelName(label)); + auto guard = storage::GetDataLock(*this); auto &vertex = update(); // not a duplicate label, add it if (!utils::Contains(vertex.labels_, label)) { @@ -37,7 +38,8 @@ void VertexAccessor::remove_label(storage::Label label) { auto &dba = db_accessor(); auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(), label, dba.LabelName(label)); - Vertex &vertex = update(); + auto guard = storage::GetDataLock(*this); + auto &vertex = update(); if (utils::Contains(vertex.labels_, label)) { auto &labels = vertex.labels_; auto found = std::find(labels.begin(), labels.end(), delta.label); @@ -52,11 +54,13 @@ void VertexAccessor::remove_label(storage::Label label) { } bool VertexAccessor::has_label(storage::Label label) const { + auto guard = storage::GetDataLock(*this); auto &labels = this->current().labels_; return std::find(labels.begin(), labels.end(), label) != labels.end(); } -const std::vector<storage::Label> &VertexAccessor::labels() const { +std::vector<storage::Label> VertexAccessor::labels() const { + auto guard = storage::GetDataLock(*this); return this->current().labels_; } @@ -66,6 +70,7 @@ void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) { dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge)); SwitchNew(); + auto guard = storage::GetDataLock(*this); if (current().is_expired_by(dba.transaction())) return; update().out_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge)); @@ -78,6 +83,7 @@ void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) { dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge)); SwitchNew(); + auto guard = storage::GetDataLock(*this); if (current().is_expired_by(dba.transaction())) return; update().in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge)); diff --git a/src/storage/distributed/vertex_accessor.hpp b/src/storage/distributed/vertex_accessor.hpp index a0c18c3ad..2ec032ef2 100644 --- a/src/storage/distributed/vertex_accessor.hpp +++ b/src/storage/distributed/vertex_accessor.hpp @@ -8,6 +8,7 @@ #include <cppitertools/chain.hpp> #include <cppitertools/imap.hpp> +#include "storage/distributed/cached_data_lock.hpp" #include "storage/distributed/edge_accessor.hpp" #include "storage/distributed/record_accessor.hpp" #include "storage/distributed/vertex.hpp" @@ -69,10 +70,12 @@ class VertexAccessor final : public RecordAccessor<Vertex> { bool has_label(storage::Label label) const; /** Returns all the Labels of the Vertex. */ - const std::vector<storage::Label> &labels() const; + // TODO (vkasljevic) add proxy for labels + std::vector<storage::Label> labels() const; /** Returns EdgeAccessors for all incoming edges. */ auto in() const { + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator(current().in_.begin(), current().in_.end(), false, address(), db_accessor()); } @@ -86,6 +89,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { */ auto in(const VertexAccessor &dest, const std::vector<storage::EdgeType> *edge_types = nullptr) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator(current().in_.begin(dest.address(), edge_types), current().in_.end(), false, address(), db_accessor()); @@ -98,6 +104,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { * or empty, the parameter is ignored. */ auto in(const std::vector<storage::EdgeType> *edge_types) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator( current().in_.begin(std::experimental::nullopt, edge_types), current().in_.end(), false, address(), db_accessor()); @@ -105,6 +114,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { /** Returns EdgeAccessors for all outgoing edges. */ auto out() const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator(current().out_.begin(), current().out_.end(), true, address(), db_accessor()); } @@ -119,6 +131,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { */ auto out(const VertexAccessor &dest, const std::vector<storage::EdgeType> *edge_types = nullptr) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator( current().out_.begin(dest.address(), edge_types), current().out_.end(), true, address(), db_accessor()); @@ -131,6 +146,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { * or empty, the parameter is ignored. */ auto out(const std::vector<storage::EdgeType> *edge_types) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator( current().out_.begin(std::experimental::nullopt, edge_types), current().out_.end(), true, address(), db_accessor()); diff --git a/tests/unit/distributed_data_exchange.cpp b/tests/unit/distributed_data_exchange.cpp index 86155b322..631c3f189 100644 --- a/tests/unit/distributed_data_exchange.cpp +++ b/tests/unit/distributed_data_exchange.cpp @@ -45,6 +45,7 @@ TEST_F(DistributedDataExchangeTest, RemoteDataGetting) { { auto w1_dba = worker(1).Access(master_dba->transaction_id()); VertexAccessor v1_in_w1{{v1_id, 0}, *w1_dba}; + auto guard = storage::GetDataLock(v1_in_w1); EXPECT_NE(v1_in_w1.GetOld(), nullptr); EXPECT_EQ(v1_in_w1.GetNew(), nullptr); EXPECT_EQ(v1_in_w1.PropsAt(w1_dba->Property("p1")).Value<int64_t>(), 42); @@ -54,6 +55,7 @@ TEST_F(DistributedDataExchangeTest, RemoteDataGetting) { { auto w2_dba = worker(2).Access(master_dba->transaction_id()); VertexAccessor v2_in_w2{{v2_id, 0}, *w2_dba}; + auto guard = storage::GetDataLock(v2_in_w2); EXPECT_NE(v2_in_w2.GetOld(), nullptr); EXPECT_EQ(v2_in_w2.GetNew(), nullptr); EXPECT_EQ(v2_in_w2.PropsAt(w2_dba->Property("p2")).Value<std::string>(),