Update DataRpcClient and DataRpcServer

Summary: `DataRpcClient` and `DataRpcServer` now work with both old and new record. This will be needed later when I replace `Cache` with `LruCache`.

Reviewers: msantl, teon.banek, ipaljak

Reviewed By: msantl, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1875
This commit is contained in:
Vinko Kasljevic 2019-02-15 15:39:27 +01:00
parent a50abcab99
commit f685c08b7b
7 changed files with 184 additions and 45 deletions

View File

@ -96,13 +96,17 @@ VertexAccessor GraphDbAccessor::InsertVertex(
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertexOptional(
gid::Gid gid, bool current_state) {
VertexAccessor record_accessor(
storage::VertexAddress(db_.storage().LocalAddress<Vertex>(gid)), *this);
auto record_accessor = FindVertexRaw(gid);
if (!record_accessor.Visible(transaction(), current_state))
return std::experimental::nullopt;
return record_accessor;
}
VertexAccessor GraphDbAccessor::FindVertexRaw(gid::Gid gid) {
return VertexAccessor(
storage::VertexAddress(db_.storage().LocalAddress<Vertex>(gid)), *this);
}
VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) {
auto found = FindVertexOptional(gid, current_state);
CHECK(found) << "Unable to find vertex for id: " << gid;
@ -111,13 +115,17 @@ VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) {
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdgeOptional(
gid::Gid gid, bool current_state) {
EdgeAccessor record_accessor(
storage::EdgeAddress(db_.storage().LocalAddress<Edge>(gid)), *this);
auto record_accessor = FindEdgeRaw(gid);
if (!record_accessor.Visible(transaction(), current_state))
return std::experimental::nullopt;
return record_accessor;
}
EdgeAccessor GraphDbAccessor::FindEdgeRaw(gid::Gid gid) {
return EdgeAccessor(
storage::EdgeAddress(db_.storage().LocalAddress<Edge>(gid)), *this);
}
EdgeAccessor GraphDbAccessor::FindEdge(gid::Gid gid, bool current_state) {
auto found = FindEdgeOptional(gid, current_state);
CHECK(found) << "Unable to find edge for id: " << gid;

View File

@ -144,6 +144,12 @@ class GraphDbAccessor {
std::experimental::optional<VertexAccessor> FindVertexOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the vertex accessor for given id without checking if the
* vertex is visible.
*/
VertexAccessor FindVertexRaw(gid::Gid gid);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
@ -375,6 +381,12 @@ class GraphDbAccessor {
std::experimental::optional<EdgeAccessor> FindEdgeOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the edge accessor for the given id without checking if the edge
* is visible.
*/
EdgeAccessor FindEdgeRaw(gid::Gid gid);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed

View File

@ -62,6 +62,7 @@ class DataManager {
/// from the given transaction's ID and command ID, and caches it. Sets the
/// given pointers to point to the fetched data. Analogue to
/// mvcc::VersionList::find_set_old_new.
// TODO (vkasljevic) remove this and use Find instead
template <typename TRecord>
void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
TRecord **old_record, TRecord **new_record) {
@ -79,7 +80,8 @@ class DataManager {
}
auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote.record_ptr);
if (remote.old_record_ptr) LocalizeAddresses(*remote.old_record_ptr);
if (remote.new_record_ptr) LocalizeAddresses(*remote.new_record_ptr);
// This logic is a bit strange because we need to make sure that someone
// else didn't get a response and updated the cache before we did and we
@ -88,9 +90,9 @@ class DataManager {
// FindSetOldNew
std::lock_guard<std::mutex> guard(lock);
auto it_pair = cache.emplace(
std::move(gid),
CachedRecordData<TRecord>(remote.cypher_id,
std::move(remote.record_ptr), nullptr));
std::move(gid), CachedRecordData<TRecord>(
remote.cypher_id, std::move(remote.old_record_ptr),
std::move(remote.new_record_ptr)));
*old_record = it_pair.first->second.old_record.get();
*new_record = it_pair.first->second.new_record.get();
@ -110,12 +112,14 @@ class DataManager {
} else {
guard.unlock();
auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote.record_ptr);
if (remote.old_record_ptr) LocalizeAddresses(*remote.old_record_ptr);
if (remote.new_record_ptr) LocalizeAddresses(*remote.new_record_ptr);
guard.lock();
return cache
.emplace(std::move(gid),
CachedRecordData<TRecord>(
remote.cypher_id, std::move(remote.record_ptr), nullptr))
CachedRecordData<TRecord>(remote.cypher_id,
std::move(remote.old_record_ptr),
std::move(remote.new_record_ptr)))
.first->second;
}
}

View File

@ -12,20 +12,22 @@ template <>
RemoteElementInfo<Edge> DataRpcClients::RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
coordination_->GetClientPool(worker_id)->Call<EdgeRpc>(TxGidPair{tx_id, gid});
auto response = coordination_->GetClientPool(worker_id)->Call<EdgeRpc>(
TxGidPair{tx_id, gid});
return RemoteElementInfo<Edge>(response.cypher_id,
std::move(response.edge_output));
std::move(response.edge_old_output),
std::move(response.edge_new_output));
}
template <>
RemoteElementInfo<Vertex> DataRpcClients::RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
coordination_->GetClientPool(worker_id)->Call<VertexRpc>(TxGidPair{tx_id, gid});
auto response = coordination_->GetClientPool(worker_id)->Call<VertexRpc>(
TxGidPair{tx_id, gid});
return RemoteElementInfo<Vertex>(response.cypher_id,
std::move(response.vertex_output));
std::move(response.vertex_old_output),
std::move(response.vertex_new_output));
}
std::unordered_map<int, int64_t> DataRpcClients::VertexCounts(

View File

@ -25,11 +25,15 @@ struct RemoteElementInfo {
RemoteElementInfo &operator=(const RemoteElementInfo &) = delete;
RemoteElementInfo &operator=(RemoteElementInfo &&) = delete;
RemoteElementInfo(int64_t cypher_id, std::unique_ptr<TRecord> record_ptr)
: cypher_id(cypher_id), record_ptr(std::move(record_ptr)) {}
RemoteElementInfo(int64_t cypher_id, std::unique_ptr<TRecord> old_record_ptr,
std::unique_ptr<TRecord> new_record_ptr)
: cypher_id(cypher_id),
old_record_ptr(std::move(old_record_ptr)),
new_record_ptr(std::move(new_record_ptr)) {}
int64_t cypher_id;
std::unique_ptr<TRecord> record_ptr;
std::unique_ptr<TRecord> old_record_ptr;
std::unique_ptr<TRecord> new_record_ptr;
};
/// Provides access to other worker's data.

View File

@ -29,65 +29,175 @@ cpp<#
(:request ((member "TxGidPair")))
(:response
((cypher-id :int64_t)
(vertex-input "const Vertex *"
(vertex-old-input "const Vertex *"
:capnp-type "Storage.Vertex"
:capnp-init nil
:capnp-save
(lambda (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
storage::SaveVertex(*${member}, &${builder}, self.worker_id);
if (${member}) {
auto my_builder = ${builder}->initVertexOldInput();
storage::SaveVertex(*${member}, &my_builder, self.worker_id);
}
cpp<#)
:slk-save
(lambda (member)
#>cpp
slk::Save(*self.${member}, builder, self.worker_id);
bool has_ptr = self.${member};
slk::Save(has_ptr, builder);
if (has_ptr) {
slk::Save(*self.${member}, builder, self.worker_id);
}
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore member capnp-name))
#>cpp
self->vertex_output = storage::LoadVertex(${reader});
if (${reader}.hasVertexOldInput()) {
auto my_reader = ${reader}.getVertexOldInput();
self->vertex_old_output = storage::LoadVertex(my_reader);
}
cpp<#)
:slk-load
(lambda (member)
(declare (ignore member))
#>cpp
self->vertex_output = std::make_unique<Vertex>();
slk::Load(self->vertex_output.get(), reader);
bool has_ptr;
slk::Load(&has_ptr, reader);
if (has_ptr) {
self->vertex_old_output = std::make_unique<Vertex>();
slk::Load(self->vertex_old_output.get(), reader);
}
cpp<#))
(vertex-new-input "const Vertex *"
:capnp-type "Storage.Vertex"
:capnp-init nil
:capnp-save
(lambda (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
if (${member}) {
auto my_builder = ${builder}->initVertexNewInput();
storage::SaveVertex(*${member}, &my_builder, self.worker_id);
}
cpp<#)
:slk-save
(lambda (member)
#>cpp
bool has_ptr = self.${member};
slk::Save(has_ptr, builder);
if (has_ptr) {
slk::Save(*self.${member}, builder, self.worker_id);
}
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore member capnp-name))
#>cpp
if (${reader}.hasVertexNewInput()) {
auto my_reader = ${reader}.getVertexNewInput();
self->vertex_new_output = storage::LoadVertex(my_reader);
}
cpp<#)
:slk-load
(lambda (member)
(declare (ignore member))
#>cpp
bool has_ptr;
slk::Load(&has_ptr, reader);
if (has_ptr) {
self->vertex_new_output = std::make_unique<Vertex>();
slk::Load(self->vertex_new_output.get(), reader);
}
cpp<#))
(worker-id :int64_t :dont-save t)
(vertex-output "std::unique_ptr<Vertex>" :initarg nil :dont-save t))))
(vertex-old-output "std::unique_ptr<Vertex>" :initarg nil :dont-save t)
(vertex-new-output "std::unique_ptr<Vertex>" :initarg nil :dont-save t))))
(lcp:define-rpc edge
(:request ((member "TxGidPair")))
(:response
((cypher-id :int64_t)
(edge-input "const Edge *"
(edge-old-input "const Edge *"
:capnp-type "Storage.Edge"
:capnp-init nil
:capnp-save
(lambda (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
storage::SaveEdge(*${member}, &${builder}, self.worker_id);
if (${member}) {
auto my_builder = ${builder}->initEdgeOldInput();
storage::SaveEdge(*${member}, &my_builder, self.worker_id);
}
cpp<#)
:slk-save
(lambda (member)
#>cpp
slk::Save(*self.${member}, builder, self.worker_id);
bool has_ptr = self.${member};
slk::Save(has_ptr, builder);
if (has_ptr) {
slk::Save(*self.${member}, builder, self.worker_id);
}
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore member capnp-name))
#>cpp
self->edge_output = storage::LoadEdge(${reader});
if (${reader}.hasEdgeOldInput()) {
auto my_reader = ${reader}.getEdgeOldInput();
self->edge_old_output = storage::LoadEdge(my_reader);
}
cpp<#)
:slk-load
(lambda (member)
#>cpp
slk::Load(&self->edge_output, reader);
// slk::Load will read a bool which was explicity
// saved in :slk::save and based on that read record
// data
slk::Load(&self->edge_old_output, reader);
cpp<#))
(edge-new-input "const Edge *"
:capnp-type "Storage.Edge"
:capnp-init nil
:capnp-save
(lambda (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
if (${member}) {
auto my_builder = ${builder}->initEdgeNewInput();
storage::SaveEdge(*${member}, &my_builder, self.worker_id);
}
cpp<#)
:slk-save
(lambda (member)
#>cpp
bool has_ptr = self.${member};
slk::Save(has_ptr, builder);
if (has_ptr) {
slk::Save(*self.${member}, builder, self.worker_id);
}
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore member capnp-name))
#>cpp
if (${reader}.hasEdgeNewInput()) {
auto my_reader = ${reader}.getEdgeNewInput();
self->edge_new_output = storage::LoadEdge(my_reader);
}
cpp<#)
:slk-load
(lambda (member)
#>cpp
// slk::Load will read a bool which was explicity
// saved in :slk::save and based on that read record
// data
slk::Load(&self->edge_new_output, reader);
cpp<#))
(worker-id :int64_t :dont-save t)
(edge-output "std::unique_ptr<Edge>" :initarg nil :dont-save t))))
(edge-old-output "std::unique_ptr<Edge>" :initarg nil :dont-save t)
(edge-new-output "std::unique_ptr<Edge>" :initarg nil :dont-save t))))
(lcp:define-rpc vertex-count
(:request ((member "tx::TransactionId" :capnp-type "UInt64")))

View File

@ -14,21 +14,20 @@ DataRpcServer::DataRpcServer(database::GraphDb *db,
coordination->Register<VertexRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto vertex = dba->FindVertex(req_reader.getMember().getGid(), false);
CHECK(vertex.GetOld())
<< "Old record must exist when sending vertex by RPC";
VertexRes response(vertex.CypherId(), vertex.GetOld(), db_->WorkerId());
auto vertex = dba->FindVertexRaw(req_reader.getMember().getGid());
VertexRes response(vertex.CypherId(), vertex.GetOld(),
vertex.GetNew(), db_->WorkerId());
Save(response, res_builder);
});
coordination->Register<EdgeRpc>([this](const auto &req_reader,
auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto edge = dba->FindEdge(req_reader.getMember().getGid(), false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
EdgeRes response(edge.CypherId(), edge.GetOld(), db_->WorkerId());
Save(response, res_builder);
});
coordination->Register<EdgeRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto edge = dba->FindEdgeRaw(req_reader.getMember().getGid());
EdgeRes response(edge.CypherId(), edge.GetOld(),
edge.GetNew(), db_->WorkerId());
Save(response, res_builder);
});
coordination->Register<VertexCountRpc>(
[this](const auto &req_reader, auto *res_builder) {