From 0c40c67ac29859d5ec04b56a744ad383a7a55755 Mon Sep 17 00:00:00 2001 From: florijan Date: Tue, 20 Feb 2018 14:48:36 +0100 Subject: [PATCH] Implement remote create (storage, RPC, not operator) Summary: Implementation of remote vertex and edge creation. This diff addresses the creation API (`GraphDbAccessor::InsertEdge`, `GraphDbAccessor::InsertRemoteVertex`) and the necessary RPC and `RemoteCache` stuff. What is missing for full remote creation support are `query::plan::operator` changes that are expected to minor. Pushing this diff as it's large enough, operator and end to end tests in the next. Also, the naming of existing structures and files is confusing (update refering to both updates and created, `results` used too often etc.). I will address this too, but feel free to comment on bad naming. Reviewers: dgleich, teon.banek, msantl Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1210 --- src/communication/rpc/messages-inl.hpp | 12 ++ src/database/graph_db_accessor.cpp | 150 ++++++++++---- src/database/graph_db_accessor.hpp | 28 ++- src/database/state_delta.cpp | 57 ++++++ src/database/state_delta.hpp | 28 ++- .../remote_updates_rpc_clients.hpp | 75 +++++++ .../remote_updates_rpc_messages.hpp | 113 +++++++++++ src/distributed/remote_updates_rpc_server.hpp | 104 ++++++++-- src/storage/address.hpp | 8 + src/storage/record_accessor.cpp | 40 ++-- src/storage/record_accessor.hpp | 6 - tests/unit/distributed_common.hpp | 7 +- tests/unit/distributed_updates.cpp | 185 ++++++++++++++++++ 13 files changed, 717 insertions(+), 96 deletions(-) diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 5b8367135..3f8cd7f8d 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -90,3 +90,15 @@ BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes); + +// Remote creates +BOOST_CLASS_EXPORT(distributed::RemoteCreateResult); +BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReq); +BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReqData); +BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexRes); +BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReqData); +BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReq); +BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeRes); +BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData); +BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq); +BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes); diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 7e354a675..e70bfdee4 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -6,6 +6,8 @@ #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" #include "distributed/index_rpc_messages.hpp" +#include "distributed/remote_data_manager.hpp" +#include "distributed/remote_updates_rpc_clients.hpp" #include "storage/address_types.hpp" #include "storage/edge.hpp" #include "storage/edge_accessor.hpp" @@ -16,6 +18,35 @@ namespace database { +#define LOCALIZED_ADDRESS_SPECIALIZATION(type) \ + template <> \ + storage::type##Address GraphDbAccessor::LocalizedAddress( \ + storage::type##Address address) const { \ + if (address.is_local()) return address; \ + if (address.worker_id() == db().WorkerId()) { \ + return Local##type##Address(address.gid()); \ + } \ + return address; \ + } + +LOCALIZED_ADDRESS_SPECIALIZATION(Vertex) +LOCALIZED_ADDRESS_SPECIALIZATION(Edge) + +#undef LOCALIZED_ADDRESS_SPECIALIZATION + +#define GLOBALIZED_ADDRESS_SPECIALIZATION(type) \ + template <> \ + storage::type##Address GraphDbAccessor::GlobalizedAddress( \ + storage::type##Address address) const { \ + if (address.is_remote()) return address; \ + return {address.local()->gid_, db().WorkerId()}; \ + } + +GLOBALIZED_ADDRESS_SPECIALIZATION(Vertex) +GLOBALIZED_ADDRESS_SPECIALIZATION(Edge) + +#undef GLOBALIZED_ADDRESS_SPECIALIZATION + GraphDbAccessor::GraphDbAccessor(GraphDb &db) : db_(db), transaction_(*db.tx_engine().Begin()), @@ -76,6 +107,26 @@ VertexAccessor GraphDbAccessor::InsertVertex( return VertexAccessor(vertex_vlist, *this); } +VertexAccessor GraphDbAccessor::InsertVertexIntoRemote( + int worker_id, const std::vector &labels, + const std::unordered_map + &properties) { + CHECK(worker_id != db().WorkerId()) + << "Not allowed to call InsertVertexIntoRemote for local worker"; + + gid::Gid gid = db().remote_updates_clients().RemoteCreateVertex( + worker_id, transaction_id(), labels, properties); + + auto vertex = std::make_unique(); + vertex->labels_ = labels; + for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second); + + db().remote_data_manager() + .Vertices(transaction_id()) + .emplace(gid, nullptr, std::move(vertex)); + return VertexAccessor({gid, worker_id}, *this); +} + std::experimental::optional GraphDbAccessor::FindVertex( gid::Gid gid, bool current_state) { VertexAccessor record_accessor(LocalVertexAddress(gid), *this); @@ -375,55 +426,79 @@ EdgeAccessor GraphDbAccessor::InsertEdge( VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type, std::experimental::optional requested_gid) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - // An edge is created on the worker of it's "from" vertex. - if (!from.is_local()) { - LOG(ERROR) << "Remote edge insertion not implemented."; - // TODO call remote InsertEdge(...)->gid. Possible outcomes are successful - // creation or an error (serialization, timeout). If successful, create an - // EdgeAccessor and return it. The remote InsertEdge(...) will be calling - // remote Connect(...) if "to" is not local to it. + + // The address of an edge we'll create. + storage::EdgeAddress edge_address; + + Vertex *from_updated; + if (from.is_local()) { + auto gid = db_.storage().edge_generator_.Next(requested_gid); + edge_address = new mvcc::VersionList( + transaction_, gid, from.address(), to.address(), edge_type); + // We need to insert edge_address to edges_ before calling update since + // update can throw and edge_vlist will not be garbage collected if it is + // not in edges_ skiplist. + bool success = + db_.storage().edges_.access().insert(gid, edge_address.local()).second; + CHECK(success) << "Attempting to insert an edge with an existing GID: " + << gid; + + from.SwitchNew(); + from_updated = &from.update(); + + // TODO when preparing WAL for distributed, most likely never use + // `CREATE_EDGE`, but always have it split into 3 parts (edge insertion, + // in/out modification). + wal().Emplace(database::StateDelta::CreateEdge( + transaction_.id_, gid, from.gid(), to.gid(), edge_type, + EdgeTypeName(edge_type))); + + } else { + edge_address = db().remote_updates_clients().RemoteCreateEdge( + transaction_id(), from, to, edge_type); + + from_updated = db().remote_data_manager() + .Vertices(transaction_id()) + .FindNew(from.gid()); + + // Create an Edge and insert it into the RemoteCache so we see it locally. + db().remote_data_manager() + .Edges(transaction_id()) + .emplace( + edge_address.gid(), nullptr, + std::make_unique(from.address(), to.address(), edge_type)); } - auto gid = db_.storage().edge_generator_.Next(requested_gid); - auto edge_vlist = new mvcc::VersionList( - transaction_, gid, from.address(), to.address(), edge_type); - // We need to insert edge_vlist to edges_ before calling update since update - // can throw and edge_vlist will not be garbage collected if it is not in - // edges_ skiplist. - bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second; - CHECK(success) << "Attempting to insert an edge with an existing GID: " - << gid; + from_updated->out_.emplace(to.address(), edge_address, edge_type); - // ensure that the "from" accessor has the latest version - from.SwitchNew(); - from.update().out_.emplace(to.address(), edge_vlist, edge_type); - - // It is possible that the "to" accessor is remote. + Vertex *to_updated; if (to.is_local()) { // ensure that the "to" accessor has the latest version (Switch new) // WARNING: must do that after the above "from.update()" for cases when // we are creating a cycle and "from" and "to" are the same vlist to.SwitchNew(); - to.update().in_.emplace(from.address(), edge_vlist, edge_type); + to_updated = &to.update(); } else { - LOG(ERROR) << "Connecting to a remote vertex not implemented."; - // TODO call remote Connect(from_gid, edge_gid, to_gid, edge_type). Possible - // outcomes are success or error (serialization, timeout). + // The RPC call for the `to` side is already handled if `from` is not local. + if (from.is_local() || + from.address().worker_id() != to.address().worker_id()) { + db().remote_updates_clients().RemoteAddInEdge( + transaction_id(), from, GlobalizedAddress(edge_address), to, + edge_type); + } + to_updated = + db().remote_data_manager().Vertices(transaction_id()).FindNew(to.gid()); } - wal().Emplace(database::StateDelta::CreateEdge( - transaction_.id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type, - EdgeTypeName(edge_type))); - return EdgeAccessor(edge_vlist, *this, from.address(), to.address(), + to_updated->in_.emplace(from.address(), edge_address, edge_type); + + return EdgeAccessor(edge_address, *this, from.address(), to.address(), edge_type); } -EdgeAccessor GraphDbAccessor::InsertOnlyEdge(storage::VertexAddress &from, - storage::VertexAddress &to, - storage::EdgeType edge_type, - gid::Gid edge_gid) { - auto gid = db_.storage().edge_generator_.Next(edge_gid); - DCHECK(gid == edge_gid) << "Gid should be equal as edge gid since " - "this edges are only added after vertices " - "reference them by their gid"; +EdgeAccessor GraphDbAccessor::InsertOnlyEdge( + storage::VertexAddress from, storage::VertexAddress to, + storage::EdgeType edge_type, + std::experimental::optional requested_gid) { + auto gid = db_.storage().edge_generator_.Next(requested_gid); auto edge_vlist = new mvcc::VersionList(transaction_, gid, from, to, edge_type); // We need to insert edge_vlist to edges_ before calling update since update @@ -530,4 +605,5 @@ mvcc::VersionList *GraphDbAccessor::LocalEdgeAddress(gid::Gid gid) const { CHECK(found != access.end()) << "Failed to find edge for gid: " << gid; return found->second; } + } // namespace database diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 2fb2ea2aa..3b6083d36 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -2,6 +2,7 @@ #include #include +#include #include "cppitertools/filter.hpp" #include "cppitertools/imap.hpp" @@ -9,6 +10,7 @@ #include "database/graph_db.hpp" #include "distributed/remote_cache.hpp" +#include "query/typed_value.hpp" #include "storage/address_types.hpp" #include "storage/edge_accessor.hpp" #include "storage/types.hpp" @@ -76,6 +78,13 @@ class GraphDbAccessor { VertexAccessor InsertVertex(std::experimental::optional requested_gid = std::experimental::nullopt); + /** Creates a new Vertex on the given worker. It is NOT allowed to call this + * function with this worker's id. */ + VertexAccessor InsertVertexIntoRemote( + int worker_id, const std::vector &labels, + const std::unordered_map + &properties); + /** * Removes the vertex of the given accessor. If the vertex has any outgoing or * incoming edges, it is not deleted. See `DetachRemoveVertex` if you want to @@ -281,9 +290,11 @@ class GraphDbAccessor { * Insert edge into main storage, but don't insert it into from and to * vertices edge lists. */ - EdgeAccessor InsertOnlyEdge(storage::VertexAddress &from, - storage::VertexAddress &to, - storage::EdgeType edge_type, gid::Gid edge_gid); + EdgeAccessor InsertOnlyEdge(storage::VertexAddress from, + storage::VertexAddress to, + storage::EdgeType edge_type, + std::experimental::optional + requested_gid = std::experimental::nullopt); /** * Removes an edge from the graph. Parameters can indicate if the edge should @@ -533,6 +544,7 @@ class GraphDbAccessor { const tx::Transaction &transaction() const { return transaction_; } durability::WriteAheadLog &wal(); auto &db() { return db_; } + const auto &db() const { return db_; } /** * Returns the current value of the counter with the given name, and @@ -557,6 +569,16 @@ class GraphDbAccessor { /// Gets the local edge address for the given gid. Fails if not present. mvcc::VersionList *LocalEdgeAddress(gid::Gid gid) const; + /// Converts an address to local, if possible. Returns the same address if + /// not. + template + TAddress LocalizedAddress(TAddress address) const; + + /// Converts local address to remote, returns remote as they are. + /// not. + template + TAddress GlobalizedAddress(TAddress address) const; + private: GraphDb &db_; tx::Transaction &transaction_; diff --git a/src/database/state_delta.cpp b/src/database/state_delta.cpp index ac1c25da2..d7a27b1d8 100644 --- a/src/database/state_delta.cpp +++ b/src/database/state_delta.cpp @@ -39,6 +39,35 @@ StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, return op; } +StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id, + gid::Gid vertex_id, + storage::VertexAddress vertex_to_address, + storage::EdgeAddress edge_address, + storage::EdgeType edge_type) { + CHECK(vertex_to_address.is_remote() && edge_address.is_remote()) + << "WAL can only contain global addresses."; + StateDelta op(StateDelta::Type::ADD_OUT_EDGE, tx_id); + op.vertex_id = vertex_id; + op.vertex_to_address = vertex_to_address; + op.edge_address = edge_address; + op.edge_type = edge_type; + return op; +} + +StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, + storage::VertexAddress vertex_from_address, + storage::EdgeAddress edge_address, + storage::EdgeType edge_type) { + CHECK(vertex_from_address.is_remote() && edge_address.is_remote()) + << "WAL can only contain global addresses."; + StateDelta op(StateDelta::Type::ADD_IN_EDGE, tx_id); + op.vertex_id = vertex_id; + op.vertex_from_address = vertex_from_address; + op.edge_address = edge_address; + op.edge_type = edge_type; + return op; +} + StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id, storage::Property property, @@ -133,6 +162,18 @@ void StateDelta::Encode( encoder.WriteInt(edge_type.storage()); encoder.WriteString(edge_type_name); break; + case Type::ADD_OUT_EDGE: + encoder.WriteInt(vertex_id); + encoder.WriteInt(vertex_to_address.raw()); + encoder.WriteInt(edge_address.raw()); + encoder.WriteInt(edge_type.storage()); + break; + case Type::ADD_IN_EDGE: + encoder.WriteInt(vertex_id); + encoder.WriteInt(vertex_from_address.raw()); + encoder.WriteInt(edge_address.raw()); + encoder.WriteInt(edge_type.storage()); + break; case Type::SET_PROPERTY_VERTEX: encoder.WriteInt(vertex_id); encoder.WriteInt(property.storage()); @@ -205,6 +246,19 @@ std::experimental::optional StateDelta::Decode( DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) DECODE_MEMBER(edge_type_name, ValueString) break; + case Type::ADD_OUT_EDGE: + DECODE_MEMBER(vertex_id, ValueInt) + DECODE_MEMBER_CAST(vertex_to_address, ValueInt, storage::VertexAddress) + DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress) + DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) + break; + case Type::ADD_IN_EDGE: + DECODE_MEMBER(vertex_id, ValueInt) + DECODE_MEMBER_CAST(vertex_from_address, ValueInt, + storage::VertexAddress) + DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress) + DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) + break; case Type::SET_PROPERTY_VERTEX: DECODE_MEMBER(vertex_id, ValueInt) DECODE_MEMBER_CAST(property, ValueInt, storage::Property) @@ -273,6 +327,9 @@ void StateDelta::Apply(GraphDbAccessor &dba) const { dba.InsertEdge(*from, *to, dba.EdgeType(edge_type_name), edge_id); break; } + case Type::ADD_OUT_EDGE: + case Type::ADD_IN_EDGE: + LOG(FATAL) << "Partial edge-creation not yet supported in Apply"; case Type::SET_PROPERTY_VERTEX: { auto vertex = dba.FindVertex(vertex_id, true); DCHECK(vertex) << "Failed to find vertex."; diff --git a/src/database/state_delta.hpp b/src/database/state_delta.hpp index 6f346b385..0cbe6d7c6 100644 --- a/src/database/state_delta.hpp +++ b/src/database/state_delta.hpp @@ -4,6 +4,7 @@ #include "communication/bolt/v1/encoder/primitive_encoder.hpp" #include "durability/hashed_file_reader.hpp" #include "durability/hashed_file_writer.hpp" +#include "storage/address_types.hpp" #include "storage/gid.hpp" #include "storage/property_value.hpp" #include "utils/serialization.hpp" @@ -28,9 +29,11 @@ struct StateDelta { TRANSACTION_BEGIN, TRANSACTION_COMMIT, TRANSACTION_ABORT, - CREATE_VERTEX, // vertex_id - CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type, - // edge_type_name + CREATE_VERTEX, // vertex_id + CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type, + // edge_type_name + ADD_OUT_EDGE, // vertex_id, edge_address, vertex_to_address, edge_type + ADD_IN_EDGE, // vertex_id, edge_address, vertex_from_address, edge_type SET_PROPERTY_VERTEX, // vertex_id, property, property_name, property_value SET_PROPERTY_EDGE, // edge_id, property, property_name, property_value // remove property is done by setting a PropertyValue::Null @@ -66,6 +69,14 @@ struct StateDelta { gid::Gid vertex_from_id, gid::Gid vertex_to_id, storage::EdgeType edge_type, const std::string &edge_type_name); + static StateDelta AddOutEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, + storage::VertexAddress vertex_to_address, + storage::EdgeAddress edge_address, + storage::EdgeType edge_type); + static StateDelta AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, + storage::VertexAddress vertex_from_address, + storage::EdgeAddress edge_address, + storage::EdgeType edge_type); static StateDelta PropsSetVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id, storage::Property property, @@ -97,10 +108,15 @@ struct StateDelta { tx::transaction_id_t transaction_id; // Members valid only for some deltas, see StateDelta::Type comments above. + // TODO: when preparing the WAL for distributed, most likely remove Gids and + // only keep addresses. gid::Gid vertex_id; gid::Gid edge_id; + storage::EdgeAddress edge_address; gid::Gid vertex_from_id; + storage::VertexAddress vertex_from_address; gid::Gid vertex_to_id; + storage::VertexAddress vertex_to_address; storage::EdgeType edge_type; std::string edge_type_name; storage::Property property; @@ -118,8 +134,11 @@ struct StateDelta { ar &transaction_id; ar &vertex_id; ar &edge_id; + ar &edge_address; ar &vertex_from_id; + ar &vertex_from_address; ar &vertex_to_id; + ar &vertex_to_address; ar &edge_type; ar &edge_type_name; ar &property; @@ -135,8 +154,11 @@ struct StateDelta { ar &transaction_id; ar &vertex_id; ar &edge_id; + ar &edge_address; ar &vertex_from_id; + ar &vertex_from_address; ar &vertex_to_id; + ar &vertex_to_address; ar &edge_type; ar &edge_type_name; ar &property; diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index f54236d8e..ace9579a7 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -1,9 +1,16 @@ #pragma once +#include +#include + #include "database/state_delta.hpp" #include "distributed/coordination.hpp" #include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" +#include "query/typed_value.hpp" +#include "storage/address_types.hpp" +#include "storage/gid.hpp" +#include "storage/types.hpp" #include "transactions/type.hpp" namespace distributed { @@ -24,6 +31,60 @@ class RemoteUpdatesRpcClients { ->member; } + /// Creates a vertex on the given worker and returns it's id. + gid::Gid RemoteCreateVertex( + int worker_id, tx::transaction_id_t tx_id, + const std::vector &labels, + const std::unordered_map + &properties) { + auto result = + worker_clients_.GetClientPool(worker_id).Call( + RemoteCreateVertexReqData{tx_id, labels, properties}); + CHECK(result) << "Failed to remote-create a vertex on worker: " + << worker_id; + CHECK(result->member.result == RemoteUpdateResult::DONE) + << "Vertex creation can not result in an error"; + return result->member.gid; + } + + /// Creates an edge on the given worker and returns it's address. If the `to` + /// vertex is on the same worker as `from`, then all remote CRUD will be + /// handled by a call to this function. Otherwise a separate call to + /// `RemoteAddInEdge` might be necessary. Throws all the exceptions that can + /// occur remotely as a result of updating a vertex. + storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id, + VertexAccessor &from, + VertexAccessor &to, + storage::EdgeType edge_type) { + CHECK(from.address().is_remote()) + << "In RemoteCreateEdge `from` must be remote"; + + int from_worker = from.address().worker_id(); + auto res = worker_clients_.GetClientPool(from_worker) + .Call(RemoteCreateEdgeReqData{ + from.gid(), to.GlobalAddress(), edge_type, tx_id}); + CHECK(res) << "RemoteCreateEdge RPC failed"; + RaiseIfRemoteError(res->member.result); + return {res->member.gid, from_worker}; + } + + /// Adds the edge with the given address to the `to` vertex as an incoming + /// edge. Only used when `to` is remote and not on the same worker as `from`. + void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, + storage::EdgeAddress edge_address, VertexAccessor &to, + storage::EdgeType edge_type) { + CHECK(to.address().is_remote() && edge_address.is_remote() && + (from.GlobalAddress().worker_id() != to.address().worker_id())) + << "RemoteAddInEdge should only be called when `to` is remote and " + "`from` is not on the same worker as `to`."; + auto res = worker_clients_.GetClientPool(to.GlobalAddress().worker_id()) + .Call(RemoteAddInEdgeReqData{ + from.GlobalAddress(), edge_address, to.gid(), edge_type, + tx_id}); + CHECK(res) << "RemoteAddInEdge RPC failed"; + RaiseIfRemoteError(res->member); + } + /// Calls for the worker with the given ID to apply remote updates. Returns /// the results of that operation. RemoteUpdateResult RemoteUpdateApply(int worker_id, @@ -45,5 +106,19 @@ class RemoteUpdatesRpcClients { private: RpcWorkerClients worker_clients_; + + void RaiseIfRemoteError(RemoteUpdateResult result) { + switch (result) { + case RemoteUpdateResult::SERIALIZATION_ERROR: + throw new mvcc::SerializationError(); + case RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + throw new LockTimeoutException( + "Remote LockTimeoutError during edge creation"); + case RemoteUpdateResult::UPDATE_DELETED_ERROR: + throw new RecordDeletedError(); + case RemoteUpdateResult::DONE: + break; + } + } }; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_messages.hpp b/src/distributed/remote_updates_rpc_messages.hpp index 451bebd24..76e4c885f 100644 --- a/src/distributed/remote_updates_rpc_messages.hpp +++ b/src/distributed/remote_updates_rpc_messages.hpp @@ -1,8 +1,15 @@ #pragma once +#include + +#include "boost/serialization/vector.hpp" + #include "communication/rpc/messages.hpp" #include "database/state_delta.hpp" +#include "storage/address_types.hpp" +#include "storage/gid.hpp" #include "transactions/type.hpp" +#include "utils/serialization.hpp" namespace distributed { @@ -26,4 +33,110 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult); using RemoteUpdateApplyRpc = communication::rpc::RequestResponse; + +struct RemoteCreateResult { + RemoteUpdateResult result; + // Only valid if creation was successful. + gid::Gid gid; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &result; + ar &gid; + } +}; + +struct RemoteCreateVertexReqData { + tx::transaction_id_t tx_id; + std::vector labels; + std::unordered_map properties; + + private: + friend class boost::serialization::access; + + template + void save(TArchive &ar, unsigned int) const { + ar << tx_id; + ar << labels; + ar << properties.size(); + for (auto &kv : properties) { + ar << kv.first; + utils::SaveTypedValue(ar, kv.second); + } + } + + template + void load(TArchive &ar, unsigned int) { + ar >> tx_id; + ar >> labels; + size_t props_size; + ar >> props_size; + for (size_t i = 0; i < props_size; ++i) { + storage::Property p; + ar >> p; + query::TypedValue tv; + utils::LoadTypedValue(ar, tv); + properties.emplace(p, std::move(tv)); + } + } + BOOST_SERIALIZATION_SPLIT_MEMBER() +}; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexReq, RemoteCreateVertexReqData); +RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexRes, RemoteCreateResult); +using RemoteCreateVertexRpc = + communication::rpc::RequestResponse; + +struct RemoteCreateEdgeReqData { + gid::Gid from; + storage::VertexAddress to; + storage::EdgeType edge_type; + tx::transaction_id_t tx_id; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &from; + ar &to; + ar &edge_type; + ar &tx_id; + } +}; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeReq, RemoteCreateEdgeReqData); +RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeRes, RemoteCreateResult); +using RemoteCreateEdgeRpc = + communication::rpc::RequestResponse; + +struct RemoteAddInEdgeReqData { + storage::VertexAddress from; + storage::EdgeAddress edge_address; + gid::Gid to; + storage::EdgeType edge_type; + tx::transaction_id_t tx_id; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &from; + ar &edge_address; + ar &to; + ar &edge_type; + ar &tx_id; + } +}; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeReq, RemoteAddInEdgeReqData); +RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeRes, RemoteUpdateResult); +using RemoteAddInEdgeRpc = + communication::rpc::RequestResponse; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index 309413168..49b555633 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -14,8 +15,10 @@ #include "database/state_delta.hpp" #include "distributed/remote_updates_rpc_messages.hpp" #include "mvcc/version_list.hpp" +#include "query/typed_value.hpp" #include "storage/gid.hpp" #include "storage/record_accessor.hpp" +#include "storage/types.hpp" #include "storage/vertex_accessor.hpp" #include "threading/sync/lock_timeout_exception.hpp" #include "threading/sync/spinlock.hpp" @@ -90,6 +93,21 @@ class RemoteUpdatesRpcServer { return RemoteUpdateResult::DONE; } + /// Creates a new vertex and returns it's gid. + RemoteCreateResult CreateVertex( + const std::vector &labels, + const std::unordered_map + &properties) { + auto result = db_accessor_.InsertVertex(); + for (auto &label : labels) result.add_label(label); + for (auto &kv : properties) result.PropsSet(kv.first, kv.second); + std::lock_guard guard{lock_}; + deltas_.emplace( + result.gid(), + std::make_pair(result, std::vector{})); + return {RemoteUpdateResult::DONE, result.gid()}; + } + /// Applies all the deltas on the record. RemoteUpdateResult Apply() { std::lock_guard guard{lock_}; @@ -109,6 +127,8 @@ class RemoteUpdatesRpcServer { return RemoteUpdateResult::DONE; } + auto &db_accessor() { return db_accessor_; } + private: database::GraphDbAccessor db_accessor_; std::unordered_map< @@ -127,15 +147,16 @@ class RemoteUpdatesRpcServer { : db_(db), engine_(engine), server_(system, kRemoteUpdatesRpc) { server_.Register([this](const RemoteUpdateReq &req) { using DeltaType = database::StateDelta::Type; - switch (req.member.type) { + auto &delta = req.member; + switch (delta.type) { case DeltaType::SET_PROPERTY_VERTEX: case DeltaType::ADD_LABEL: case DeltaType::REMOVE_LABEL: return std::make_unique( - Process(vertex_updates_, req.member)); + GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta)); case DeltaType::SET_PROPERTY_EDGE: return std::make_unique( - Process(edge_updates_, req.member)); + GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta)); default: LOG(FATAL) << "Can't perform a remote update with delta type: " << static_cast(req.member.type); @@ -146,6 +167,41 @@ class RemoteUpdatesRpcServer { [this](const RemoteUpdateApplyReq &req) { return std::make_unique(Apply(req.member)); }); + + server_.Register( + [this](const RemoteCreateVertexReq &req) { + return std::make_unique( + GetUpdates(vertex_updates_, req.member.tx_id) + .CreateVertex(req.member.labels, req.member.properties)); + }); + + server_.Register( + [this](const RemoteCreateEdgeReq &req) { + auto data = req.member; + auto creation_result = CreateEdge(data); + + // If `from` and `to` are both on this worker, we handle it in this + // RPC call. Do it only if CreateEdge succeeded. + if (creation_result.result == RemoteUpdateResult::DONE && + data.to.worker_id() == db_.WorkerId()) { + auto to_delta = database::StateDelta::AddInEdge( + data.tx_id, data.to.gid(), data.from, + {creation_result.gid, db_.WorkerId()}, data.edge_type); + creation_result.result = + GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta); + } + + return std::make_unique(creation_result); + }); + + server_.Register([this](const RemoteAddInEdgeReq &req) { + auto to_delta = database::StateDelta::AddInEdge( + req.member.tx_id, req.member.to, req.member.from, + req.member.edge_address, req.member.edge_type); + auto result = + GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); + return std::make_unique(result); + }); } /// Applies all existsing updates for the given transaction ID. If there are @@ -174,29 +230,39 @@ class RemoteUpdatesRpcServer { database::GraphDb &db_; tx::Engine &engine_; communication::rpc::Server server_; - ConcurrentMap> - vertex_updates_; - ConcurrentMap> - edge_updates_; tx::TxEndListener tx_end_listener_{engine_, [this](tx::transaction_id_t tx_id) { vertex_updates_.access().remove(tx_id); edge_updates_.access().remove(tx_id); }}; + template + using MapT = + ConcurrentMap>; + MapT vertex_updates_; + MapT edge_updates_; - // Processes a single delta recieved in the RPC request. - template - RemoteUpdateResult Process(TCollection &updates, - const database::StateDelta &delta) { - auto tx_id = delta.transaction_id; - auto access = updates.access(); - auto &transaction_updates = - access - .emplace(tx_id, std::make_tuple(tx_id), - std::make_tuple(std::ref(db_), tx_id)) - .first->second; + // Gets/creates the TransactionUpdates for the given transaction. + template + TransactionUpdates &GetUpdates(MapT &updates, + tx::transaction_id_t tx_id) { + return updates.access() + .emplace(tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(db_), tx_id)) + .first->second; + } - return transaction_updates.Emplace(delta); + RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req) { + auto &dba = GetUpdates(edge_updates_, req.tx_id).db_accessor(); + + auto edge = dba.InsertOnlyEdge({req.from, db_.WorkerId()}, + dba.LocalizedAddress(req.to), req.edge_type); + + auto from_delta = database::StateDelta::AddOutEdge( + req.tx_id, req.from, req.to, dba.GlobalizedAddress(edge.address()), + req.edge_type); + + auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta); + return {result, edge.gid()}; } }; diff --git a/src/storage/address.hpp b/src/storage/address.hpp index d7455eec8..93e83dcba 100644 --- a/src/storage/address.hpp +++ b/src/storage/address.hpp @@ -2,7 +2,9 @@ #include +#include "boost/serialization/access.hpp" #include "glog/logging.h" + #include "storage/gid.hpp" namespace storage { @@ -89,5 +91,11 @@ class Address { private: StorageT storage_{0}; + + friend class boost::serialization::access; + template + void serialize(TArchive &ar, unsigned int) { + ar &storage_; + } }; } // namespace storage diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 927fca8bd..1400ad1f9 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -14,7 +14,8 @@ using database::StateDelta; template RecordAccessor::RecordAccessor(AddressT address, database::GraphDbAccessor &db_accessor) - : db_accessor_(&db_accessor), address_(NormalizedAddress(address)) {} + : db_accessor_(&db_accessor), + address_(db_accessor.LocalizedAddress(address)) {} template const PropertyValue &RecordAccessor::PropsAt( @@ -198,6 +199,7 @@ const TRecord &RecordAccessor::current() const { template void RecordAccessor::ProcessDelta( const database::StateDelta &delta) const { + auto &dba = db_accessor(); // Apply the delta both on local and remote data. We need to see the changes // we make to remote data, even if it's not applied immediately. auto &updated = update(); @@ -227,12 +229,22 @@ void RecordAccessor::ProcessDelta( std::swap(*found, labels.back()); labels.pop_back(); } break; + case StateDelta::Type::ADD_OUT_EDGE: + reinterpret_cast(updated).out_.emplace( + dba.LocalizedAddress(delta.vertex_to_address), + dba.LocalizedAddress(delta.edge_address), delta.edge_type); + break; + case StateDelta::Type::ADD_IN_EDGE: + reinterpret_cast(updated).in_.emplace( + dba.LocalizedAddress(delta.vertex_from_address), + dba.LocalizedAddress(delta.edge_address), delta.edge_type); + break; } if (is_local()) { - db_accessor().wal().Emplace(delta); + dba.wal().Emplace(delta); } else { - auto result = db_accessor().db().remote_updates_clients().RemoteUpdate( + auto result = dba.db().remote_updates_clients().RemoteUpdate( address().worker_id(), delta); switch (result) { case distributed::RemoteUpdateResult::DONE: @@ -247,27 +259,5 @@ void RecordAccessor::ProcessDelta( } } -template <> -RecordAccessor::AddressT RecordAccessor::NormalizedAddress( - AddressT address) const { - if (address.is_local()) return address; - if (address.worker_id() == db_accessor().db_.WorkerId()) { - return AddressT(db_accessor().LocalVertexAddress(address.gid())); - } - - return address; -} - -template <> -RecordAccessor::AddressT RecordAccessor::NormalizedAddress( - AddressT address) const { - if (address.is_local()) return address; - if (address.worker_id() == db_accessor().db_.WorkerId()) { - return AddressT(db_accessor().LocalEdgeAddress(address.gid())); - } - - return address; -} - template class RecordAccessor; template class RecordAccessor; diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index 259eec7cc..a6c21df3a 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -202,12 +202,6 @@ class RecordAccessor : public TotalOrdering> { * an update. */ mutable TRecord *new_{nullptr}; - - /** Returns an address that is local, if the given address is local, or if it - * is remote, but points to this worker. This method is used in the - * constructor, but the graph_db_accessor field must be initizalized when it's - * called. */ - AddressT NormalizedAddress(AddressT address) const; }; /** Error when trying to update a deleted record */ diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 4b0de3b97..aa11a0fad 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -67,7 +67,7 @@ class DistributedGraphDbTest : public ::testing::Test { /// Inserts a vertex and returns it's global address. Does it in a new /// transaction. - auto InsertVertex(database::GraphDb &db) { + storage::VertexAddress InsertVertex(database::GraphDb &db) { database::GraphDbAccessor dba{db}; auto r_val = dba.InsertVertex().GlobalAddress(); dba.Commit(); @@ -75,8 +75,9 @@ class DistributedGraphDbTest : public ::testing::Test { } /// Inserts an edge (on the 'from' side) and returns it's global address. - auto InsertEdge(storage::VertexAddress from, storage::VertexAddress to, - const std::string &edge_type_name) { + storage::EdgeAddress InsertEdge(storage::VertexAddress from, + storage::VertexAddress to, + const std::string &edge_type_name) { database::GraphDbAccessor dba{worker(from.worker_id())}; auto from_v = dba.FindVertexChecked(from.gid(), false); auto edge_type = dba.EdgeType(edge_type_name); diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 661e5dd4c..c4f997b73 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -1,6 +1,7 @@ #include #include "database/graph_db_accessor.hpp" +#include "distributed/remote_updates_rpc_clients.hpp" #include "distributed/remote_updates_rpc_server.hpp" #include "distributed_common.hpp" @@ -59,3 +60,187 @@ TEST_F(DistributedUpdateTest, RemoteUpdateApply) { } #undef EXPECT_LABEL + +TEST_F(DistributedGraphDbTest, CreateVertex) { + gid::Gid gid; + { + database::GraphDbAccessor dba{worker(1)}; + auto v = dba.InsertVertexIntoRemote(2, {}, {}); + gid = v.gid(); + dba.Commit(); + } + { + database::GraphDbAccessor dba{worker(2)}; + auto v = dba.FindVertex(gid, false); + ASSERT_TRUE(v); + } +} + +TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) { + gid::Gid gid; + storage::Property prop; + { + database::GraphDbAccessor dba{worker(1)}; + auto v = dba.InsertVertexIntoRemote(2, {}, {}); + gid = v.gid(); + prop = dba.Property("prop"); + v.PropsSet(prop, 42); + worker(2).remote_updates_server().Apply(dba.transaction_id()); + dba.Commit(); + } + { + database::GraphDbAccessor dba{worker(2)}; + auto v = dba.FindVertex(gid, false); + ASSERT_TRUE(v); + EXPECT_EQ(v->PropsAt(prop).Value(), 42); + } +} + +TEST_F(DistributedGraphDbTest, CreateVertexWithData) { + gid::Gid gid; + storage::Label l1; + storage::Label l2; + storage::Property prop; + { + database::GraphDbAccessor dba{worker(1)}; + l1 = dba.Label("l1"); + l2 = dba.Label("l2"); + prop = dba.Property("prop"); + auto v = dba.InsertVertexIntoRemote(2, {l1, l2}, {{prop, 42}}); + gid = v.gid(); + + // Check local visibility before commit. + EXPECT_TRUE(v.has_label(l1)); + EXPECT_TRUE(v.has_label(l2)); + EXPECT_EQ(v.PropsAt(prop).Value(), 42); + + worker(2).remote_updates_server().Apply(dba.transaction_id()); + dba.Commit(); + } + { + database::GraphDbAccessor dba{worker(2)}; + auto v = dba.FindVertex(gid, false); + ASSERT_TRUE(v); + // Check remote data after commit. + EXPECT_TRUE(v->has_label(l1)); + EXPECT_TRUE(v->has_label(l2)); + EXPECT_EQ(v->PropsAt(prop).Value(), 42); + } +} + +class DistributedEdgeCreateTest : public DistributedGraphDbTest { + protected: + storage::VertexAddress w1_a; + storage::VertexAddress w1_b; + storage::VertexAddress w2_a; + storage::EdgeAddress e_ga; + + void SetUp() override { + DistributedGraphDbTest::SetUp(); + w1_a = InsertVertex(worker(1)); + w1_b = InsertVertex(worker(1)); + w2_a = InsertVertex(worker(2)); + } + + void CreateEdge(database::GraphDb &creator, storage::VertexAddress from_addr, + storage::VertexAddress to_addr) { + database::GraphDbAccessor dba{creator}; + auto edge_type = dba.EdgeType("et"); + VertexAccessor v1{from_addr, dba}; + VertexAccessor v2{to_addr, dba}; + e_ga = dba.InsertEdge(v1, v2, edge_type).GlobalAddress(); + master().remote_updates_server().Apply(dba.transaction_id()); + worker(1).remote_updates_server().Apply(dba.transaction_id()); + worker(2).remote_updates_server().Apply(dba.transaction_id()); + dba.Commit(); + } + + int EdgeCount(database::GraphDb &db) { + database::GraphDbAccessor dba(db); + auto edges = dba.Edges(false); + return std::distance(edges.begin(), edges.end()); + }; + + void CheckCounts(int master_count, int worker1_count, int worker2_count) { + EXPECT_EQ(EdgeCount(master()), master_count); + EXPECT_EQ(EdgeCount(worker(1)), worker1_count); + EXPECT_EQ(EdgeCount(worker(2)), worker2_count); + } + + void CheckState(database::GraphDb &db, bool edge_is_local, + storage::VertexAddress from_addr, + storage::VertexAddress to_addr) { + database::GraphDbAccessor dba{db}; + + // Check edge data. + { + EdgeAccessor edge{e_ga, dba}; + EXPECT_EQ(edge.address().is_local(), edge_is_local); + EXPECT_EQ(edge.GlobalAddress(), e_ga); + auto from = edge.from(); + EXPECT_EQ(from.GlobalAddress(), from_addr); + auto to = edge.to(); + EXPECT_EQ(to.GlobalAddress(), to_addr); + } + + auto edges = [](auto iterable) { + std::vector res; + for (auto edge : iterable) res.emplace_back(edge); + return res; + }; + + // Check `from` data. + { + VertexAccessor from{from_addr, dba}; + ASSERT_EQ(edges(from.out()).size(), 1); + EXPECT_EQ(edges(from.out())[0].GlobalAddress(), e_ga); + // In case of cycles we have 1 in the `in` edges. + EXPECT_EQ(edges(from.in()).size(), from_addr == to_addr); + } + + // Check `to` data. + { + VertexAccessor to{to_addr, dba}; + // In case of cycles we have 1 in the `out` edges. + EXPECT_EQ(edges(to.out()).size(), from_addr == to_addr); + ASSERT_EQ(edges(to.in()).size(), 1); + EXPECT_EQ(edges(to.in())[0].GlobalAddress(), e_ga); + } + } + + void CheckAll(storage::VertexAddress from_addr, + storage::VertexAddress to_addr) { + int edge_worker = from_addr.worker_id(); + CheckCounts(edge_worker == 0, edge_worker == 1, edge_worker == 2); + CheckState(master(), edge_worker == 0, from_addr, to_addr); + CheckState(worker(1), edge_worker == 1, from_addr, to_addr); + CheckState(worker(2), edge_worker == 2, from_addr, to_addr); + } + + void TearDown() override { DistributedGraphDbTest::TearDown(); } +}; + +TEST_F(DistributedEdgeCreateTest, LocalRemote) { + CreateEdge(worker(1), w1_a, w2_a); + CheckAll(w1_a, w2_a); +} + +TEST_F(DistributedEdgeCreateTest, RemoteLocal) { + CreateEdge(worker(2), w1_a, w2_a); + CheckAll(w1_a, w2_a); +} + +TEST_F(DistributedEdgeCreateTest, RemoteRemoteDifferentWorkers) { + CreateEdge(master(), w1_a, w2_a); + CheckAll(w1_a, w2_a); +} + +TEST_F(DistributedEdgeCreateTest, RemoteRemoteSameWorkers) { + CreateEdge(master(), w1_a, w1_b); + CheckAll(w1_a, w1_b); +} + +TEST_F(DistributedEdgeCreateTest, RemoteRemoteCycle) { + CreateEdge(master(), w1_a, w1_a); + CheckAll(w1_a, w1_a); +}