memgraph/src/distributed/bfs_rpc_messages.lcp
Teon Banek 09bc9cb164 Update save/load hooks in LCP
Reviewers: mtomic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1517
2018-08-02 09:33:33 +02:00

245 lines
10 KiB
Plaintext

#>cpp
#pragma once
#include <tuple>
#include "communication/rpc/messages.hpp"
#include "distributed/bfs_rpc_messages.capnp.h"
#include "distributed/bfs_subcursor.hpp"
#include "query/plan/operator.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
cpp<#
;; Everything generated from this file lives in the C++ namespace
;; `distributed`; the namespace is closed by `lcp:pop-namespace` at the end of
;; the file.
(lcp:namespace distributed)
;; Namespace used on the Cap'n Proto side for the generated schema.
(lcp:capnp-namespace "distributed")
;; Cap'n Proto schemas referenced from the `:capnp-type` annotations below.
;; Each import is given a local alias, which shows up capitalized in those
;; annotations (e.g. "Ast.EdgeAtom.Direction", "Dis.Vertex", "Query.GraphView",
;; "Storage.Address", "Utils.Map(...)").
(lcp:capnp-import 'ast "/query/frontend/ast/ast.capnp")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-import 'query "/query/common.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
;; Both C++ address types are mapped to the same Cap'n Proto type, so members
;; declared with these C++ types need no explicit `:capnp-type`.
(lcp:capnp-type-conversion "storage::EdgeAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::VertexAddress" "Storage.Address")
;; Save hook for the `TElement *` members of SerializedGraphElement.
;;
;; BUILDER and MEMBER are the C++ expressions for the Cap'n Proto union builder
;; and for the pointer member being written. The emitted C++ selects the `null`
;; variant for an unset pointer; otherwise it initializes the `vertex` or the
;; `edge` variant (chosen at compile time from TElement) and serializes the
;; pointee together with the struct's `worker_id`.
(defun save-element (builder member)
  #>cpp
  if (!${member}) {
    ${builder}.setNull();
  } else if constexpr (std::is_same<TElement, Vertex>::value) {
    auto vertex_builder = ${builder}.initVertex();
    SaveVertex(*${member}, &vertex_builder, worker_id);
  } else {
    auto edge_builder = ${builder}.initEdge();
    SaveEdge(*${member}, &edge_builder, worker_id);
  }
  cpp<#)
;; Load hook for the `TElement *` members of SerializedGraphElement.
;;
;; READER and MEMBER are the C++ expressions for the Cap'n Proto union reader
;; and for the member being loaded. The loaded value is not stored into MEMBER
;; itself: the trailing "input" of its name is replaced with "output", so
;; `old_element_input` is loaded into `old_element_output` (and likewise for
;; `new_element_*`). When the union holds `null`, the target is left untouched.
(defun load-element (reader member)
  (let ((destination (cl-ppcre:regex-replace "input$" member "output")))
    #>cpp
    if (!${reader}.isNull()) {
      if constexpr (std::is_same<TElement, Vertex>::value) {
        ${destination} = LoadVertex(${reader}.getVertex());
      } else {
        ${destination} = LoadEdge(${reader}.getEdge());
      }
    }
    cpp<#))
;; Wire representation of a single graph element (vertex or edge), templated on
;; TElement and serialized for the `vertex` and `edge` type arguments.
;;
;; The struct is asymmetric between sender and receiver:
;;  * `*_input` members are non-owning pointers to the records being sent; they
;;    are written with `save-element`.
;;  * `*_output` members own the records reconstructed by `load-element`, which
;;    redirects each `*_input` load into the matching `*_output` member. They
;;    have no wire field of their own (`:dont-save`).
;;  * `worker-id` is passed to SaveVertex/SaveEdge while saving and is not put
;;    on the wire itself (`:dont-save`).
(lcp:define-struct (serialized-graph-element t-element) ()
  ((global-address "storage::Address<mvcc::VersionList<TElement>>"
                   :capnp-type "Storage.Address")
   (old-element-input "TElement *"
                      :capnp-type '((null "Void") (vertex "Dis.Vertex") (edge "Dis.Edge"))
                      :capnp-save #'save-element :capnp-load #'load-element)
   (old-element-output "std::unique_ptr<TElement>" :capnp-save :dont-save)
   (new-element-input "TElement *"
                      :capnp-type '((null "Void") (vertex "Dis.Vertex") (edge "Dis.Edge"))
                      :capnp-save #'save-element :capnp-load #'load-element)
   (new-element-output "std::unique_ptr<TElement>" :capnp-save :dont-save)
   (worker-id :int16_t :capnp-save :dont-save))
  (:public
   #>cpp
   /// Sender-side constructor.
   ///
   /// @param global_address address of the element; must be a remote (global)
   ///        address, otherwise the CHECK below aborts.
   /// @param old_element_input non-owning pointer to the old record, may be
   ///        null.
   /// @param new_element_input non-owning pointer to the new record, may be
   ///        null.
   /// @param worker_id forwarded to SaveVertex/SaveEdge when saving.
   SerializedGraphElement(storage::Address<mvcc::VersionList<TElement>> global_address,
                          TElement *old_element_input, TElement *new_element_input,
                          int16_t worker_id)
       : global_address(global_address),
         old_element_input(old_element_input),
         old_element_output(nullptr),
         new_element_input(new_element_input),
         new_element_output(nullptr),
         worker_id(worker_id) {
     CHECK(global_address.is_remote())
         << "Only global addresses should be used with SerializedGraphElement";
   }

   /// Sender-side convenience constructor taking the address and both record
   /// versions from `accessor`.
   SerializedGraphElement(const RecordAccessor<TElement> &accessor, int16_t worker_id)
       : SerializedGraphElement(accessor.GlobalAddress(), accessor.GetOld(),
                                accessor.GetNew(), worker_id) {}

   /// Receiver-side constructor, used before loading.
   ///
   /// The load hook only fills the `*_output` members, so the raw input
   /// pointers and `worker_id` are given deterministic values here instead of
   /// being left indeterminate. `global_address` is default-constructed.
   SerializedGraphElement()
       : old_element_input(nullptr),
         old_element_output(nullptr),
         new_element_input(nullptr),
         new_element_output(nullptr),
         worker_id(0) {}
   cpp<#)
  (:serialize :capnp :type-args '(vertex edge)))
;; Shorthand aliases for the two SerializedGraphElement instantiations; the RPC
;; definitions below use these names for their members.
#>cpp
using SerializedVertex = SerializedGraphElement<Vertex>;
using SerializedEdge = SerializedGraphElement<Edge>;
cpp<#
;; RPC `create-bfs-subcursor`.
;;
;; Request members:
;;  * tx-id      - transaction id, stored as UInt64 on the wire.
;;  * direction  - edge direction enum; saved/loaded through the enum helpers
;;                 with the enumerators `in`, `out` and `both`.
;;  * edge-types - declared as a member but marked `:dont-save`, so it is not
;;                 written to the Cap'n Proto message (see the TODO below).
;;  * graph-view - graph view enum with the enumerators `old` and `new`.
;; Response: a single int64 `member` (presumably the id of the created
;; subcursor -- confirm against the RPC handler).
(lcp:define-rpc create-bfs-subcursor
(:request
((tx-id "tx::TransactionId" :capnp-type "UInt64")
(direction "query::EdgeAtom::Direction"
:capnp-type "Ast.EdgeAtom.Direction" :capnp-init nil
:capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction"
"query::EdgeAtom::Direction"
'(in out both))
:capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction"
"query::EdgeAtom::Direction"
'(in out both)))
;; TODO(mtomic): Why isn't edge-types serialized?
(edge-types "std::vector<storage::EdgeType>"
:capnp-save :dont-save)
(graph-view "query::GraphView"
:capnp-type "Query.GraphView" :capnp-init nil
:capnp-save (lcp:capnp-save-enum "::query::capnp::GraphView"
"query::GraphView"
'(old new))
:capnp-load (lcp:capnp-load-enum "::query::capnp::GraphView"
"query::GraphView"
'(old new)))))
(:response ((member :int64_t))))
;; RPC `register-subcursors`.
;;
;; The request carries one map from int16 keys to int64 values (presumably
;; worker id -> subcursor id -- confirm against the caller). On the wire the
;; map is a `Utils.Map` of boxed integers: the save lambda copies each entry's
;; key and value into the boxed builders via `utils::SaveMap`, and the load
;; lambda unboxes every entry back into a `std::pair` via `utils::LoadMap`.
;; The response has no members.
(lcp:define-rpc register-subcursors
(:request ((subcursor-ids "std::unordered_map<int16_t, int64_t>"
:capnp-type "Utils.Map(Utils.BoxInt16, Utils.BoxInt64)"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveMap<utils::capnp::BoxInt16, utils::capnp::BoxInt64>(
${member}, &${builder},
[](auto *builder, const auto &entry) {
auto key_builder = builder->initKey();
key_builder.setValue(entry.first);
auto value_builder = builder->initValue();
value_builder.setValue(entry.second);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
utils::LoadMap<utils::capnp::BoxInt16, utils::capnp::BoxInt64>(
&${member}, ${reader},
[](const auto &reader) {
int16_t key = reader.getKey().getValue();
int64_t value = reader.getValue().getValue();
return std::make_pair(key, value);
});
cpp<#))))
(:response ()))
;; RPC `reset-subcursor`: the request names a subcursor by its int64 id; the
;; response has no members.
(lcp:define-rpc reset-subcursor
(:request ((subcursor-id :int64_t)))
(:response ()))
;; RPC `remove-bfs-subcursor`: the request wraps a single int64 `member`
;; (presumably the subcursor id -- confirm against the handler); the response
;; has no members.
(lcp:define-rpc remove-bfs-subcursor
(:request ((member :int64_t)))
(:response ()))
;; RPC `expand-level`: the request wraps a single int64 `member` (presumably the
;; subcursor id -- confirm against the handler); the response wraps a single
;; bool `member` whose meaning is defined by the handler.
(lcp:define-rpc expand-level
(:request ((member :int64_t)))
(:response ((member :bool))))
;; RPC `subcursor-pull`: the request wraps a single int64 `member` (presumably
;; the subcursor id -- confirm against the handler). The response holds an
;; optional SerializedVertex, stored as `Utils.Optional(SerializedGraphElement)`
;; and saved/loaded through the optional helpers. SerializedVertex has
;; `std::unique_ptr` members; `:initarg :move` presumably makes the generated
;; constructor move the argument into the member -- confirm in LCP.
(lcp:define-rpc subcursor-pull
(:request ((member :int64_t)))
(:response ((vertex "std::experimental::optional<SerializedVertex>" :initarg :move
:capnp-type "Utils.Optional(SerializedGraphElement)"
:capnp-save (lcp:capnp-save-optional "capnp::SerializedGraphElement" "SerializedVertex")
:capnp-load (lcp:capnp-load-optional "capnp::SerializedGraphElement" "SerializedVertex")))))
;; RPC `set-source`: the request carries a subcursor id and the address of the
;; source vertex; the response has no members. `storage::VertexAddress` needs no
;; explicit `:capnp-type` because of the type conversion declared at the top of
;; this file.
(lcp:define-rpc set-source
(:request
((subcursor-id :int64_t)
(source "storage::VertexAddress")))
(:response ()))
;; RPC `expand-to-remote-vertex`: the request carries a subcursor id together
;; with an edge address and a vertex address; the response wraps a single bool
;; `member` whose meaning is defined by the handler.
(lcp:define-rpc expand-to-remote-vertex
(:request
((subcursor-id :int64_t)
(edge "storage::EdgeAddress")
(vertex "storage::VertexAddress")))
(:response ((member :bool))))
;; RPC `reconstruct-path`.
;;
;; Request: a subcursor id plus the element to start from. Each of the two
;; non-default constructors sets exactly one of `vertex` / `edge` and leaves the
;; other as nullopt.
;;
;; Response: the serialized edges plus an optional continuation point, of which
;; at most one of `next-vertex` / `next-edge` may be set. `subcursor-id` is
;; declared on the response but is not serialized (`:dont-save`).
(lcp:define-rpc reconstruct-path
  (:request
   ((subcursor-id :int64_t)
    (vertex "std::experimental::optional<storage::VertexAddress>"
            :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::VertexAddress")
            :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::VertexAddress"))
    (edge "std::experimental::optional<storage::EdgeAddress>"
          :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
          :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
   (:public
    #>cpp
    using Capnp = capnp::ReconstructPathReq;
    static const communication::rpc::MessageType TypeInfo;

    ReconstructPathReq() {}

    /// Request starting from a vertex; `edge` is left unset.
    ReconstructPathReq(int64_t subcursor_id, storage::VertexAddress vertex)
        : subcursor_id(subcursor_id),
          vertex(vertex),
          edge(std::experimental::nullopt) {}

    /// Request starting from an edge; `vertex` is left unset.
    ReconstructPathReq(int64_t subcursor_id, storage::EdgeAddress edge)
        : subcursor_id(subcursor_id),
          vertex(std::experimental::nullopt),
          edge(edge) {}
    cpp<#))
  (:response
   ((subcursor-id :int64_t ;; TODO(mtomic): Unused?
                  :capnp-save :dont-save)
    (edges "std::vector<SerializedEdge>" :capnp-type "List(SerializedGraphElement)"
           :capnp-save (lcp:capnp-save-vector "capnp::SerializedGraphElement" "SerializedEdge")
           :capnp-load (lcp:capnp-load-vector "capnp::SerializedGraphElement" "SerializedEdge"))
    (next-vertex "std::experimental::optional<storage::VertexAddress>"
                 :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::VertexAddress")
                 :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::VertexAddress"))
    (next-edge "std::experimental::optional<storage::EdgeAddress>"
               :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
               :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
   (:public
    #>cpp
    using Capnp = capnp::ReconstructPathRes;
    static const communication::rpc::MessageType TypeInfo;

    ReconstructPathRes() {}

    /// Builds a response from local edge accessors.
    ///
    /// @param edge_accessors edges to send; each one is converted to a
    ///        SerializedEdge using `worker_id`.
    /// @param next_vertex optional continuation vertex.
    /// @param next_edge optional continuation edge.
    /// @param worker_id forwarded to every SerializedEdge.
    ///
    /// Aborts (CHECK) if both `next_vertex` and `next_edge` are set.
    ReconstructPathRes(
        const std::vector<EdgeAccessor> &edge_accessors,
        std::experimental::optional<storage::VertexAddress> next_vertex,
        std::experimental::optional<storage::EdgeAddress> next_edge,
        int16_t worker_id)
        : next_vertex(std::move(next_vertex)), next_edge(std::move(next_edge)) {
      // The parameters shadow the members and have already been moved from in
      // the initializer list, so the invariant is checked on the members.
      CHECK(!static_cast<bool>(this->next_vertex) ||
            !static_cast<bool>(this->next_edge))
          << "At most one of `next_vertex` and `next_edge` should be set";
      // The final size is known up front; avoid repeated reallocation.
      edges.reserve(edge_accessors.size());
      for (const auto &edge : edge_accessors) {
        edges.emplace_back(edge, worker_id);
      }
    }
    cpp<#)))
;; RPC `prepare-for-expand`: the request carries a subcursor id and a `clear`
;; flag whose effect is defined by the handler; the response has no members.
(lcp:define-rpc prepare-for-expand
(:request
((subcursor-id :int64_t)
(clear :bool)))
(:response ()))
(lcp:pop-namespace) ;; distributed