From 7d161319f0ac1e689fa3ad870f887481d42b5b18 Mon Sep 17 00:00:00 2001 From: Dominik Gleich <dominik.gleich@memgraph.io> Date: Tue, 22 May 2018 13:14:48 +0200 Subject: [PATCH] Remote vertex with gid creation Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1401 --- src/database/graph_db_accessor.cpp | 9 ++++++--- src/database/graph_db_accessor.hpp | 9 ++++++--- src/distributed/updates_rpc_clients.cpp | 11 +++++------ src/distributed/updates_rpc_clients.hpp | 21 +++++++++++---------- src/distributed/updates_rpc_messages.hpp | 3 +++ src/distributed/updates_rpc_server.cpp | 9 +++++---- src/distributed/updates_rpc_server.hpp | 9 +++++---- tests/unit/distributed_updates.cpp | 11 +++++++++++ 8 files changed, 52 insertions(+), 30 deletions(-) diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 6c9655df4..cbbcf178e 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -79,13 +79,16 @@ VertexAccessor GraphDbAccessor::InsertVertex( VertexAccessor GraphDbAccessor::InsertVertexIntoRemote( int worker_id, const std::vector<storage::Label> &labels, - const std::unordered_map<storage::Property, query::TypedValue> - &properties) { + const std::unordered_map<storage::Property, query::TypedValue> &properties, + std::experimental::optional<gid::Gid> requested_gid) { CHECK(worker_id != db().WorkerId()) << "Not allowed to call InsertVertexIntoRemote for local worker"; gid::Gid gid = db().updates_clients().CreateVertex( - worker_id, transaction_id(), labels, properties); + worker_id, transaction_id(), labels, properties, requested_gid); + + CHECK(!requested_gid || *requested_gid == gid) + << "Unable to assign requested vertex gid"; auto vertex = std::make_unique<Vertex>(); vertex->labels_ = labels; diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index bacb710d0..2a3985c4e 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -78,12 +78,15 @@ class GraphDbAccessor { VertexAccessor InsertVertex(std::experimental::optional<gid::Gid> requested_gid = std::experimental::nullopt); - /** Creates a new Vertex on the given worker. It is NOT allowed to call this - * function with this worker's id. */ + /** Creates a new Vertex on the given worker with the `requested_gid` if + * specified. It is NOT allowed to call this function with this worker's id. + */ VertexAccessor InsertVertexIntoRemote( int worker_id, const std::vector<storage::Label> &labels, const std::unordered_map<storage::Property, query::TypedValue> - &properties); + &properties, + std::experimental::optional<gid::Gid> requested_gid = + std::experimental::nullopt); /** * Removes the vertex of the given accessor. If the vertex has any outgoing or diff --git a/src/distributed/updates_rpc_clients.cpp b/src/distributed/updates_rpc_clients.cpp index bd5ccf52b..864f4a819 100644 --- a/src/distributed/updates_rpc_clients.cpp +++ b/src/distributed/updates_rpc_clients.cpp @@ -23,7 +23,7 @@ void RaiseIfRemoteError(UpdateResult result) { break; } } -} +} // namespace UpdateResult UpdatesRpcClients::Update(int worker_id, const database::StateDelta &delta) { @@ -35,10 +35,10 @@ UpdateResult UpdatesRpcClients::Update(int worker_id, gid::Gid UpdatesRpcClients::CreateVertex( int worker_id, tx::TransactionId tx_id, const std::vector<storage::Label> &labels, - const std::unordered_map<storage::Property, query::TypedValue> - &properties) { + const std::unordered_map<storage::Property, query::TypedValue> &properties, + std::experimental::optional<gid::Gid> requested_gid) { auto res = worker_clients_.GetClientPool(worker_id).Call<CreateVertexRpc>( - CreateVertexReqData{tx_id, labels, properties}); + CreateVertexReqData{tx_id, labels, properties, requested_gid}); CHECK(res) << "CreateVertexRpc failed on worker: " << worker_id; CHECK(res->member.result == UpdateResult::DONE) << "Remote Vertex creation result not UpdateResult::DONE"; @@ -59,8 +59,7 @@ storage::EdgeAddress UpdatesRpcClients::CreateEdge( return {res->member.gid, from_worker}; } -void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, - VertexAccessor &from, +void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from, storage::EdgeAddress edge_address, VertexAccessor &to, storage::EdgeType edge_type) { diff --git a/src/distributed/updates_rpc_clients.hpp b/src/distributed/updates_rpc_clients.hpp index a5baf55f7..66c2c3708 100644 --- a/src/distributed/updates_rpc_clients.hpp +++ b/src/distributed/updates_rpc_clients.hpp @@ -26,20 +26,21 @@ class UpdatesRpcClients { /// Sends an update delta to the given worker. UpdateResult Update(int worker_id, const database::StateDelta &delta); - /// Creates a vertex on the given worker and returns it's id. - gid::Gid CreateVertex( - int worker_id, tx::TransactionId tx_id, - const std::vector<storage::Label> &labels, - const std::unordered_map<storage::Property, query::TypedValue> - &properties); + /// Creates a vertex on the given worker and returns it's id (tries to create + /// vertex with requested_gid first). + gid::Gid CreateVertex(int worker_id, tx::TransactionId tx_id, + const std::vector<storage::Label> &labels, + const std::unordered_map<storage::Property, + query::TypedValue> &properties, + std::experimental::optional<gid::Gid> requested_gid); /// Creates an edge on the given worker and returns it's address. If the `to` /// vertex is on the same worker as `from`, then all remote CRUD will be /// handled by a call to this function. Otherwise a separate call to /// `AddInEdge` might be necessary. Throws all the exceptions that can /// occur remotely as a result of updating a vertex. - storage::EdgeAddress CreateEdge(tx::TransactionId tx_id, - VertexAccessor &from, VertexAccessor &to, + storage::EdgeAddress CreateEdge(tx::TransactionId tx_id, VertexAccessor &from, + VertexAccessor &to, storage::EdgeType edge_type); /// Adds the edge with the given address to the `to` vertex as an incoming @@ -61,8 +62,8 @@ class UpdatesRpcClients { gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr); - void RemoveInEdge(tx::TransactionId tx_id, int worker_id, - gid::Gid vertex_id, storage::EdgeAddress edge_address); + void RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id, + storage::EdgeAddress edge_address); /// Calls for all the workers (except the given one) to apply their updates /// and returns the future results. diff --git a/src/distributed/updates_rpc_messages.hpp b/src/distributed/updates_rpc_messages.hpp index 098a13696..e257361e2 100644 --- a/src/distributed/updates_rpc_messages.hpp +++ b/src/distributed/updates_rpc_messages.hpp @@ -50,6 +50,7 @@ struct CreateVertexReqData { tx::TransactionId tx_id; std::vector<storage::Label> labels; std::unordered_map<storage::Property, query::TypedValue> properties; + std::experimental::optional<gid::Gid> requested_gid; private: friend class boost::serialization::access; @@ -63,6 +64,7 @@ struct CreateVertexReqData { ar << kv.first; utils::SaveTypedValue(ar, kv.second); } + ar << requested_gid; } template <class TArchive> @@ -78,6 +80,7 @@ struct CreateVertexReqData { utils::LoadTypedValue(ar, tv); properties.emplace(p, std::move(tv)); } + ar >> requested_gid; } BOOST_SERIALIZATION_SPLIT_MEMBER() }; diff --git a/src/distributed/updates_rpc_server.cpp b/src/distributed/updates_rpc_server.cpp index 09e25a569..a02d4cc73 100644 --- a/src/distributed/updates_rpc_server.cpp +++ b/src/distributed/updates_rpc_server.cpp @@ -61,9 +61,9 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace( template <typename TRecordAccessor> gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex( const std::vector<storage::Label> &labels, - const std::unordered_map<storage::Property, query::TypedValue> - &properties) { - auto result = db_accessor_.InsertVertex(); + const std::unordered_map<storage::Property, query::TypedValue> &properties, + std::experimental::optional<gid::Gid> requested_gid) { + auto result = db_accessor_.InsertVertex(requested_gid); for (auto &label : labels) result.add_label(label); for (auto &kv : properties) result.PropsSet(kv.first, kv.second); std::lock_guard<SpinLock> guard{lock_}; @@ -201,7 +201,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb &db, server.Register<CreateVertexRpc>([this](const CreateVertexReq &req) { gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id) - .CreateVertex(req.member.labels, req.member.properties); + .CreateVertex(req.member.labels, req.member.properties, + req.member.requested_gid); return std::make_unique<CreateVertexRes>( CreateResult{UpdateResult::DONE, gid}); }); diff --git a/src/distributed/updates_rpc_server.hpp b/src/distributed/updates_rpc_server.hpp index de3bef334..62ce8ace0 100644 --- a/src/distributed/updates_rpc_server.hpp +++ b/src/distributed/updates_rpc_server.hpp @@ -40,11 +40,13 @@ class UpdatesRpcServer { /// fail-fast on serialization and update-after-delete errors. UpdateResult Emplace(const database::StateDelta &delta); - /// Creates a new vertex and returns it's gid. + /// Creates a new vertex with requested_gid if possible and returns it's + /// gid. gid::Gid CreateVertex( const std::vector<storage::Label> &labels, const std::unordered_map<storage::Property, query::TypedValue> - &properties); + &properties, + std::experimental::optional<gid::Gid> requested_gid); /// Creates a new edge and returns it's gid. Does not update vertices at the /// end of the edge. @@ -84,8 +86,7 @@ class UpdatesRpcServer { database::GraphDb &db_; template <typename TAccessor> - using MapT = - ConcurrentMap<tx::TransactionId, TransactionUpdates<TAccessor>>; + using MapT = ConcurrentMap<tx::TransactionId, TransactionUpdates<TAccessor>>; MapT<VertexAccessor> vertex_updates_; MapT<EdgeAccessor> edge_updates_; diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 24ec0f9aa..ff37b66cb 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -133,6 +133,17 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithData) { } } +// Checks if it's possible to request a specific gid for vertex creation +TEST_F(DistributedGraphDbTest, CreateVertexWithGid) { + std::experimental::optional<gid::Gid> gid(1337); + { + database::GraphDbAccessor dba{worker(1)}; + auto v = dba.InsertVertexIntoRemote(2, {}, {}, gid); + EXPECT_EQ(v.gid(), *gid); + dba.Commit(); + } +} + // Checks if expiring a local record for a local update before applying a remote // update delta causes a problem TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) {