From 02e7cbf16c62b4faff54234154e0e92c17615d68 Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Thu, 26 Jul 2018 15:37:07 +0200 Subject: [PATCH] Add pure interface class to RecordAccessor Summary: Since RecordAccessor is often instantiated and copied, using a factory-like function to allocate concrete types on the heap would be too costly. The approach in this diff uses a Strategy pattern (see "Design Patterns" by Gamma et al.), where the Strategy interface is given as RecordAccessor::Impl. Concrete implementations are then created for each GraphDb. This allows us to instantiate the concrete RecordAccessors::Impl *once* and *share* it among all RecordAccessors. Reviewers: msantl, vkasljevic Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1510 --- src/database/distributed_graph_db.cpp | 276 ++++++++++++++++++++++++-- src/database/graph_db.cpp | 155 ++++++++++++++- src/database/graph_db_accessor.hpp | 29 +-- src/storage/edge_accessor.cpp | 23 +++ src/storage/edge_accessor.hpp | 27 +-- src/storage/record_accessor.cpp | 100 +--------- src/storage/record_accessor.hpp | 75 +++++-- src/storage/vertex_accessor.cpp | 38 +--- src/storage/vertex_accessor.hpp | 19 +- 9 files changed, 538 insertions(+), 204 deletions(-) diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index d9ef38d81..8da84fc26 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -21,6 +21,8 @@ #include "distributed/updates_rpc_clients.hpp" #include "distributed/updates_rpc_server.hpp" #include "durability/snapshooter.hpp" +// TODO: Why do we depend on query here? +#include "query/exceptions.hpp" #include "storage/concurrent_id_mapper.hpp" #include "storage/concurrent_id_mapper_master.hpp" #include "storage/concurrent_id_mapper_worker.hpp" @@ -31,25 +33,230 @@ using namespace std::literals::chrono_literals; namespace database { -// Accessors namespace { +////////////////////////////////////////////////////////////////////// +// RecordAccessors implementations +////////////////////////////////////////////////////////////////////// + +// RecordAccessor implementation is shared among different RecordAccessors to +// avoid heap allocations. Therefore, we are constructing this implementation in +// each DistributedGraphDb and pass it to DistributedAccessor. +template +class DistributedRecordAccessor final { + // These should never be changed, because this implementation may be shared + // among multiple RecordAccessors. + int worker_id_; + distributed::DataManager *data_manager_; + distributed::UpdatesRpcClients *updates_clients_; + + public: + DistributedRecordAccessor(int worker_id, + distributed::DataManager *data_manager, + distributed::UpdatesRpcClients *updates_clients) + : worker_id_(worker_id), + data_manager_(data_manager), + updates_clients_(updates_clients) { + CHECK(data_manager_ && updates_clients_); + } + + typename RecordAccessor::AddressT GlobalAddress( + const RecordAccessor &record_accessor) { + return record_accessor.is_local() + ? storage::Address>( + record_accessor.gid(), worker_id_) + : record_accessor.address(); + } + + void SetOldNew(const RecordAccessor &record_accessor, TRecord **old, + TRecord **newr) { + auto &dba = record_accessor.db_accessor(); + const auto &address = record_accessor.address(); + if (record_accessor.is_local()) { + address.local()->find_set_old_new(dba.transaction(), *old, *newr); + return; + } + // It's not possible that we have a global address for a graph element + // that's local, because that is resolved in the constructor. + // TODO in write queries it's possible the command has been advanced and + // we need to invalidate the Cache and really get the latest stuff. + // But only do that after the command has been advanced. + auto &cache = + data_manager_->template Elements(dba.transaction_id()); + cache.FindSetOldNew(dba.transaction().id_, address.worker_id(), + address.gid(), *old, *newr); + } + + TRecord *FindNew(const RecordAccessor &record_accessor) { + const auto &address = record_accessor.address(); + auto &dba = record_accessor.db_accessor(); + if (address.is_local()) { + return address.local()->update(dba.transaction()); + } + auto &cache = + data_manager_->template Elements(dba.transaction_id()); + return cache.FindNew(address.gid()); + } + + void ProcessDelta(const RecordAccessor &record_accessor, + const database::StateDelta &delta) { + if (record_accessor.is_local()) { + record_accessor.db_accessor().wal().Emplace(delta); + } else { + SendDelta(record_accessor, delta); + } + } + + void SendDelta(const RecordAccessor &record_accessor, + const database::StateDelta &delta) { + auto result = + updates_clients_->Update(record_accessor.address().worker_id(), delta); + switch (result) { + case distributed::UpdateResult::DONE: + break; + case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + throw query::RemoveAttachedVertexException(); + case distributed::UpdateResult::SERIALIZATION_ERROR: + throw mvcc::SerializationError(); + case distributed::UpdateResult::UPDATE_DELETED_ERROR: + throw RecordDeletedError(); + case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: + throw utils::LockTimeoutException("Lock timeout on remote worker"); + } + } +}; + +class DistributedEdgeAccessor final : public ::RecordAccessor::Impl { + DistributedRecordAccessor distributed_accessor_; + + public: + DistributedEdgeAccessor(int worker_id, distributed::DataManager *data_manager, + distributed::UpdatesRpcClients *updates_clients) + : distributed_accessor_(worker_id, data_manager, updates_clients) {} + + typename RecordAccessor::AddressT GlobalAddress( + const RecordAccessor &ra) override { + return distributed_accessor_.GlobalAddress(ra); + } + + void SetOldNew(const RecordAccessor &ra, Edge **old_record, + Edge **new_record) override { + return distributed_accessor_.SetOldNew(ra, old_record, new_record); + } + + Edge *FindNew(const RecordAccessor &ra) override { + return distributed_accessor_.FindNew(ra); + } + + void ProcessDelta(const RecordAccessor &ra, + const database::StateDelta &delta) override { + return distributed_accessor_.ProcessDelta(ra, delta); + } +}; + +class DistributedVertexAccessor final : public ::VertexAccessor::Impl { + DistributedRecordAccessor distributed_accessor_; + + public: + DistributedVertexAccessor(int worker_id, + distributed::DataManager *data_manager, + distributed::UpdatesRpcClients *updates_clients) + : distributed_accessor_(worker_id, data_manager, updates_clients) {} + + typename RecordAccessor::AddressT GlobalAddress( + const RecordAccessor &ra) override { + return distributed_accessor_.GlobalAddress(ra); + } + + void SetOldNew(const RecordAccessor &ra, Vertex **old_record, + Vertex **new_record) override { + return distributed_accessor_.SetOldNew(ra, old_record, new_record); + } + + Vertex *FindNew(const RecordAccessor &ra) override { + return distributed_accessor_.FindNew(ra); + } + + void ProcessDelta(const RecordAccessor &ra, + const database::StateDelta &delta) override { + return distributed_accessor_.ProcessDelta(ra, delta); + } + + void AddLabel(const VertexAccessor &va, + const storage::Label &label) override { + auto &dba = va.db_accessor(); + auto delta = StateDelta::AddLabel(dba.transaction_id(), va.gid(), label, + dba.LabelName(label)); + Vertex &vertex = va.update(); + // not a duplicate label, add it + if (!utils::Contains(vertex.labels_, label)) { + vertex.labels_.emplace_back(label); + if (va.is_local()) { + dba.wal().Emplace(delta); + dba.UpdateLabelIndices(label, va, &vertex); + } + } + + if (!va.is_local()) distributed_accessor_.SendDelta(va, delta); + } + + void RemoveLabel(const VertexAccessor &va, + const storage::Label &label) override { + auto &dba = va.db_accessor(); + auto delta = StateDelta::RemoveLabel(dba.transaction_id(), va.gid(), label, + dba.LabelName(label)); + Vertex &vertex = va.update(); + if (utils::Contains(vertex.labels_, label)) { + auto &labels = vertex.labels_; + auto found = std::find(labels.begin(), labels.end(), delta.label); + std::swap(*found, labels.back()); + labels.pop_back(); + if (va.is_local()) { + dba.wal().Emplace(delta); + } + } + + if (!va.is_local()) distributed_accessor_.SendDelta(va, delta); + } +}; + +////////////////////////////////////////////////////////////////////// +// GraphDbAccessor implementations +////////////////////////////////////////////////////////////////////// + class DistributedAccessor : public GraphDbAccessor { distributed::UpdatesRpcClients *updates_clients_{nullptr}; distributed::DataManager *data_manager_{nullptr}; + // Shared implementations of record accessors. + DistributedVertexAccessor *vertex_accessor_; + DistributedEdgeAccessor *edge_accessor_; protected: - DistributedAccessor(DistributedGraphDb *db, tx::TransactionId tx_id) + DistributedAccessor(DistributedGraphDb *db, tx::TransactionId tx_id, + DistributedVertexAccessor *vertex_accessor, + DistributedEdgeAccessor *edge_accessor) : GraphDbAccessor(*db, tx_id), updates_clients_(&db->updates_clients()), - data_manager_(&db->data_manager()) {} + data_manager_(&db->data_manager()), + vertex_accessor_(vertex_accessor), + edge_accessor_(edge_accessor) {} - explicit DistributedAccessor(DistributedGraphDb *db) + DistributedAccessor(DistributedGraphDb *db, + DistributedVertexAccessor *vertex_accessor, + DistributedEdgeAccessor *edge_accessor) : GraphDbAccessor(*db), updates_clients_(&db->updates_clients()), - data_manager_(&db->data_manager()) {} + data_manager_(&db->data_manager()), + vertex_accessor_(vertex_accessor), + edge_accessor_(edge_accessor) {} public: + ::VertexAccessor::Impl *GetVertexImpl() override { return vertex_accessor_; } + + ::RecordAccessor::Impl *GetEdgeImpl() override { + return edge_accessor_; + } + bool RemoveVertex(VertexAccessor &vertex_accessor, bool check_empty = true) override { if (!vertex_accessor.is_local()) { @@ -134,14 +341,18 @@ class MasterAccessor final : public DistributedAccessor { int worker_id_{0}; public: - explicit MasterAccessor(Master *db, - distributed::IndexRpcClients *index_rpc_clients) - : DistributedAccessor(db), + MasterAccessor(Master *db, distributed::IndexRpcClients *index_rpc_clients, + DistributedVertexAccessor *vertex_accessor, + DistributedEdgeAccessor *edge_accessor) + : DistributedAccessor(db, vertex_accessor, edge_accessor), index_rpc_clients_(index_rpc_clients), worker_id_(db->WorkerId()) {} + MasterAccessor(Master *db, tx::TransactionId tx_id, - distributed::IndexRpcClients *index_rpc_clients) - : DistributedAccessor(db, tx_id), + distributed::IndexRpcClients *index_rpc_clients, + DistributedVertexAccessor *vertex_accessor, + DistributedEdgeAccessor *edge_accessor) + : DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor), index_rpc_clients_(index_rpc_clients), worker_id_(db->WorkerId()) {} @@ -198,9 +409,14 @@ class MasterAccessor final : public DistributedAccessor { class WorkerAccessor final : public DistributedAccessor { public: - explicit WorkerAccessor(Worker *db) : DistributedAccessor(db) {} - WorkerAccessor(Worker *db, tx::TransactionId tx_id) - : DistributedAccessor(db, tx_id) {} + WorkerAccessor(Worker *db, DistributedVertexAccessor *vertex_accessor, + DistributedEdgeAccessor *edge_accessor) + : DistributedAccessor(db, vertex_accessor, edge_accessor) {} + + WorkerAccessor(Worker *db, tx::TransactionId tx_id, + DistributedVertexAccessor *vertex_accessor, + DistributedEdgeAccessor *edge_accessor) + : DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor) {} void BuildIndex(storage::Label, storage::Property) override { // TODO: Rethink BuildIndex API or inheritance. It's rather strange that a @@ -211,7 +427,9 @@ class WorkerAccessor final : public DistributedAccessor { } // namespace +////////////////////////////////////////////////////////////////////// // GraphDb implementations +////////////////////////////////////////////////////////////////////// namespace impl { @@ -226,7 +444,9 @@ struct TypemapPack { TMapper property; }; +////////////////////////////////////////////////////////////////////// // Master +////////////////////////////////////////////////////////////////////// class Master { public: @@ -239,6 +459,11 @@ class Master { durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, config_.durability_enabled}; + // Shared implementations for all RecordAccessor in this Db. + DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_, + &updates_clients_}; + DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_, + &updates_clients_}; // TODO: Some things may depend on order of construction/destruction. We also // have a lot of circular pointers among members. It would be a good idea to @@ -265,7 +490,8 @@ class Master { distributed::DataRpcServer data_server_{self_, &server_}; distributed::DataRpcClients data_clients_{rpc_worker_clients_}; distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_}; - distributed::PullRpcClients pull_clients_{&rpc_worker_clients_, &data_manager_}; + distributed::PullRpcClients pull_clients_{&rpc_worker_clients_, + &data_manager_}; distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_}; distributed::UpdatesRpcServer updates_server_{self_, &server_}; distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_}; @@ -382,12 +608,15 @@ Master::~Master() { } std::unique_ptr Master::Access() { - return std::make_unique(this, &impl_->index_rpc_clients_); + return std::make_unique(this, &impl_->index_rpc_clients_, + &impl_->vertex_accessor_, + &impl_->edge_accessor_); } std::unique_ptr Master::Access(tx::TransactionId tx_id) { - return std::make_unique(this, tx_id, - &impl_->index_rpc_clients_); + return std::make_unique( + this, tx_id, &impl_->index_rpc_clients_, &impl_->vertex_accessor_, + &impl_->edge_accessor_); } Storage &Master::storage() { return *impl_->storage_; } @@ -514,7 +743,9 @@ VertexAccessor InsertVertexIntoRemote( return VertexAccessor({gid, worker_id}, *dba); } +////////////////////////////////////////////////////////////////////// // Worker +////////////////////////////////////////////////////////////////////// namespace impl { @@ -526,6 +757,11 @@ class Worker { durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, config_.durability_enabled}; + // Shared implementations for all RecordAccessor in this Db. + DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_, + &updates_clients_}; + DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_, + &updates_clients_}; explicit Worker(const Config &config, database::Worker *self) : config_(config), self_(self) { @@ -641,11 +877,13 @@ Worker::~Worker() { } std::unique_ptr Worker::Access() { - return std::make_unique(this); + return std::make_unique(this, &impl_->vertex_accessor_, + &impl_->edge_accessor_); } std::unique_ptr Worker::Access(tx::TransactionId tx_id) { - return std::make_unique(this, tx_id); + return std::make_unique(this, tx_id, &impl_->vertex_accessor_, + &impl_->edge_accessor_); } Storage &Worker::storage() { return *impl_->storage_; } diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 552bc18fb..601e05e18 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -14,7 +14,151 @@ #include "utils/file.hpp" namespace database { -namespace impl { + +namespace { + +////////////////////////////////////////////////////////////////////// +// RecordAccessor and GraphDbAccessor implementations +////////////////////////////////////////////////////////////////////// + +template +class SingleNodeRecordAccessor final { + public: + typename RecordAccessor::AddressT GlobalAddress( + const RecordAccessor &record_accessor) { + // TODO: This is still coupled to distributed storage, albeit loosely. + int worker_id = 0; + CHECK(record_accessor.is_local()); + return storage::Address>(record_accessor.gid(), + worker_id); + } + + void SetOldNew(const RecordAccessor &record_accessor, TRecord **old, + TRecord **newr) { + auto &dba = record_accessor.db_accessor(); + const auto &address = record_accessor.address(); + CHECK(record_accessor.is_local()); + address.local()->find_set_old_new(dba.transaction(), *old, *newr); + } + + TRecord *FindNew(const RecordAccessor &record_accessor) { + const auto &address = record_accessor.address(); + auto &dba = record_accessor.db_accessor(); + CHECK(address.is_local()); + return address.local()->update(dba.transaction()); + } + + void ProcessDelta(const RecordAccessor &record_accessor, + const database::StateDelta &delta) { + CHECK(record_accessor.is_local()); + record_accessor.db_accessor().wal().Emplace(delta); + } +}; + +class VertexAccessorImpl final : public ::VertexAccessor::Impl { + SingleNodeRecordAccessor accessor_; + + public: + typename RecordAccessor::AddressT GlobalAddress( + const RecordAccessor &ra) override { + return accessor_.GlobalAddress(ra); + } + + void SetOldNew(const RecordAccessor &ra, Vertex **old_record, + Vertex **new_record) override { + return accessor_.SetOldNew(ra, old_record, new_record); + } + + Vertex *FindNew(const RecordAccessor &ra) override { + return accessor_.FindNew(ra); + } + + void ProcessDelta(const RecordAccessor &ra, + const database::StateDelta &delta) override { + return accessor_.ProcessDelta(ra, delta); + } + + void AddLabel(const VertexAccessor &va, + const storage::Label &label) override { + CHECK(va.is_local()); + auto &dba = va.db_accessor(); + auto delta = StateDelta::AddLabel(dba.transaction_id(), va.gid(), label, + dba.LabelName(label)); + Vertex &vertex = va.update(); + // not a duplicate label, add it + if (!utils::Contains(vertex.labels_, label)) { + vertex.labels_.emplace_back(label); + dba.wal().Emplace(delta); + dba.UpdateLabelIndices(label, va, &vertex); + } + } + + void RemoveLabel(const VertexAccessor &va, + const storage::Label &label) override { + CHECK(va.is_local()); + auto &dba = va.db_accessor(); + auto delta = StateDelta::RemoveLabel(dba.transaction_id(), va.gid(), label, + dba.LabelName(label)); + Vertex &vertex = va.update(); + if (utils::Contains(vertex.labels_, label)) { + auto &labels = vertex.labels_; + auto found = std::find(labels.begin(), labels.end(), delta.label); + std::swap(*found, labels.back()); + labels.pop_back(); + dba.wal().Emplace(delta); + } + } +}; + +class EdgeAccessorImpl final : public ::RecordAccessor::Impl { + SingleNodeRecordAccessor accessor_; + + public: + typename RecordAccessor::AddressT GlobalAddress( + const RecordAccessor &ra) override { + return accessor_.GlobalAddress(ra); + } + + void SetOldNew(const RecordAccessor &ra, Edge **old_record, + Edge **new_record) override { + return accessor_.SetOldNew(ra, old_record, new_record); + } + + Edge *FindNew(const RecordAccessor &ra) override { + return accessor_.FindNew(ra); + } + + void ProcessDelta(const RecordAccessor &ra, + const database::StateDelta &delta) override { + return accessor_.ProcessDelta(ra, delta); + } +}; + +class SingleNodeAccessor : public GraphDbAccessor { + // Shared implementations of record accessors. + static VertexAccessorImpl vertex_accessor_; + static EdgeAccessorImpl edge_accessor_; + + public: + explicit SingleNodeAccessor(GraphDb &db) : GraphDbAccessor(db) {} + SingleNodeAccessor(GraphDb &db, tx::TransactionId tx_id) + : GraphDbAccessor(db, tx_id) {} + + ::VertexAccessor::Impl *GetVertexImpl() override { return &vertex_accessor_; } + + ::RecordAccessor::Impl *GetEdgeImpl() override { + return &edge_accessor_; + } +}; + +VertexAccessorImpl SingleNodeAccessor::vertex_accessor_; +EdgeAccessorImpl SingleNodeAccessor::edge_accessor_; + +} // namespace + +////////////////////////////////////////////////////////////////////// +// SingleNode GraphDb implementation +////////////////////////////////////////////////////////////////////// template