From 99375a4b47af76e2e0ddf4461e66948458a391a2 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Wed, 28 Feb 2018 10:36:48 +0100 Subject: [PATCH] Vertex removal using rpcs Summary: Remove vertex remote Add tests Reviewers: florijan, teon.banek Reviewed By: florijan Subscribers: teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1230 --- src/communication/rpc/messages-inl.hpp | 4 ++ src/database/graph_db_accessor.cpp | 13 ++--- .../remote_updates_rpc_clients.hpp | 18 +++++- .../remote_updates_rpc_messages.hpp | 23 +++++++- src/distributed/remote_updates_rpc_server.hpp | 32 ++++++++--- src/query/exceptions.hpp | 8 +++ src/query/plan/operator.cpp | 11 ++-- src/storage/record_accessor.cpp | 2 + tests/unit/distributed_updates.cpp | 56 +++++++++++++++++++ 9 files changed, 143 insertions(+), 24 deletions(-) diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index e4e488ea1..a26b9defc 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -100,3 +100,7 @@ BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeRes); BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData); BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq); BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes); + +// Remote removes +BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexReq); +BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexRes); diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index d3620244a..ae9d2e7d9 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -375,13 +375,12 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; if (!vertex_accessor.is_local()) { - LOG(ERROR) << "Remote vertex deletion not implemented"; - // TODO support distributed - // call remote RemoveVertex(gid), return it's result. The result can be - // (true, false), or an error can occur (serialization, timeout). In case - // of error the remote worker will be asking for a transaction abort, - // not sure what to do here. - return false; + auto address = vertex_accessor.address(); + db().remote_updates_clients().RemoteRemoveVertex( + address.worker_id(), transaction_id(), address.gid()); + // We can't know if we are going to be able to remove vertex until deferred + // updates on a remote worker are executed + return true; } vertex_accessor.SwitchNew(); // it's possible the vertex was removed already in this transaction diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index d76bc176f..4f3bcc458 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -7,6 +7,7 @@ #include "distributed/coordination.hpp" #include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" +#include "query/exceptions.hpp" #include "query/typed_value.hpp" #include "storage/address_types.hpp" #include "storage/gid.hpp" @@ -85,6 +86,15 @@ class RemoteUpdatesRpcClients { RaiseIfRemoteError(res->member); } + void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id, + gid::Gid gid) { + auto res = + worker_clients_.GetClientPool(worker_id).Call( + RemoteRemoveVertexReqData{gid, tx_id}); + CHECK(res) << "RemoteRemoveVertex RPC failed"; + RaiseIfRemoteError(res->member); + } + /// Calls for the worker with the given ID to apply remote updates. Returns /// the results of that operation. RemoteUpdateResult RemoteUpdateApply(int worker_id, @@ -109,13 +119,15 @@ class RemoteUpdatesRpcClients { void RaiseIfRemoteError(RemoteUpdateResult result) { switch (result) { + case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + throw query::RemoveAttachedVertexException(); case RemoteUpdateResult::SERIALIZATION_ERROR: - throw new mvcc::SerializationError(); + throw mvcc::SerializationError(); case RemoteUpdateResult::LOCK_TIMEOUT_ERROR: - throw new LockTimeoutException( + throw LockTimeoutException( "Remote LockTimeoutError during edge creation"); case RemoteUpdateResult::UPDATE_DELETED_ERROR: - throw new RecordDeletedError(); + throw RecordDeletedError(); case RemoteUpdateResult::DONE: break; } diff --git a/src/distributed/remote_updates_rpc_messages.hpp b/src/distributed/remote_updates_rpc_messages.hpp index 64b9d95bb..e2aa4c7b9 100644 --- a/src/distributed/remote_updates_rpc_messages.hpp +++ b/src/distributed/remote_updates_rpc_messages.hpp @@ -18,7 +18,8 @@ enum class RemoteUpdateResult { DONE, SERIALIZATION_ERROR, LOCK_TIMEOUT_ERROR, - UPDATE_DELETED_ERROR + UPDATE_DELETED_ERROR, + UNABLE_TO_DELETE_VERTEX_ERROR }; RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateReq, database::StateDelta); @@ -137,4 +138,24 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeReq, RemoteAddInEdgeReqData); RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeRes, RemoteUpdateResult); using RemoteAddInEdgeRpc = communication::rpc::RequestResponse; + +struct RemoteRemoveVertexReqData { + gid::Gid gid; + tx::transaction_id_t tx_id; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &gid; + ar &tx_id; + } +}; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexReq, RemoteRemoveVertexReqData); +RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexRes, RemoteUpdateResult); +using RemoteRemoveVertexRpc = + communication::rpc::RequestResponse; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index 05aea8150..e1e884831 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -116,7 +116,6 @@ class RemoteUpdatesRpcServer { record_accessor.Reconstruct(); for (database::StateDelta &delta : kv.second.second) { try { - auto &updated = record_accessor.update(); auto &dba = db_accessor_; switch (delta.type) { case database::StateDelta::Type::TRANSACTION_BEGIN: @@ -124,11 +123,16 @@ class RemoteUpdatesRpcServer { case database::StateDelta::Type::TRANSACTION_ABORT: case database::StateDelta::Type::CREATE_VERTEX: case database::StateDelta::Type::CREATE_EDGE: - case database::StateDelta::Type::REMOVE_VERTEX: case database::StateDelta::Type::REMOVE_EDGE: case database::StateDelta::Type::BUILD_INDEX: LOG(FATAL) << "Can only apply record update deltas for remote " "graph element"; + case database::StateDelta::Type::REMOVE_VERTEX: + if (!db_accessor().RemoveVertex( + reinterpret_cast(record_accessor))) { + return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR; + } + break; case database::StateDelta::Type::SET_PROPERTY_VERTEX: case database::StateDelta::Type::SET_PROPERTY_EDGE: record_accessor.PropsSet(delta.property, delta.value); @@ -146,15 +150,18 @@ class RemoteUpdatesRpcServer { .remove_label(delta.label); } break; case database::StateDelta::Type::ADD_OUT_EDGE: - reinterpret_cast(updated).out_.emplace( - dba.LocalizedAddress(delta.vertex_to_address), - dba.LocalizedAddress(delta.edge_address), delta.edge_type); + reinterpret_cast(record_accessor.update()) + .out_.emplace(dba.LocalizedAddress(delta.vertex_to_address), + dba.LocalizedAddress(delta.edge_address), + delta.edge_type); dba.wal().Emplace(delta); break; case database::StateDelta::Type::ADD_IN_EDGE: - reinterpret_cast(updated).in_.emplace( - dba.LocalizedAddress(delta.vertex_from_address), - dba.LocalizedAddress(delta.edge_address), delta.edge_type); + reinterpret_cast(record_accessor.update()) + .in_.emplace( + dba.LocalizedAddress(delta.vertex_from_address), + dba.LocalizedAddress(delta.edge_address), + delta.edge_type); dba.wal().Emplace(delta); break; } @@ -245,6 +252,15 @@ class RemoteUpdatesRpcServer { GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); return std::make_unique(result); }); + + server_.Register( + [this](const RemoteRemoveVertexReq &req) { + auto to_delta = database::StateDelta::RemoveVertex(req.member.tx_id, + req.member.gid); + auto result = + GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); + return std::make_unique(result); + }); } /// Applies all existsing updates for the given transaction ID. If there are diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 51cb20ac1..2b04e78f5 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -107,4 +107,12 @@ class ReconstructionException : public QueryException { "preceeding DELETE.") {} }; +class RemoveAttachedVertexException : public QueryRuntimeException { + public: + RemoveAttachedVertexException() + : QueryRuntimeException( + "Failed to remove vertex because of it's existing " + "connections. Consider using DETACH DELETE.") {} +}; + } // namespace query diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index ff0209539..9d67680a9 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -1227,8 +1227,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { self_.graph_view_); // For the given (vertex, edge, vertex) tuple checks if they satisfy the // "where" condition. if so, places them in the priority queue. - auto expand_pair = [this, &evaluator, &frame]( - VertexAccessor from, EdgeAccessor edge, VertexAccessor vertex) { + auto expand_pair = [this, &evaluator, &frame](VertexAccessor from, + EdgeAccessor edge, + VertexAccessor vertex) { SwitchAccessor(edge, self_.graph_view_); SwitchAccessor(vertex, self_.graph_view_); @@ -1648,9 +1649,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, Context &context) { if (self_.detach_) db_.DetachRemoveVertex(va); else if (!db_.RemoveVertex(va)) - throw QueryRuntimeException( - "Failed to remove vertex because of it's existing " - "connections. Consider using DETACH DELETE."); + throw RemoveAttachedVertexException(); break; } @@ -3325,6 +3324,8 @@ class SynchronizeCursor : public Cursor { case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: throw mvcc::SerializationError( "Failed to apply deferred updates due to SerializationError"); + case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + throw RemoveAttachedVertexException(); case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: throw QueryRuntimeException( "Failed to apply deferred updates due to RecordDeletedError"); diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 4eb3f5299..6da408429 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -207,6 +207,8 @@ void RecordAccessor::SendDelta( switch (result) { case distributed::RemoteUpdateResult::DONE: break; + case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + throw query::RemoveAttachedVertexException(); case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: throw mvcc::SerializationError(); case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 0c748ff88..9d984b2bf 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -196,6 +196,62 @@ TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) { } } +TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) { + auto v_address = InsertVertex(worker(1)); + database::GraphDbAccessor dba0{master()}; + database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; + auto v_remote = VertexAccessor(v_address, dba0); + dba0.RemoveVertex(v_remote); + EXPECT_TRUE(dba1.FindVertex(v_address.gid(), true)); + EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), + distributed::RemoteUpdateResult::DONE); + EXPECT_FALSE(dba1.FindVertex(v_address.gid(), true)); +} + +TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) { + auto v_address = InsertVertex(worker(1)); + { + database::GraphDbAccessor dba0{master()}; + database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; + auto v_local = dba1.FindVertexChecked(v_address.gid(), false); + auto v_remote = VertexAccessor(v_address, dba0); + EXPECT_TRUE(dba1.RemoveVertex(v_local)); + EXPECT_TRUE(dba0.RemoveVertex(v_remote)); + EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), + distributed::RemoteUpdateResult::DONE); + EXPECT_FALSE(dba1.FindVertex(v_address.gid(), true)); + } +} + +TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) { + auto v_address = InsertVertex(worker(1)); + auto e_address = InsertEdge(v_address, v_address, "edge"); + + { + database::GraphDbAccessor dba0{master()}; + database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; + auto v_remote = VertexAccessor(v_address, dba0); + dba0.RemoveVertex(v_remote); + EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), + distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR); + EXPECT_TRUE(dba1.FindVertex(v_address.gid(), true)); + } + { + database::GraphDbAccessor dba0{master()}; + database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; + auto e_local = dba1.FindEdgeChecked(e_address.gid(), false); + auto v_local = dba1.FindVertexChecked(v_address.gid(), false); + auto v_remote = VertexAccessor(v_address, dba0); + + dba1.RemoveEdge(e_local); + dba0.RemoveVertex(v_remote); + + EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), + distributed::RemoteUpdateResult::DONE); + EXPECT_FALSE(dba1.FindVertex(v_address.gid(), true)); + } +} + class DistributedEdgeCreateTest : public DistributedGraphDbTest { protected: storage::VertexAddress w1_a;