Fix worker id in accessor serialization

Summary:
Previous version of code was wrong in some cases. Example:

There's an edge e = (a)->(b). Node (a) belongs to worker 1, node (b) belongs to worker 0. Therefore, edge e belongs to worker 1.

When edge e was serialized on worker 0, address of node b was local and it would be globalized, but the worker id of edge (e) would be used, which is wrong.

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1599
This commit is contained in:
Marin Tomic 2018-09-13 15:30:20 +02:00
parent 06b3240f9e
commit 78ee3ed37e
7 changed files with 33 additions and 29 deletions

View File

@ -96,8 +96,8 @@ cpp<#
:capnp-save (lcp:capnp-save-optional
"storage::capnp::VertexAccessor"
"VertexAccessor"
"[](auto *builder, const auto &vertex) {
storage::SaveVertexAccessor(vertex, builder, storage::SendVersions::BOTH);
"[worker_id](auto *builder, const auto &vertex) {
storage::SaveVertexAccessor(vertex, builder, storage::SendVersions::BOTH, worker_id);
}")
:capnp-load (lcp:capnp-load-optional
"storage::capnp::VertexAccessor"
@ -105,8 +105,10 @@ cpp<#
"[dba, data_manager](const auto &reader) {
return storage::LoadVertexAccessor(reader, dba, data_manager);
}")))
(:serialize :capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))))
(:serialize :capnp
:save-args '((worker-id :int))
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))))
(lcp:define-rpc set-source
(:request
@ -151,8 +153,8 @@ cpp<#
((edges "std::vector<EdgeAccessor>" :capnp-type "List(Storage.EdgeAccessor)"
:capnp-save (lcp:capnp-save-vector "storage::capnp::EdgeAccessor"
"EdgeAccessor"
"[](auto *builder, const auto &edge) {
storage::SaveEdgeAccessor(edge, builder, storage::SendVersions::BOTH);
"[worker_id](auto *builder, const auto &edge) {
storage::SaveEdgeAccessor(edge, builder, storage::SendVersions::BOTH, worker_id);
}")
:capnp-load (lcp:capnp-load-vector "storage::capnp::EdgeAccessor"
"EdgeAccessor"
@ -165,8 +167,10 @@ cpp<#
(next-edge "std::experimental::optional<storage::EdgeAddress>"
:capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
:capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
(:serialize :capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:serialize :capnp
:save-args '((worker-id :int))
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:public
#>cpp
using Capnp = capnp::ReconstructPathRes;

View File

@ -81,7 +81,7 @@ class BfsRpcServer {
req.Load(req_reader);
auto vertex = subcursor_storage_->Get(req.member)->Pull();
SubcursorPullRes res(vertex);
res.Save(res_builder);
res.Save(res_builder, db_->WorkerId());
});
server_->Register<ExpandToRemoteVertexRpc>(
@ -108,7 +108,7 @@ class BfsRpcServer {
LOG(FATAL) << "`edge` or `vertex` should be set in ReconstructPathReq";
}
ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge);
res.Save(res_builder);
res.Save(res_builder, db_->WorkerId());
});
server_->Register<PrepareForExpandRpc>([this](const auto &req_reader,

View File

@ -80,7 +80,7 @@ the relevant parts of the response, ready for use."))
for (size_t val_i = 0; val_i < frame.size(); ++val_i) {
const auto &value = frame[val_i];
auto value_builder = frame_builder[val_i];
query::SaveCapnpTypedValue(value, &value_builder, send_versions);
query::SaveCapnpTypedValue(value, &value_builder, send_versions, worker_id);
}
}
cpp<#)
@ -93,8 +93,7 @@ the relevant parts of the response, ready for use."))
current_frame.reserve(frame_reader.size());
for (const auto &value_reader : frame_reader) {
query::TypedValue value;
query::LoadCapnpTypedValue(value_reader, &value, dba,
data_manager);
query::LoadCapnpTypedValue(value_reader, &value, dba, data_manager);
current_frame.emplace_back(value);
}
${member}.emplace_back(current_frame);

View File

@ -6,7 +6,7 @@ namespace query {
void SaveCapnpTypedValue(const TypedValue &value,
capnp::TypedValue::Builder *builder,
storage::SendVersions versions) {
storage::SendVersions versions, int worker_id) {
switch (value.type()) {
case TypedValue::Type::Null:
builder->setNullType();
@ -28,7 +28,7 @@ void SaveCapnpTypedValue(const TypedValue &value,
auto list_builder = builder->initList(values.size());
for (size_t i = 0; i < values.size(); ++i) {
auto value_builder = list_builder[i];
SaveCapnpTypedValue(values[i], &value_builder, versions);
SaveCapnpTypedValue(values[i], &value_builder, versions, worker_id);
}
return;
}
@ -40,7 +40,7 @@ void SaveCapnpTypedValue(const TypedValue &value,
auto kv_builder = map_builder[i];
kv_builder.setKey(kv.first);
auto value_builder = kv_builder.initValue();
SaveCapnpTypedValue(kv.second, &value_builder, versions);
SaveCapnpTypedValue(kv.second, &value_builder, versions, worker_id);
++i;
}
return;
@ -48,12 +48,13 @@ void SaveCapnpTypedValue(const TypedValue &value,
case TypedValue::Type::Vertex: {
auto vertex_builder = builder->initVertex();
storage::SaveVertexAccessor(value.ValueVertex(), &vertex_builder,
versions);
versions, worker_id);
return;
}
case TypedValue::Type::Edge: {
auto edge_builder = builder->initEdge();
storage::SaveEdgeAccessor(value.ValueEdge(), &edge_builder, versions);
storage::SaveEdgeAccessor(value.ValueEdge(), &edge_builder, versions,
worker_id);
return;
}
case TypedValue::Type::Path: {
@ -63,12 +64,13 @@ void SaveCapnpTypedValue(const TypedValue &value,
for (size_t i = 0; i < path.vertices().size(); ++i) {
auto vertex_builder = vertices_builder[i];
storage::SaveVertexAccessor(path.vertices()[i], &vertex_builder,
versions);
versions, worker_id);
}
auto edges_builder = path_builder.initEdges(path.edges().size());
for (size_t i = 0; i < path.edges().size(); ++i) {
auto edge_builder = edges_builder[i];
storage::SaveEdgeAccessor(path.edges()[i], &edge_builder, versions);
storage::SaveEdgeAccessor(path.edges()[i], &edge_builder, versions,
worker_id);
}
return;
}

View File

@ -12,7 +12,7 @@ namespace query {
void SaveCapnpTypedValue(const query::TypedValue &value,
capnp::TypedValue::Builder *builder,
storage::SendVersions versions);
storage::SendVersions versions, int worker_id);
void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader,
query::TypedValue *value,

View File

@ -212,7 +212,7 @@ std::unique_ptr<Edge> LoadEdge(const capnp::Edge::Reader &reader) {
template <class TRecord, class TCapnpRecord>
void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor,
typename TCapnpRecord::Builder *builder,
SendVersions versions) {
SendVersions versions, int worker_id) {
builder->setCypherId(accessor.CypherId());
builder->setAddress(accessor.GlobalAddress().raw());
@ -223,7 +223,6 @@ void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor,
CHECK(result) << "Attempting to serialize an element not visible to "
"current transaction.";
}
int worker_id = accessor.GlobalAddress().worker_id();
auto old_rec = accessor.GetOld();
if (old_rec && versions != SendVersions::ONLY_NEW) {
auto old_builder = builder->initOld();
@ -245,16 +244,16 @@ void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor,
void SaveVertexAccessor(const VertexAccessor &vertex_accessor,
capnp::VertexAccessor::Builder *builder,
SendVersions versions) {
SendVersions versions, int worker_id) {
SaveRecordAccessor<Vertex, capnp::VertexAccessor>(vertex_accessor, builder,
versions);
versions, worker_id);
}
void SaveEdgeAccessor(const EdgeAccessor &edge_accessor,
capnp::EdgeAccessor::Builder *builder,
SendVersions versions) {
SendVersions versions, int worker_id) {
SaveRecordAccessor<Edge, capnp::EdgeAccessor>(edge_accessor, builder,
versions);
versions, worker_id);
}
VertexAccessor LoadVertexAccessor(const capnp::VertexAccessor::Reader &reader,

View File

@ -52,7 +52,7 @@ enum class SendVersions { BOTH, ONLY_OLD, ONLY_NEW };
void SaveVertexAccessor(const VertexAccessor &vertex_accessor,
capnp::VertexAccessor::Builder *builder,
SendVersions versions);
SendVersions versions, int worker_id);
VertexAccessor LoadVertexAccessor(const capnp::VertexAccessor::Reader &reader,
database::GraphDbAccessor *dba,
@ -60,7 +60,7 @@ VertexAccessor LoadVertexAccessor(const capnp::VertexAccessor::Reader &reader,
void SaveEdgeAccessor(const EdgeAccessor &edge_accessor,
capnp::EdgeAccessor::Builder *builder,
SendVersions versions);
SendVersions versions, int worker_id);
EdgeAccessor LoadEdgeAccessor(const capnp::EdgeAccessor::Reader &reader,
database::GraphDbAccessor *dba,