diff --git a/src/database/distributed/graph_db_accessor.cpp b/src/database/distributed/graph_db_accessor.cpp index cb05b7a23..04e3b9734 100644 --- a/src/database/distributed/graph_db_accessor.cpp +++ b/src/database/distributed/graph_db_accessor.cpp @@ -96,13 +96,17 @@ VertexAccessor GraphDbAccessor::InsertVertex( std::experimental::optional GraphDbAccessor::FindVertexOptional( gid::Gid gid, bool current_state) { - VertexAccessor record_accessor( - storage::VertexAddress(db_.storage().LocalAddress(gid)), *this); + auto record_accessor = FindVertexRaw(gid); if (!record_accessor.Visible(transaction(), current_state)) return std::experimental::nullopt; return record_accessor; } +VertexAccessor GraphDbAccessor::FindVertexRaw(gid::Gid gid) { + return VertexAccessor( + storage::VertexAddress(db_.storage().LocalAddress(gid)), *this); +} + VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) { auto found = FindVertexOptional(gid, current_state); CHECK(found) << "Unable to find vertex for id: " << gid; @@ -111,13 +115,17 @@ VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) { std::experimental::optional GraphDbAccessor::FindEdgeOptional( gid::Gid gid, bool current_state) { - EdgeAccessor record_accessor( - storage::EdgeAddress(db_.storage().LocalAddress(gid)), *this); + auto record_accessor = FindEdgeRaw(gid); if (!record_accessor.Visible(transaction(), current_state)) return std::experimental::nullopt; return record_accessor; } +EdgeAccessor GraphDbAccessor::FindEdgeRaw(gid::Gid gid) { + return EdgeAccessor( + storage::EdgeAddress(db_.storage().LocalAddress(gid)), *this); +} + EdgeAccessor GraphDbAccessor::FindEdge(gid::Gid gid, bool current_state) { auto found = FindEdgeOptional(gid, current_state); CHECK(found) << "Unable to find edge for id: " << gid; diff --git a/src/database/distributed/graph_db_accessor.hpp b/src/database/distributed/graph_db_accessor.hpp index f5e0dc5a9..d304d015b 100644 --- a/src/database/distributed/graph_db_accessor.hpp +++ b/src/database/distributed/graph_db_accessor.hpp @@ -144,6 +144,12 @@ class GraphDbAccessor { std::experimental::optional FindVertexOptional( gid::Gid gid, bool current_state); + /** + * Obtains the vertex accessor for given id without checking if the + * vertex is visible. + */ + VertexAccessor FindVertexRaw(gid::Gid gid); + /** * Obtains the vertex for the given ID. If there is no vertex for the given * ID, or it's not visible to this accessor's transaction, MG is crashed @@ -375,6 +381,12 @@ class GraphDbAccessor { std::experimental::optional FindEdgeOptional( gid::Gid gid, bool current_state); + /** + * Obtains the edge accessor for the given id without checking if the edge + * is visible. + */ + EdgeAccessor FindEdgeRaw(gid::Gid gid); + /** * Obtains the edge for the given ID. If there is no edge for the given * ID, or it's not visible to this accessor's transaction, MG is crashed diff --git a/src/distributed/data_manager.hpp b/src/distributed/data_manager.hpp index 19829b58b..6b7293070 100644 --- a/src/distributed/data_manager.hpp +++ b/src/distributed/data_manager.hpp @@ -62,6 +62,7 @@ class DataManager { /// from the given transaction's ID and command ID, and caches it. Sets the /// given pointers to point to the fetched data. Analogue to /// mvcc::VersionList::find_set_old_new. + // TODO (vkasljevic) remove this and use Find instead template void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid, TRecord **old_record, TRecord **new_record) { @@ -79,7 +80,8 @@ class DataManager { } auto remote = data_clients_.RemoteElement(worker_id, tx_id, gid); - LocalizeAddresses(*remote.record_ptr); + if (remote.old_record_ptr) LocalizeAddresses(*remote.old_record_ptr); + if (remote.new_record_ptr) LocalizeAddresses(*remote.new_record_ptr); // This logic is a bit strange because we need to make sure that someone // else didn't get a response and updated the cache before we did and we @@ -88,9 +90,9 @@ class DataManager { // FindSetOldNew std::lock_guard guard(lock); auto it_pair = cache.emplace( - std::move(gid), - CachedRecordData(remote.cypher_id, - std::move(remote.record_ptr), nullptr)); + std::move(gid), CachedRecordData( + remote.cypher_id, std::move(remote.old_record_ptr), + std::move(remote.new_record_ptr))); *old_record = it_pair.first->second.old_record.get(); *new_record = it_pair.first->second.new_record.get(); @@ -110,12 +112,14 @@ class DataManager { } else { guard.unlock(); auto remote = data_clients_.RemoteElement(worker_id, tx_id, gid); - LocalizeAddresses(*remote.record_ptr); + if (remote.old_record_ptr) LocalizeAddresses(*remote.old_record_ptr); + if (remote.new_record_ptr) LocalizeAddresses(*remote.new_record_ptr); guard.lock(); return cache .emplace(std::move(gid), - CachedRecordData( - remote.cypher_id, std::move(remote.record_ptr), nullptr)) + CachedRecordData(remote.cypher_id, + std::move(remote.old_record_ptr), + std::move(remote.new_record_ptr))) .first->second; } } diff --git a/src/distributed/data_rpc_clients.cpp b/src/distributed/data_rpc_clients.cpp index 925f0f449..e1a831c6b 100644 --- a/src/distributed/data_rpc_clients.cpp +++ b/src/distributed/data_rpc_clients.cpp @@ -12,20 +12,22 @@ template <> RemoteElementInfo DataRpcClients::RemoteElement(int worker_id, tx::TransactionId tx_id, gid::Gid gid) { - auto response = - coordination_->GetClientPool(worker_id)->Call(TxGidPair{tx_id, gid}); + auto response = coordination_->GetClientPool(worker_id)->Call( + TxGidPair{tx_id, gid}); return RemoteElementInfo(response.cypher_id, - std::move(response.edge_output)); + std::move(response.edge_old_output), + std::move(response.edge_new_output)); } template <> RemoteElementInfo DataRpcClients::RemoteElement(int worker_id, tx::TransactionId tx_id, gid::Gid gid) { - auto response = - coordination_->GetClientPool(worker_id)->Call(TxGidPair{tx_id, gid}); + auto response = coordination_->GetClientPool(worker_id)->Call( + TxGidPair{tx_id, gid}); return RemoteElementInfo(response.cypher_id, - std::move(response.vertex_output)); + std::move(response.vertex_old_output), + std::move(response.vertex_new_output)); } std::unordered_map DataRpcClients::VertexCounts( diff --git a/src/distributed/data_rpc_clients.hpp b/src/distributed/data_rpc_clients.hpp index d7674dae2..eeec356c7 100644 --- a/src/distributed/data_rpc_clients.hpp +++ b/src/distributed/data_rpc_clients.hpp @@ -25,11 +25,15 @@ struct RemoteElementInfo { RemoteElementInfo &operator=(const RemoteElementInfo &) = delete; RemoteElementInfo &operator=(RemoteElementInfo &&) = delete; - RemoteElementInfo(int64_t cypher_id, std::unique_ptr record_ptr) - : cypher_id(cypher_id), record_ptr(std::move(record_ptr)) {} + RemoteElementInfo(int64_t cypher_id, std::unique_ptr old_record_ptr, + std::unique_ptr new_record_ptr) + : cypher_id(cypher_id), + old_record_ptr(std::move(old_record_ptr)), + new_record_ptr(std::move(new_record_ptr)) {} int64_t cypher_id; - std::unique_ptr record_ptr; + std::unique_ptr old_record_ptr; + std::unique_ptr new_record_ptr; }; /// Provides access to other worker's data. diff --git a/src/distributed/data_rpc_messages.lcp b/src/distributed/data_rpc_messages.lcp index fbaaf27e9..593e3815a 100644 --- a/src/distributed/data_rpc_messages.lcp +++ b/src/distributed/data_rpc_messages.lcp @@ -29,65 +29,175 @@ cpp<# (:request ((member "TxGidPair"))) (:response ((cypher-id :int64_t) - (vertex-input "const Vertex *" + (vertex-old-input "const Vertex *" :capnp-type "Storage.Vertex" + :capnp-init nil :capnp-save (lambda (builder member capnp-name) (declare (ignore capnp-name)) #>cpp - storage::SaveVertex(*${member}, &${builder}, self.worker_id); + if (${member}) { + auto my_builder = ${builder}->initVertexOldInput(); + storage::SaveVertex(*${member}, &my_builder, self.worker_id); + } cpp<#) :slk-save (lambda (member) #>cpp - slk::Save(*self.${member}, builder, self.worker_id); + bool has_ptr = self.${member}; + slk::Save(has_ptr, builder); + if (has_ptr) { + slk::Save(*self.${member}, builder, self.worker_id); + } cpp<#) :capnp-load (lambda (reader member capnp-name) (declare (ignore member capnp-name)) #>cpp - self->vertex_output = storage::LoadVertex(${reader}); + if (${reader}.hasVertexOldInput()) { + auto my_reader = ${reader}.getVertexOldInput(); + self->vertex_old_output = storage::LoadVertex(my_reader); + } cpp<#) :slk-load (lambda (member) (declare (ignore member)) #>cpp - self->vertex_output = std::make_unique(); - slk::Load(self->vertex_output.get(), reader); + bool has_ptr; + slk::Load(&has_ptr, reader); + if (has_ptr) { + self->vertex_old_output = std::make_unique(); + slk::Load(self->vertex_old_output.get(), reader); + } + cpp<#)) + (vertex-new-input "const Vertex *" + :capnp-type "Storage.Vertex" + :capnp-init nil + :capnp-save + (lambda (builder member capnp-name) + (declare (ignore capnp-name)) + #>cpp + if (${member}) { + auto my_builder = ${builder}->initVertexNewInput(); + storage::SaveVertex(*${member}, &my_builder, self.worker_id); + } + cpp<#) + :slk-save + (lambda (member) + #>cpp + bool has_ptr = self.${member}; + slk::Save(has_ptr, builder); + if (has_ptr) { + slk::Save(*self.${member}, builder, self.worker_id); + } + cpp<#) + :capnp-load + (lambda (reader member capnp-name) + (declare (ignore member capnp-name)) + #>cpp + if (${reader}.hasVertexNewInput()) { + auto my_reader = ${reader}.getVertexNewInput(); + self->vertex_new_output = storage::LoadVertex(my_reader); + } + cpp<#) + :slk-load + (lambda (member) + (declare (ignore member)) + #>cpp + bool has_ptr; + slk::Load(&has_ptr, reader); + if (has_ptr) { + self->vertex_new_output = std::make_unique(); + slk::Load(self->vertex_new_output.get(), reader); + } cpp<#)) (worker-id :int64_t :dont-save t) - (vertex-output "std::unique_ptr" :initarg nil :dont-save t)))) + (vertex-old-output "std::unique_ptr" :initarg nil :dont-save t) + (vertex-new-output "std::unique_ptr" :initarg nil :dont-save t)))) (lcp:define-rpc edge (:request ((member "TxGidPair"))) (:response ((cypher-id :int64_t) - (edge-input "const Edge *" + (edge-old-input "const Edge *" :capnp-type "Storage.Edge" + :capnp-init nil :capnp-save (lambda (builder member capnp-name) (declare (ignore capnp-name)) #>cpp - storage::SaveEdge(*${member}, &${builder}, self.worker_id); + if (${member}) { + auto my_builder = ${builder}->initEdgeOldInput(); + storage::SaveEdge(*${member}, &my_builder, self.worker_id); + } cpp<#) :slk-save (lambda (member) #>cpp - slk::Save(*self.${member}, builder, self.worker_id); + bool has_ptr = self.${member}; + slk::Save(has_ptr, builder); + if (has_ptr) { + slk::Save(*self.${member}, builder, self.worker_id); + } cpp<#) :capnp-load (lambda (reader member capnp-name) (declare (ignore member capnp-name)) #>cpp - self->edge_output = storage::LoadEdge(${reader}); + if (${reader}.hasEdgeOldInput()) { + auto my_reader = ${reader}.getEdgeOldInput(); + self->edge_old_output = storage::LoadEdge(my_reader); + } cpp<#) :slk-load (lambda (member) #>cpp - slk::Load(&self->edge_output, reader); + // slk::Load will read a bool which was explicity + // saved in :slk::save and based on that read record + // data + slk::Load(&self->edge_old_output, reader); + cpp<#)) + (edge-new-input "const Edge *" + :capnp-type "Storage.Edge" + :capnp-init nil + :capnp-save + (lambda (builder member capnp-name) + (declare (ignore capnp-name)) + #>cpp + if (${member}) { + auto my_builder = ${builder}->initEdgeNewInput(); + storage::SaveEdge(*${member}, &my_builder, self.worker_id); + } + cpp<#) + :slk-save + (lambda (member) + #>cpp + bool has_ptr = self.${member}; + slk::Save(has_ptr, builder); + if (has_ptr) { + slk::Save(*self.${member}, builder, self.worker_id); + } + cpp<#) + :capnp-load + (lambda (reader member capnp-name) + (declare (ignore member capnp-name)) + #>cpp + if (${reader}.hasEdgeNewInput()) { + auto my_reader = ${reader}.getEdgeNewInput(); + self->edge_new_output = storage::LoadEdge(my_reader); + } + cpp<#) + :slk-load + (lambda (member) + #>cpp + // slk::Load will read a bool which was explicity + // saved in :slk::save and based on that read record + // data + slk::Load(&self->edge_new_output, reader); cpp<#)) (worker-id :int64_t :dont-save t) - (edge-output "std::unique_ptr" :initarg nil :dont-save t)))) + (edge-old-output "std::unique_ptr" :initarg nil :dont-save t) + (edge-new-output "std::unique_ptr" :initarg nil :dont-save t)))) (lcp:define-rpc vertex-count (:request ((member "tx::TransactionId" :capnp-type "UInt64"))) diff --git a/src/distributed/data_rpc_server.cpp b/src/distributed/data_rpc_server.cpp index a70fb8e0e..55b6860b6 100644 --- a/src/distributed/data_rpc_server.cpp +++ b/src/distributed/data_rpc_server.cpp @@ -14,21 +14,20 @@ DataRpcServer::DataRpcServer(database::GraphDb *db, coordination->Register( [this](const auto &req_reader, auto *res_builder) { auto dba = db_->Access(req_reader.getMember().getTxId()); - auto vertex = dba->FindVertex(req_reader.getMember().getGid(), false); - CHECK(vertex.GetOld()) - << "Old record must exist when sending vertex by RPC"; - VertexRes response(vertex.CypherId(), vertex.GetOld(), db_->WorkerId()); + auto vertex = dba->FindVertexRaw(req_reader.getMember().getGid()); + VertexRes response(vertex.CypherId(), vertex.GetOld(), + vertex.GetNew(), db_->WorkerId()); Save(response, res_builder); }); - coordination->Register([this](const auto &req_reader, - auto *res_builder) { - auto dba = db_->Access(req_reader.getMember().getTxId()); - auto edge = dba->FindEdge(req_reader.getMember().getGid(), false); - CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; - EdgeRes response(edge.CypherId(), edge.GetOld(), db_->WorkerId()); - Save(response, res_builder); - }); + coordination->Register( + [this](const auto &req_reader, auto *res_builder) { + auto dba = db_->Access(req_reader.getMember().getTxId()); + auto edge = dba->FindEdgeRaw(req_reader.getMember().getGid()); + EdgeRes response(edge.CypherId(), edge.GetOld(), + edge.GetNew(), db_->WorkerId()); + Save(response, res_builder); + }); coordination->Register( [this](const auto &req_reader, auto *res_builder) {