diff --git a/src/database/distributed/distributed_graph_db.cpp b/src/database/distributed/distributed_graph_db.cpp index 6ae79981f..34c6e0de5 100644 --- a/src/database/distributed/distributed_graph_db.cpp +++ b/src/database/distributed/distributed_graph_db.cpp @@ -40,105 +40,7 @@ namespace { ////////////////////////////////////////////////////////////////////// // GraphDbAccessor implementations ////////////////////////////////////////////////////////////////////// - -class DistributedAccessor : public GraphDbAccessor { - distributed::UpdatesRpcClients *updates_clients_{nullptr}; - distributed::DataManager *data_manager_{nullptr}; - - protected: - DistributedAccessor(GraphDb *db, tx::TransactionId tx_id) - : GraphDbAccessor(*db, tx_id), - updates_clients_(&db->updates_clients()), - data_manager_(&db->data_manager()) {} - - explicit DistributedAccessor(GraphDb *db) - : GraphDbAccessor(*db), - updates_clients_(&db->updates_clients()), - data_manager_(&db->data_manager()) {} - - public: - bool RemoveVertex(VertexAccessor &vertex_accessor, - bool check_empty = true) override { - if (!vertex_accessor.is_local()) { - auto address = vertex_accessor.address(); - updates_clients_->RemoveVertex(address.worker_id(), transaction_id(), - address.gid(), check_empty); - // We can't know if we are going to be able to remove vertex until - // deferred updates on a remote worker are executed - return true; - } - return GraphDbAccessor::RemoveVertex(vertex_accessor, check_empty); - } - - void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true, - bool remove_in_edge = true) override { - if (edge.is_local()) { - return GraphDbAccessor::RemoveEdge(edge, remove_out_edge, remove_in_edge); - } - auto edge_addr = edge.GlobalAddress(); - auto from_addr = db().storage().GlobalizedAddress(edge.from_addr()); - CHECK(edge_addr.worker_id() == from_addr.worker_id()) - << "Edge and it's 'from' vertex not on the same worker"; - auto to_addr = db().storage().GlobalizedAddress(edge.to_addr()); - updates_clients_->RemoveEdge(transaction_id(), edge_addr.worker_id(), - edge_addr.gid(), from_addr.gid(), to_addr); - // Another RPC is necessary only if the first did not handle vertices on - // both sides. - if (edge_addr.worker_id() != to_addr.worker_id()) { - updates_clients_->RemoveInEdge(transaction_id(), to_addr.worker_id(), - to_addr.gid(), edge_addr); - } - } - - storage::EdgeAddress InsertEdgeOnFrom( - VertexAccessor *from, VertexAccessor *to, - const storage::EdgeType &edge_type, - const std::experimental::optional &requested_gid, - const std::experimental::optional &cypher_id) override { - if (from->is_local()) { - return GraphDbAccessor::InsertEdgeOnFrom(from, to, edge_type, - requested_gid, cypher_id); - } - auto created_edge_info = updates_clients_->CreateEdge( - transaction_id(), *from, *to, edge_type, cypher_id); - auto edge_address = created_edge_info.edge_address; - auto *from_updated = - data_manager_->FindNew(transaction_id(), from->gid()); - // Create an Edge and insert it into the Cache so we see it locally. - data_manager_->Emplace( - transaction_id(), edge_address.gid(), - distributed::CachedRecordData( - created_edge_info.cypher_id, nullptr, - std::make_unique(from->address(), to->address(), edge_type))); - from_updated->out_.emplace( - db().storage().LocalizedAddressIfPossible(to->address()), edge_address, - edge_type); - return edge_address; - } - - void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, - const storage::EdgeType &edge_type, - const storage::EdgeAddress &edge_address) override { - if (to->is_local()) { - return GraphDbAccessor::InsertEdgeOnTo(from, to, edge_type, edge_address); - } - // The RPC call for the `to` side is already handled if `from` is not - // local. - if (from->is_local() || - from->address().worker_id() != to->address().worker_id()) { - updates_clients_->AddInEdge( - transaction_id(), *from, - db().storage().GlobalizedAddress(edge_address), *to, edge_type); - } - auto *to_updated = - data_manager_->FindNew(transaction_id(), to->gid()); - to_updated->in_.emplace( - db().storage().LocalizedAddressIfPossible(from->address()), - edge_address, edge_type); - } -}; - -class MasterAccessor final : public DistributedAccessor { +class MasterAccessor final : public GraphDbAccessor { distributed::Coordination *coordination_; distributed::PullRpcClients *pull_clients_; int worker_id_{0}; @@ -146,7 +48,7 @@ class MasterAccessor final : public DistributedAccessor { public: MasterAccessor(Master *db, distributed::Coordination *coordination, distributed::PullRpcClients *pull_clients_) - : DistributedAccessor(db), + : GraphDbAccessor(*db), coordination_(coordination), pull_clients_(pull_clients_), worker_id_(db->WorkerId()) {} @@ -154,7 +56,7 @@ class MasterAccessor final : public DistributedAccessor { MasterAccessor(Master *db, tx::TransactionId tx_id, distributed::Coordination *coordination, distributed::PullRpcClients *pull_clients_) - : DistributedAccessor(db, tx_id), + : GraphDbAccessor(*db, tx_id), coordination_(coordination), pull_clients_(pull_clients_), worker_id_(db->WorkerId()) {} @@ -233,20 +135,20 @@ class MasterAccessor final : public DistributedAccessor { // TODO (mferencevic): Move this logic into the transaction engine. void AdvanceCommand() override { - DistributedAccessor::AdvanceCommand(); + GraphDbAccessor::AdvanceCommand(); auto tx_id = transaction_id(); auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id); for (auto &future : futures) future.get(); } }; -class WorkerAccessor final : public DistributedAccessor { +class WorkerAccessor final : public GraphDbAccessor { public: explicit WorkerAccessor(Worker *db) - : DistributedAccessor(db) {} + : GraphDbAccessor(*db) {} WorkerAccessor(Worker *db, tx::TransactionId tx_id) - : DistributedAccessor(db, tx_id) {} + : GraphDbAccessor(*db, tx_id) {} void BuildIndex(storage::Label, storage::Property, bool) override { // TODO: Rethink BuildIndex API or inheritance. It's rather strange that a diff --git a/src/database/distributed/graph_db_accessor.cpp b/src/database/distributed/graph_db_accessor.cpp index 59c5c37ef..cb05b7a23 100644 --- a/src/database/distributed/graph_db_accessor.cpp +++ b/src/database/distributed/graph_db_accessor.cpp @@ -5,6 +5,8 @@ #include +#include "distributed/data_manager.hpp" +#include "distributed/updates_rpc_clients.hpp" #include "durability/distributed/state_delta.hpp" #include "storage/distributed/address_types.hpp" #include "storage/distributed/edge.hpp" @@ -312,21 +314,31 @@ int64_t GraphDbAccessor::VerticesCount( bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor, bool check_empty) { - DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - vertex_accessor.SwitchNew(); - // it's possible the vertex was removed already in this transaction - // due to it getting matched multiple times by some patterns - // we can only delete it once, so check if it's already deleted - if (vertex_accessor.current().is_expired_by(transaction_)) return true; - if (check_empty && - vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0) - return false; + if (vertex_accessor.is_local()) { + DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; + vertex_accessor.SwitchNew(); + // it's possible the vertex was removed already in this transaction + // due to it getting matched multiple times by some patterns + // we can only delete it once, so check if it's already deleted + if (vertex_accessor.current().is_expired_by(transaction_)) return true; + if (check_empty && + vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0) + return false; - auto *vlist_ptr = vertex_accessor.address().local(); - wal().Emplace(database::StateDelta::RemoveVertex( - transaction_.id_, vlist_ptr->gid_, check_empty)); - vlist_ptr->remove(vertex_accessor.current_, transaction_); - return true; + auto *vlist_ptr = vertex_accessor.address().local(); + wal().Emplace(database::StateDelta::RemoveVertex( + transaction_.id_, vlist_ptr->gid_, check_empty)); + vlist_ptr->remove(vertex_accessor.current_, transaction_); + return true; + + } else { + auto address = vertex_accessor.address(); + updates_clients().RemoveVertex(address.worker_id(), transaction_id(), + address.gid(), check_empty); + // We can't know if we are going to be able to remove vertex until + // deferred updates on a remote worker are executed + return true; + } } void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) { @@ -365,37 +377,71 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom( const storage::EdgeType &edge_type, const std::experimental::optional &requested_gid, const std::experimental::optional &cypher_id) { - auto edge_accessor = InsertOnlyEdge(from->address(), to->address(), edge_type, - requested_gid, cypher_id); - auto edge_address = edge_accessor.address(); + if (from->is_local()) { + auto edge_accessor = InsertOnlyEdge(from->address(), to->address(), + edge_type, requested_gid, cypher_id); + auto edge_address = edge_accessor.address(); - from->SwitchNew(); - auto from_updated = &from->update(); + from->SwitchNew(); + auto from_updated = &from->update(); - // TODO when preparing WAL for distributed, most likely never use - // `CREATE_EDGE`, but always have it split into 3 parts (edge insertion, - // in/out modification). - wal().Emplace(database::StateDelta::CreateEdge( - transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(), - from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type))); + // TODO when preparing WAL for distributed, most likely never use + // `CREATE_EDGE`, but always have it split into 3 parts (edge insertion, + // in/out modification). + wal().Emplace(database::StateDelta::CreateEdge( + transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(), + from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type))); - from_updated->out_.emplace( - db_.storage().LocalizedAddressIfPossible(to->address()), edge_address, - edge_type); - return edge_address; + from_updated->out_.emplace( + db_.storage().LocalizedAddressIfPossible(to->address()), edge_address, + edge_type); + return edge_address; + } else { + auto created_edge_info = updates_clients().CreateEdge( + transaction_id(), *from, *to, edge_type, cypher_id); + auto edge_address = created_edge_info.edge_address; + auto *from_updated = + data_manager().FindNew(transaction_id(), from->gid()); + // Create an Edge and insert it into the Cache so we see it locally. + data_manager().Emplace( + transaction_id(), edge_address.gid(), + distributed::CachedRecordData( + created_edge_info.cypher_id, nullptr, + std::make_unique(from->address(), to->address(), edge_type))); + from_updated->out_.emplace( + db().storage().LocalizedAddressIfPossible(to->address()), edge_address, + edge_type); + return edge_address; + } } void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, const storage::EdgeType &edge_type, const storage::EdgeAddress &edge_address) { - // Ensure that the "to" accessor has the latest version (switch new). - // WARNING: Must do that after the above "from->update()" for cases when - // we are creating a cycle and "from" and "to" are the same vlist. - to->SwitchNew(); - auto *to_updated = &to->update(); - to_updated->in_.emplace( - db_.storage().LocalizedAddressIfPossible(from->address()), edge_address, - edge_type); + if (to->is_local()) { + // Ensure that the "to" accessor has the latest version (switch new). + // WARNING: Must do that after the above "from->update()" for cases when + // we are creating a cycle and "from" and "to" are the same vlist. + to->SwitchNew(); + auto *to_updated = &to->update(); + to_updated->in_.emplace( + db_.storage().LocalizedAddressIfPossible(from->address()), edge_address, + edge_type); + } else { + // The RPC call for the `to` side is already handled if `from` is not + // local. + if (from->is_local() || + from->address().worker_id() != to->address().worker_id()) { + updates_clients().AddInEdge( + transaction_id(), *from, + db().storage().GlobalizedAddress(edge_address), *to, edge_type); + } + auto *to_updated = + data_manager().FindNew(transaction_id(), to->gid()); + to_updated->in_.emplace( + db().storage().LocalizedAddressIfPossible(from->address()), + edge_address, edge_type); + } } EdgeAccessor GraphDbAccessor::InsertOnlyEdge( @@ -427,17 +473,34 @@ int64_t GraphDbAccessor::EdgesCount() const { void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge, bool remove_in_edge) { - DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - // it's possible the edge was removed already in this transaction - // due to it getting matched multiple times by some patterns - // we can only delete it once, so check if it's already deleted - edge.SwitchNew(); - if (edge.current().is_expired_by(transaction_)) return; - if (remove_out_edge) edge.from().RemoveOutEdge(edge.address()); - if (remove_in_edge) edge.to().RemoveInEdge(edge.address()); + if (edge.is_local()) { + DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; + // it's possible the edge was removed already in this transaction + // due to it getting matched multiple times by some patterns + // we can only delete it once, so check if it's already deleted + edge.SwitchNew(); + if (edge.current().is_expired_by(transaction_)) return; + if (remove_out_edge) edge.from().RemoveOutEdge(edge.address()); + if (remove_in_edge) edge.to().RemoveInEdge(edge.address()); - edge.address().local()->remove(edge.current_, transaction_); - wal().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid())); + edge.address().local()->remove(edge.current_, transaction_); + wal().Emplace( + database::StateDelta::RemoveEdge(transaction_.id_, edge.gid())); + } else { + auto edge_addr = edge.GlobalAddress(); + auto from_addr = db().storage().GlobalizedAddress(edge.from_addr()); + CHECK(edge_addr.worker_id() == from_addr.worker_id()) + << "Edge and it's 'from' vertex not on the same worker"; + auto to_addr = db().storage().GlobalizedAddress(edge.to_addr()); + updates_clients().RemoveEdge(transaction_id(), edge_addr.worker_id(), + edge_addr.gid(), from_addr.gid(), to_addr); + // Another RPC is necessary only if the first did not handle vertices on + // both sides. + if (edge_addr.worker_id() != to_addr.worker_id()) { + updates_clients().RemoveInEdge(transaction_id(), to_addr.worker_id(), + to_addr.gid(), edge_addr); + } + } } storage::Label GraphDbAccessor::Label(const std::string &label_name) { diff --git a/src/database/distributed/graph_db_accessor.hpp b/src/database/distributed/graph_db_accessor.hpp index 778bef2fc..f5e0dc5a9 100644 --- a/src/database/distributed/graph_db_accessor.hpp +++ b/src/database/distributed/graph_db_accessor.hpp @@ -119,7 +119,7 @@ class GraphDbAccessor { * before deletion. * @return If or not the vertex was deleted. */ - virtual bool RemoveVertex(VertexAccessor &vertex_accessor, + bool RemoveVertex(VertexAccessor &vertex_accessor, bool check_empty = true); /** @@ -358,7 +358,7 @@ class GraphDbAccessor { * @param remove_in_edge If the edge should be removed from the its * destination side. */ - virtual void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true, + void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true, bool remove_in_edge = true); /** @@ -657,7 +657,7 @@ class GraphDbAccessor { * Insert a new edge to `from` vertex and return the address. * Called from `InsertEdge` as the first step in edge insertion. * */ - virtual storage::EdgeAddress InsertEdgeOnFrom( + storage::EdgeAddress InsertEdgeOnFrom( VertexAccessor *from, VertexAccessor *to, const storage::EdgeType &edge_type, const std::experimental::optional &requested_gid, @@ -668,7 +668,7 @@ class GraphDbAccessor { * Called after `InsertEdgeOnFrom` in `InsertEdge`. The given `edge_address` * is from the created edge, returned by `InsertEdgeOnFrom`. */ - virtual void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, + void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, const storage::EdgeType &edge_type, const storage::EdgeAddress &edge_address);