7ba8228c46
Summary: - Create types folder in storage/common - Move locking and kvstore to storage/common - Add storage/distributed/rpc folder Reviewers: teon.banek, ipaljak, msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1713
249 lines
12 KiB
Plaintext
249 lines
12 KiB
Plaintext
#>cpp
|
|
#pragma once
|
|
|
|
#include <tuple>
|
|
|
|
#include "communication/rpc/messages.hpp"
|
|
#include "distributed/bfs_rpc_messages.capnp.h"
|
|
#include "distributed/bfs_subcursor.hpp"
|
|
#include "query/frontend/semantic/symbol_table.hpp"
|
|
#include "query/plan/distributed_ops.hpp"
|
|
#include "query/serialization.hpp"
|
|
#include "storage/distributed/rpc/serialization.hpp"
|
|
#include "transactions/type.hpp"
|
|
#include "utils/serialization.hpp"
|
|
cpp<#
|
|
|
|
(lcp:namespace distributed)
|
|
|
|
(lcp:capnp-namespace "distributed")
|
|
|
|
(lcp:capnp-import 'ast "/query/frontend/ast/ast_serialization.capnp")
|
|
(lcp:capnp-import 'dist-ops "/query/plan/distributed_ops.capnp")
|
|
(lcp:capnp-import 'query "/query/serialization.capnp")
|
|
(lcp:capnp-import 'storage "/storage/distributed/rpc/serialization.capnp")
|
|
(lcp:capnp-import 'symbol "/query/frontend/semantic/symbol.capnp")
|
|
(lcp:capnp-import 'utils "/utils/serialization.capnp")
|
|
|
|
;; Register the Cap'n Proto type name to use for each of these C++ types.
;; Both address kinds share the single `Storage.Address` schema type, and
;; `storage::EdgeType` is represented as `Storage.Common`.
;; NOTE(review): presumably this is what lets fields such as
;; `(source "storage::VertexAddress")` omit an explicit :capnp-type -- confirm
;; against the LCP implementation.
(lcp:capnp-type-conversion "storage::EdgeAddress"
                           "Storage.Address")
(lcp:capnp-type-conversion "storage::VertexAddress"
                           "Storage.Address")
(lcp:capnp-type-conversion "storage::EdgeType"
                           "Storage.Common")
|
|
|
|
;; RPC `create-bfs-subcursor`.
;;
;; Request fields:
;;   tx-id               transaction id, stored as UInt64 in Cap'n Proto.
;;   direction           query::EdgeAtom::Direction, (de)serialized as an enum
;;                       with the values in/out/both.
;;   edge-types          vector of storage::EdgeType.
;;   filter-lambda       query::plan::ExpansionLambda; saved/loaded through the
;;                       custom `Save`/`Load` calls below.
;;   symbol-table        query::SymbolTable.
;;   evaluation-context  query::EvaluationContext; saved/loaded through
;;                       query::SaveEvaluationContext/LoadEvaluationContext.
;;
;; Loading the request needs an extra `ast_storage` argument
;; (query::AstStorage *), which the filter-lambda loader passes to `Load`.
;;
;; Response: a single int64 `member`.
;; NOTE(review): presumably the id of the created subcursor -- confirm with
;; the RPC server/client code.
;;
;; The custom save/load lambdas do not use `capnp-name`, so it is declared
;; ignored (same convention as in `register-subcursors`).
(lcp:define-rpc create-bfs-subcursor
    (:request
     ((tx-id "tx::TransactionId" :capnp-type "UInt64")
      (direction "query::EdgeAtom::Direction"
                 :capnp-type "Ast.EdgeAtom.Direction" :capnp-init nil
                 :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction"
                                                  "query::EdgeAtom::Direction"
                                                  '(in out both))
                 :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction"
                                                  "query::EdgeAtom::Direction"
                                                  '(in out both)))
      (edge-types "std::vector<storage::EdgeType>"
                  :capnp-save (lcp:capnp-save-vector "::storage::capnp::Common"
                                                     "storage::EdgeType")
                  :capnp-load (lcp:capnp-load-vector "::storage::capnp::Common"
                                                     "storage::EdgeType"))
      (filter-lambda "query::plan::ExpansionLambda"
                     :capnp-type "DistOps.ExpansionLambda"
                     :capnp-save (lambda (builder member capnp-name)
                                   (declare (ignore capnp-name))
                                   #>cpp
                                   std::vector<int> saved_ast_uids;
                                   Save(${member}, &${builder}, &saved_ast_uids);
                                   cpp<#)
                     :capnp-load (lambda (reader member capnp-name)
                                   (declare (ignore capnp-name))
                                   #>cpp
                                   std::vector<int> loaded_ast_uids;
                                   Load(&${member}, ${reader}, ast_storage, &loaded_ast_uids);
                                   cpp<#))
      (symbol-table "query::SymbolTable"
                    :capnp-type "Symbol.SymbolTable")
      (evaluation-context "query::EvaluationContext"
                          :capnp-type "Query.EvaluationContext"
                          :capnp-save (lambda (builder member capnp-name)
                                        (declare (ignore capnp-name))
                                        #>cpp
                                        query::SaveEvaluationContext(${member}, &${builder});
                                        cpp<#)
                          :capnp-load (lambda (reader member capnp-name)
                                        (declare (ignore capnp-name))
                                        #>cpp
                                        query::LoadEvaluationContext(${reader}, &${member});
                                        cpp<#)))
     (:serialize :capnp :load-args '((ast-storage "query::AstStorage *"))))
  (:response ((member :int64_t))))
|
|
|
|
;; RPC `register-subcursors`.
;;
;; The request carries one field, `subcursor-ids`, an
;; std::unordered_map<int16_t, int64_t>. On the wire it is a
;; Utils.Map(Utils.BoxInt16, Utils.BoxInt64); each entry's key and value are
;; written to / read from the boxed `value` slot by the lambdas below.
;; NOTE(review): the key is presumably a worker id and the value that worker's
;; subcursor id -- confirm with the callers.
;;
;; The response is empty.
(lcp:define-rpc register-subcursors
    (:request
     ((subcursor-ids
       "std::unordered_map<int16_t, int64_t>"
       :capnp-type "Utils.Map(Utils.BoxInt16, Utils.BoxInt64)"
       :capnp-save
       (lambda (builder member capnp-name)
         (declare (ignore capnp-name))
         #>cpp
         utils::SaveMap<utils::capnp::BoxInt16, utils::capnp::BoxInt64>(
             ${member}, &${builder},
             [](auto *entry_builder, const auto &kv) {
               entry_builder->initKey().setValue(kv.first);
               entry_builder->initValue().setValue(kv.second);
             });
         cpp<#)
       :capnp-load
       (lambda (reader member capnp-name)
         (declare (ignore capnp-name))
         #>cpp
         utils::LoadMap<utils::capnp::BoxInt16, utils::capnp::BoxInt64>(
             &${member}, ${reader},
             [](const auto &entry_reader) {
               const int16_t key = entry_reader.getKey().getValue();
               const int64_t value = entry_reader.getValue().getValue();
               return std::make_pair(key, value);
             });
         cpp<#))))
  (:response ()))
|
|
|
|
;; RPC `reset-subcursor`: the request names a subcursor by its int64
;; `subcursor-id`; the response carries no data.
(lcp:define-rpc reset-subcursor
  (:request
   ((subcursor-id :int64_t)))
  (:response
   ()))
|
|
|
|
;; RPC `remove-bfs-subcursor`: the request holds a single int64 `member`; the
;; response carries no data.
;; NOTE(review): `member` is presumably the subcursor id (other requests call
;; the equivalent field `subcursor-id`) -- confirm with the callers.
(lcp:define-rpc remove-bfs-subcursor
  (:request
   ((member :int64_t)))
  (:response
   ()))
|
|
|
|
;; Enum with the three values success, failure and lambda-error, serializable
;; with Cap'n Proto. It is the type of the `result` field in the
;; `expand-level` response (referenced there as "ExpandResult").
(lcp:define-enum expand-result
  (success
   failure
   lambda-error)
  (:serialize :capnp))
|
|
|
|
;; RPC `expand-level`: the request holds a single int64 `member`; the response
;; reports the outcome as an `ExpandResult` in its `result` field.
;; NOTE(review): `member` is presumably the subcursor id -- confirm with the
;; callers.
(lcp:define-rpc expand-level
  (:request
   ((member :int64_t)))
  (:response
   ((result "ExpandResult"))))
|
|
|
|
;; RPC `subcursor-pull`.
;;
;; Request: a single int64 `member`.
;; NOTE(review): presumably the subcursor id -- confirm with the callers.
;;
;; Response: `vertex`, an optional VertexAccessor serialized as
;; Utils.Optional(Storage.VertexAccessor).
;;   * Saving takes an extra `worker_id` argument and sends the accessor with
;;     storage::SendVersions::BOTH.
;;   * Loading takes a `database::GraphDbAccessor *` and a
;;     `distributed::DataManager *`, both forwarded to
;;     storage::LoadVertexAccessor.
(lcp:define-rpc subcursor-pull
  (:request
   ((member :int64_t)))
  (:response
   ((vertex
     "std::experimental::optional<VertexAccessor>"
     :capnp-type "Utils.Optional(Storage.VertexAccessor)"
     :capnp-save
     (lcp:capnp-save-optional
      "storage::capnp::VertexAccessor"
      "VertexAccessor"
      "[worker_id](auto *builder, const auto &vertex) {
         storage::SaveVertexAccessor(vertex, builder, storage::SendVersions::BOTH, worker_id);
       }")
     :capnp-load
     (lcp:capnp-load-optional
      "storage::capnp::VertexAccessor"
      "VertexAccessor"
      "[dba, data_manager](const auto &reader) {
         return storage::LoadVertexAccessor(reader, dba, data_manager);
       }")))
   (:serialize :capnp
               :save-args '((worker-id :int))
               :load-args '((dba "database::GraphDbAccessor *")
                            (data-manager "distributed::DataManager *")))))
|
|
|
|
;; RPC `set-source`: the request pairs an int64 `subcursor-id` with a
;; `source` vertex address (storage::VertexAddress); the response carries no
;; data.
(lcp:define-rpc set-source
  (:request
   ((subcursor-id :int64_t)
    (source "storage::VertexAddress")))
  (:response
   ()))
|
|
|
|
;; RPC `expand-to-remote-vertex`: the request holds an int64 `subcursor-id`,
;; an `edge` address (storage::EdgeAddress) and a `vertex` address
;; (storage::VertexAddress); the response is a single bool `member`.
;; NOTE(review): the bool presumably tells whether the expansion took place --
;; confirm with the subcursor implementation.
(lcp:define-rpc expand-to-remote-vertex
  (:request
   ((subcursor-id :int64_t)
    (edge "storage::EdgeAddress")
    (vertex "storage::VertexAddress")))
  (:response
   ((member :bool))))
|
|
|
|
;; RPC `reconstruct-path`.
;;
;; Request: an int64 `subcursor-id` plus an optional `vertex` address and an
;; optional `edge` address. The hand-written constructors set exactly one of
;; the two (the other is nullopt); the default constructor sets neither.
;;
;; Response: `edges` (vector of EdgeAccessor) plus an optional `next-vertex`
;; and an optional `next-edge` address, of which at most one may be set
;; (enforced by the CHECK in the constructor).
;;   * Saving takes an extra `worker_id` argument and sends each edge with
;;     storage::SendVersions::BOTH.
;;   * Loading takes a `database::GraphDbAccessor *` and a
;;     `distributed::DataManager *`, both forwarded to
;;     storage::LoadEdgeAccessor.
(lcp:define-rpc reconstruct-path
    (:request
     ((subcursor-id :int64_t)
      (vertex "std::experimental::optional<storage::VertexAddress>"
              :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::VertexAddress")
              :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::VertexAddress"))
      (edge "std::experimental::optional<storage::EdgeAddress>"
            :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
            :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
     (:public
      #>cpp
      using Capnp = capnp::ReconstructPathReq;
      static const communication::rpc::MessageType TypeInfo;

      ReconstructPathReq() {}

      // Request that starts from a vertex address; `edge` is left empty.
      ReconstructPathReq(int64_t subcursor_id, storage::VertexAddress vertex)
          : subcursor_id(subcursor_id),
            vertex(vertex),
            edge(std::experimental::nullopt) {}

      // Request that starts from an edge address; `vertex` is left empty.
      ReconstructPathReq(int64_t subcursor_id, storage::EdgeAddress edge)
          : subcursor_id(subcursor_id),
            vertex(std::experimental::nullopt),
            edge(edge) {}
      cpp<#))
  (:response
   ((edges "std::vector<EdgeAccessor>" :capnp-type "List(Storage.EdgeAccessor)"
           :capnp-save (lcp:capnp-save-vector "storage::capnp::EdgeAccessor"
                                              "EdgeAccessor"
                                              "[worker_id](auto *builder, const auto &edge) {
                                                 storage::SaveEdgeAccessor(edge, builder, storage::SendVersions::BOTH, worker_id);
                                               }")
           :capnp-load (lcp:capnp-load-vector "storage::capnp::EdgeAccessor"
                                              "EdgeAccessor"
                                              "[dba, data_manager](const auto &reader) {
                                                 return storage::LoadEdgeAccessor(reader, dba, data_manager);
                                               }"))
    (next-vertex "std::experimental::optional<storage::VertexAddress>"
                 :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::VertexAddress")
                 :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::VertexAddress"))
    (next-edge "std::experimental::optional<storage::EdgeAddress>"
               :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
               :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
   (:serialize :capnp
               :save-args '((worker-id :int))
               :load-args '((dba "database::GraphDbAccessor *")
                            (data-manager "distributed::DataManager *")))
   (:public
    #>cpp
    using Capnp = capnp::ReconstructPathRes;
    static const communication::rpc::MessageType TypeInfo;

    ReconstructPathRes() {}

    ReconstructPathRes(
        const std::vector<EdgeAccessor> &edges,
        std::experimental::optional<storage::VertexAddress> next_vertex,
        std::experimental::optional<storage::EdgeAddress> next_edge)
        : edges(edges), next_vertex(std::move(next_vertex)), next_edge(std::move(next_edge)) {
      // The parameters shadow the members and have just been moved from, so
      // validate the members (`this->`) rather than the moved-from arguments.
      CHECK(!static_cast<bool>(this->next_vertex) || !static_cast<bool>(this->next_edge))
          << "At most one of `next_vertex` and `next_edge` should be set";
    }
    cpp<#)))
|
|
|
|
;; RPC `prepare-for-expand`.
;;
;; Request fields:
;;   subcursor-id  int64 id of the subcursor.
;;   clear         bool flag.
;;   frame         vector of query::TypedValue, serialized as
;;                 List(Query.TypedValue). Each value is saved with
;;                 storage::SendVersions::ONLY_OLD and the request's own
;;                 `worker_id`; loading needs a `database::GraphDbAccessor *`
;;                 and a `distributed::DataManager *`.
;;   worker-id     int that is never written to the wire (:dont-save); it is
;;                 only read by the `frame` save lambda through `self`.
;;
;; The response carries no data.
(lcp:define-rpc prepare-for-expand
  (:request
   ((subcursor-id :int64_t)
    (clear :bool)
    (frame
     "std::vector<query::TypedValue>"
     :capnp-type "List(Query.TypedValue)"
     :capnp-save
     (lcp:capnp-save-vector
      "query::capnp::TypedValue"
      "query::TypedValue"
      "[&self](auto *builder, const auto &value) {
         query::SaveCapnpTypedValue(value, builder,
                                    storage::SendVersions::ONLY_OLD, self.worker_id);
       }")
     :capnp-load
     (lcp:capnp-load-vector
      "query::capnp::TypedValue"
      "query::TypedValue"
      "[dba, data_manager](const auto &reader) {
         query::TypedValue value;
         query::LoadCapnpTypedValue(reader, &value, dba, data_manager);
         return value;
       }"))
    (worker-id :int :capnp-save :dont-save))
   (:serialize :capnp
               :load-args '((dba "database::GraphDbAccessor *")
                            (data-manager "distributed::DataManager *"))))
  (:response
   ()))
|
|
|
|
(lcp:pop-namespace) ;; distributed
|