diff --git a/src/storage/distributed/cached_data_lock.hpp b/src/storage/distributed/cached_data_lock.hpp new file mode 100644 index 000000000..617c705f2 --- /dev/null +++ b/src/storage/distributed/cached_data_lock.hpp @@ -0,0 +1,38 @@ +/// @file + +#pragma once + +namespace storage { +/// CachedDataLock wrapps RecordAccessors HoldCachedData and ReleaseCachedData +/// methods and provides a RAII style mechanism for releasing the lock at the +/// end of the scope. +template <typename TRecordAccessor> +class CachedDataLock { + explicit CachedDataLock(const TRecordAccessor &accessor) + : accessor_(&accessor) { + accessor_->HoldCachedData(); + } + + template <typename URecordAccessor> + friend CachedDataLock<URecordAccessor> GetDataLock(const URecordAccessor &); + + public: + ~CachedDataLock() { Release(); } + + void Release() { + if (accessor_) { + accessor_->ReleaseCachedData(); + accessor_ = nullptr; + } + } + + private: + const TRecordAccessor *accessor_; +}; + +/// Constructs CachedDataLock and then lockes given accessor. +template <typename TRecordAccessor> +CachedDataLock<TRecordAccessor> GetDataLock(const TRecordAccessor &accessor) { + return CachedDataLock<TRecordAccessor>(accessor); +} +} // namespace storage diff --git a/src/storage/distributed/record_accessor.cpp b/src/storage/distributed/record_accessor.cpp index 739f2b5f8..2dcb52782 100644 --- a/src/storage/distributed/record_accessor.cpp +++ b/src/storage/distributed/record_accessor.cpp @@ -6,6 +6,7 @@ #include "distributed/data_manager.hpp" #include "distributed/updates_rpc_clients.hpp" #include "durability/distributed/state_delta.hpp" +#include "storage/distributed/cached_data_lock.hpp" #include "storage/distributed/edge.hpp" #include "storage/distributed/vertex.hpp" @@ -20,6 +21,7 @@ RecordAccessor<TRecord>::RecordAccessor(AddressT address, template <typename TRecord> PropertyValue RecordAccessor<TRecord>::PropsAt(storage::Property key) const { + auto guard = storage::GetDataLock(*this); return current().properties_.at(key); } @@ -29,6 +31,7 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key, auto &dba = db_accessor(); auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), value); + auto guard = storage::GetDataLock(*this); update().properties_.set(key, value); if (is_local()) { dba.UpdatePropertyIndex(key, *this, &update()); @@ -43,6 +46,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key, auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), value); + auto guard = storage::GetDataLock(*this); update().properties_.set(key, value); ProcessDelta(delta); } @@ -53,6 +57,7 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) { auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); + auto guard = storage::GetDataLock(*this); update().properties_.set(key, PropertyValue::Null); ProcessDelta(delta); } @@ -63,6 +68,8 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) { auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); + + auto guard = storage::GetDataLock(*this); update().properties_.set(key, PropertyValue::Null); ProcessDelta(delta); } @@ -70,6 +77,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) { template <typename TRecord> void RecordAccessor<TRecord>::PropsClear() { std::vector<storage::Property> to_remove; + auto guard = storage::GetDataLock(*this); for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first); for (const auto &prop : to_remove) { PropsErase(prop); @@ -78,6 +86,7 @@ void RecordAccessor<TRecord>::PropsClear() { template <typename TRecord> const PropertyValueStore &RecordAccessor<TRecord>::Properties() const { + auto guard = storage::GetDataLock(*this); return current().properties_; } @@ -114,6 +123,7 @@ RecordAccessor<TRecord>::GlobalAddress() const { template <typename TRecord> RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() { + auto guard = storage::GetDataLock(*this); if (is_local()) { if (!new_) { // if new_ is not set yet, look for it @@ -132,15 +142,34 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() { return *this; } +template <typename TRecord> +TRecord *RecordAccessor<TRecord>::GetNew() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Remote data is missing"; + } + + return new_; +} + template <typename TRecord> RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() { current_ = old_ ? old_ : new_; return *this; } +template <typename TRecord> +TRecord *RecordAccessor<TRecord>::GetOld() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Remote data is missing"; + } + + return old_; +} + template <typename TRecord> bool RecordAccessor<TRecord>::Reconstruct() const { auto &dba = db_accessor(); + auto guard = storage::GetDataLock(*this); if (is_local()) { address().local()->find_set_old_new(dba.transaction(), &old_, &new_); } else { @@ -159,6 +188,7 @@ bool RecordAccessor<TRecord>::Reconstruct() const { template <typename TRecord> TRecord &RecordAccessor<TRecord>::update() const { auto &dba = db_accessor(); + auto guard = storage::GetDataLock(*this); // Edges have lazily initialize mutable, versioned data (properties). if (std::is_same<TRecord, Edge>::value && current_ == nullptr) { bool reconstructed = Reconstruct(); @@ -203,8 +233,40 @@ int64_t RecordAccessor<TRecord>::CypherId() const { .cypher_id; } +template <typename TRecord> +void RecordAccessor<TRecord>::HoldCachedData() const { + if (!is_local()) { + if (remote_.lock_counter == 0) { + // TODO (vkasljevic) uncomment once Remote has beed implemented + // remote_.data = db_accessor_->data_manager().template Find<TRecord>( + // db_accessor_->transaction().id_, Address().worker_id(), + // Address().gid(), remote_.has_updated); + } + + ++remote_.lock_counter; + DCHECK(remote_.lock_counter <= 10000) + << "Something wrong with lock counter"; + } +} + +template <typename TRecord> +void RecordAccessor<TRecord>::ReleaseCachedData() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Lock should exist at this point"; + --remote_.lock_counter; + if (remote_.lock_counter == 0) { + // TODO (vkasljevic) uncomment once Remote has beed implemented + // remote_.data = nullptr; + } + } +} + template <typename TRecord> const TRecord &RecordAccessor<TRecord>::current() const { + if (!is_local()) { + DCHECK(remote_.lock_counter > 0) << "Remote data is missing"; + } + // Edges have lazily initialize mutable, versioned data (properties). if (std::is_same<TRecord, Edge>::value && current_ == nullptr) { bool reconstructed = Reconstruct(); diff --git a/src/storage/distributed/record_accessor.hpp b/src/storage/distributed/record_accessor.hpp index fb41eee10..e4aaedc8d 100644 --- a/src/storage/distributed/record_accessor.hpp +++ b/src/storage/distributed/record_accessor.hpp @@ -98,7 +98,7 @@ class RecordAccessor { RecordAccessor<TRecord> &SwitchNew(); /** Returns the new record pointer. */ - TRecord *GetNew() const { return new_; } + TRecord *GetNew() const; /** * Attempts to switch this accessor to use the latest version not updated by @@ -111,7 +111,7 @@ class RecordAccessor { RecordAccessor<TRecord> &SwitchOld(); /** Returns the old record pointer. */ - TRecord *GetOld() const { return old_; } + TRecord *GetOld() const; /** * Reconstructs the internal state of the record accessor so it uses the @@ -158,6 +158,19 @@ class RecordAccessor { */ int64_t CypherId() const; + /** + * If accessor holds remote record, this method will hold remote data until + * released. This is needed for methods that return pointers. + * This method can be called multiple times. + */ + void HoldCachedData() const; + + /** + * If accessor holds remote record, this method will release remote data. + * This is needed for methods that return pointers. + */ + void ReleaseCachedData() const; + protected: /** * The database::GraphDbAccessor is friend to this accessor so it can @@ -183,6 +196,7 @@ class RecordAccessor { * This pointer can be null if created by an accessor which lazily reads from * mvcc. */ + // TODO (vkasljevic) remove this mutable TRecord *current_{nullptr}; /** Returns the current version (either new_ or old_) set on this @@ -197,6 +211,13 @@ class RecordAccessor { AddressT address_; + struct Remote { + /* Keeps track of how many times HoldRemoteData was called. */ + mutable unsigned short lock_counter{0}; + }; + + Remote remote_; + /** * Latest version which is visible to the current transaction+command * but has not been created nor modified by the current transaction+command. diff --git a/src/storage/distributed/rpc/serialization.cpp b/src/storage/distributed/rpc/serialization.cpp index cf0b56060..5eab2364d 100644 --- a/src/storage/distributed/rpc/serialization.cpp +++ b/src/storage/distributed/rpc/serialization.cpp @@ -241,6 +241,7 @@ void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor, builder->setAddress(accessor.GlobalAddress().raw()); bool reconstructed = false; + auto guard = storage::GetDataLock(accessor); if (!accessor.GetOld() && !accessor.GetNew()) { reconstructed = true; bool result = accessor.Reconstruct(); diff --git a/src/storage/distributed/vertex_accessor.cpp b/src/storage/distributed/vertex_accessor.cpp index 88e23c770..8029dd744 100644 --- a/src/storage/distributed/vertex_accessor.cpp +++ b/src/storage/distributed/vertex_accessor.cpp @@ -20,6 +20,7 @@ void VertexAccessor::add_label(storage::Label label) { auto &dba = db_accessor(); auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(), label, dba.LabelName(label)); + auto guard = storage::GetDataLock(*this); auto &vertex = update(); // not a duplicate label, add it if (!utils::Contains(vertex.labels_, label)) { @@ -37,7 +38,8 @@ void VertexAccessor::remove_label(storage::Label label) { auto &dba = db_accessor(); auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(), label, dba.LabelName(label)); - Vertex &vertex = update(); + auto guard = storage::GetDataLock(*this); + auto &vertex = update(); if (utils::Contains(vertex.labels_, label)) { auto &labels = vertex.labels_; auto found = std::find(labels.begin(), labels.end(), delta.label); @@ -52,11 +54,13 @@ void VertexAccessor::remove_label(storage::Label label) { } bool VertexAccessor::has_label(storage::Label label) const { + auto guard = storage::GetDataLock(*this); auto &labels = this->current().labels_; return std::find(labels.begin(), labels.end(), label) != labels.end(); } -const std::vector<storage::Label> &VertexAccessor::labels() const { +std::vector<storage::Label> VertexAccessor::labels() const { + auto guard = storage::GetDataLock(*this); return this->current().labels_; } @@ -66,6 +70,7 @@ void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) { dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge)); SwitchNew(); + auto guard = storage::GetDataLock(*this); if (current().is_expired_by(dba.transaction())) return; update().out_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge)); @@ -78,6 +83,7 @@ void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) { dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge)); SwitchNew(); + auto guard = storage::GetDataLock(*this); if (current().is_expired_by(dba.transaction())) return; update().in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge)); diff --git a/src/storage/distributed/vertex_accessor.hpp b/src/storage/distributed/vertex_accessor.hpp index a0c18c3ad..2ec032ef2 100644 --- a/src/storage/distributed/vertex_accessor.hpp +++ b/src/storage/distributed/vertex_accessor.hpp @@ -8,6 +8,7 @@ #include <cppitertools/chain.hpp> #include <cppitertools/imap.hpp> +#include "storage/distributed/cached_data_lock.hpp" #include "storage/distributed/edge_accessor.hpp" #include "storage/distributed/record_accessor.hpp" #include "storage/distributed/vertex.hpp" @@ -69,10 +70,12 @@ class VertexAccessor final : public RecordAccessor<Vertex> { bool has_label(storage::Label label) const; /** Returns all the Labels of the Vertex. */ - const std::vector<storage::Label> &labels() const; + // TODO (vkasljevic) add proxy for labels + std::vector<storage::Label> labels() const; /** Returns EdgeAccessors for all incoming edges. */ auto in() const { + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator(current().in_.begin(), current().in_.end(), false, address(), db_accessor()); } @@ -86,6 +89,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { */ auto in(const VertexAccessor &dest, const std::vector<storage::EdgeType> *edge_types = nullptr) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator(current().in_.begin(dest.address(), edge_types), current().in_.end(), false, address(), db_accessor()); @@ -98,6 +104,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { * or empty, the parameter is ignored. */ auto in(const std::vector<storage::EdgeType> *edge_types) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator( current().in_.begin(std::experimental::nullopt, edge_types), current().in_.end(), false, address(), db_accessor()); @@ -105,6 +114,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { /** Returns EdgeAccessors for all outgoing edges. */ auto out() const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator(current().out_.begin(), current().out_.end(), true, address(), db_accessor()); } @@ -119,6 +131,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { */ auto out(const VertexAccessor &dest, const std::vector<storage::EdgeType> *edge_types = nullptr) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator( current().out_.begin(dest.address(), edge_types), current().out_.end(), true, address(), db_accessor()); @@ -131,6 +146,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> { * or empty, the parameter is ignored. */ auto out(const std::vector<storage::EdgeType> *edge_types) const { + // This is temporary + // TODO (vkasljevic) prepare iterators for lru cache + auto guard = storage::GetDataLock(*this); return MakeAccessorIterator( current().out_.begin(std::experimental::nullopt, edge_types), current().out_.end(), true, address(), db_accessor()); diff --git a/tests/unit/distributed_data_exchange.cpp b/tests/unit/distributed_data_exchange.cpp index 86155b322..631c3f189 100644 --- a/tests/unit/distributed_data_exchange.cpp +++ b/tests/unit/distributed_data_exchange.cpp @@ -45,6 +45,7 @@ TEST_F(DistributedDataExchangeTest, RemoteDataGetting) { { auto w1_dba = worker(1).Access(master_dba->transaction_id()); VertexAccessor v1_in_w1{{v1_id, 0}, *w1_dba}; + auto guard = storage::GetDataLock(v1_in_w1); EXPECT_NE(v1_in_w1.GetOld(), nullptr); EXPECT_EQ(v1_in_w1.GetNew(), nullptr); EXPECT_EQ(v1_in_w1.PropsAt(w1_dba->Property("p1")).Value<int64_t>(), 42); @@ -54,6 +55,7 @@ TEST_F(DistributedDataExchangeTest, RemoteDataGetting) { { auto w2_dba = worker(2).Access(master_dba->transaction_id()); VertexAccessor v2_in_w2{{v2_id, 0}, *w2_dba}; + auto guard = storage::GetDataLock(v2_in_w2); EXPECT_NE(v2_in_w2.GetOld(), nullptr); EXPECT_EQ(v2_in_w2.GetNew(), nullptr); EXPECT_EQ(v2_in_w2.PropsAt(w2_dba->Property("p2")).Value<std::string>(),