;; RPC message definitions (requests, responses and one enum) for the
;; `distributed` namespace, written in LCP. Every `lcp:define-rpc` form below
;; declares one request/response pair together with its SLK and Cap'n Proto
;; serialization hooks.

#>cpp
#pragma once

#include <cstdint>
#include <experimental/optional>
#include <unordered_map>
#include <utility>
#include <vector>

#include "communication/rpc/messages.hpp"
#include "distributed/bfs_rpc_messages.capnp.h"
#include "distributed/bfs_subcursor.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/distributed_ops.hpp"
#include "query/serialization.hpp"
#include "storage/distributed/rpc/serialization.hpp"
#include "transactions/type.hpp"
#include "rpc/serialization.hpp"
cpp<#

(lcp:namespace distributed)

(lcp:capnp-namespace "distributed")

;; Cap'n Proto schemas referenced by the `:capnp-type` annotations below.
(lcp:capnp-import 'ast "/query/frontend/ast/ast_serialization.capnp")
(lcp:capnp-import 'dist-ops "/query/plan/distributed_ops.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/rpc/serialization.capnp")
(lcp:capnp-import 'utils "/rpc/serialization.capnp")

;; C++ type -> Cap'n Proto type mappings; both address kinds map to the same
;; `Storage.Address` schema type.
(lcp:capnp-type-conversion "storage::EdgeAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::VertexAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::EdgeType" "Storage.EdgeType")

;; create-bfs-subcursor
;; Request: transaction id, expansion direction, edge types, filter lambda,
;; symbol table, timestamp and query parameters.
;; Response: a single int64 `member` (presumably the id of the created
;; subcursor -- confirm against the RPC handler).
(lcp:define-rpc create-bfs-subcursor
    (:request
     ((tx-id "tx::TransactionId" :capnp-type "UInt64")
      (direction "query::EdgeAtom::Direction"
                 :capnp-type "Ast.EdgeAtom.Direction" :capnp-init nil
                 :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction"
                                                  "query::EdgeAtom::Direction"
                                                  '(in out both))
                 :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction"
                                                  "query::EdgeAtom::Direction"
                                                  '(in out both)))
      (edge-types "std::vector<storage::EdgeType>"
                  :capnp-save (lcp:capnp-save-vector "::storage::capnp::EdgeType"
                                                     "storage::EdgeType")
                  :capnp-load (lcp:capnp-load-vector "::storage::capnp::EdgeType"
                                                     "storage::EdgeType"))
      ;; Loading the lambda takes the extra `ast_storage` argument declared in
      ;; the `:load-args` of the `:serialize` clause below.
      (filter-lambda "query::plan::ExpansionLambda"
                     :slk-load (lambda (member)
                                 #>cpp
                                 slk::Load(&self->${member}, reader, ast_storage);
                                 cpp<#)
                     :capnp-type "DistOps.ExpansionLambda"
                     :capnp-load (lambda (reader member capnp-name)
                                   #>cpp
                                   Load(&${member}, ${reader}, ast_storage);
                                   cpp<#))
      (symbol-table "query::SymbolTable" :capnp-type "Query.SymbolTable")
      (timestamp :int64_t)
      (parameters "query::Parameters"
                  :capnp-type "Utils.Map(Utils.BoxInt64, Storage.PropertyValue)"))
     (:serialize (:slk :load-args '((ast-storage "query::AstStorage *")))
                 (:capnp :load-args '((ast-storage "query::AstStorage *")))))
  (:response ((member :int64_t))))

;; register-subcursors
;; Request: a map with int16 keys and int64 values (by the field name,
;; presumably worker id -> subcursor id -- confirm with callers).
;; Response: empty.
(lcp:define-rpc register-subcursors
    (:request
     ;; NOTE(review): `utils::SaveMap` / `utils::LoadMap` are kept exactly as
     ;; they appear in the reviewed text; if they require explicit template
     ;; arguments, those are not visible here -- confirm.
     ((subcursor-ids "std::unordered_map<int16_t, int64_t>"
                     :capnp-type "Utils.Map(Utils.BoxInt16, Utils.BoxInt64)"
                     :capnp-save
                     (lambda (builder member capnp-name)
                       (declare (ignore capnp-name))
                       #>cpp
                       utils::SaveMap(
                           ${member}, &${builder},
                           [](auto *builder, const auto &entry) {
                             auto key_builder = builder->initKey();
                             key_builder.setValue(entry.first);
                             auto value_builder = builder->initValue();
                             value_builder.setValue(entry.second);
                           });
                       cpp<#)
                     :capnp-load
                     (lambda (reader member capnp-name)
                       (declare (ignore capnp-name))
                       #>cpp
                       utils::LoadMap(
                           &${member}, ${reader},
                           [](const auto &reader) {
                             int16_t key = reader.getKey().getValue();
                             int64_t value = reader.getValue().getValue();
                             return std::make_pair(key, value);
                           });
                       cpp<#))))
  (:response ()))

;; reset-subcursor: request carries a subcursor id, response is empty.
(lcp:define-rpc reset-subcursor
    (:request ((subcursor-id :int64_t)))
  (:response ()))

;; Result values carried by the `expand-level` response.
(lcp:define-enum expand-result
    (success failure lambda-error)
  (:serialize))

;; expand-level: request carries one int64 `member`, response an `ExpandResult`.
(lcp:define-rpc expand-level
    (:request ((member :int64_t)))
  (:response ((result "ExpandResult"))))

;; subcursor-pull
;; Request: one int64 `member`.
;; Response: an optional vertex accessor. SLK writes a presence flag first and
;; the vertex only when it is set; saving needs `worker_id`, loading needs
;; `dba` and `data_manager` (see the `:serialize` clause).
(lcp:define-rpc subcursor-pull
    (:request ((member :int64_t)))
  (:response
   ((vertex "std::experimental::optional<VertexAccessor>"
            :slk-save (lambda (member)
                        #>cpp
                        slk::Save(static_cast<bool>(self.${member}), builder);
                        if (self.${member}) {
                          slk::Save(*self.${member}, builder,
                                    storage::SendVersions::BOTH, worker_id);
                        }
                        cpp<#)
            :slk-load (lambda (member)
                        #>cpp
                        bool has_value;
                        slk::Load(&has_value, reader);
                        if (has_value) {
                          self->${member} =
                              slk::LoadVertexAccessor(reader, dba, data_manager);
                        }
                        cpp<#)
            :capnp-type "Utils.Optional(Storage.VertexAccessor)"
            :capnp-save (lcp:capnp-save-optional
                         "storage::capnp::VertexAccessor"
                         "VertexAccessor"
                         "[worker_id](auto *builder, const auto &vertex) {
                            storage::SaveVertexAccessor(vertex, builder, storage::SendVersions::BOTH, worker_id);
                          }")
            :capnp-load (lcp:capnp-load-optional
                         "storage::capnp::VertexAccessor"
                         "VertexAccessor"
                         "[dba, data_manager](const auto &reader) {
                            return storage::LoadVertexAccessor(reader, dba, data_manager);
                          }")))
   (:serialize (:slk :save-args '((worker-id :int16_t))
                     :load-args '((dba "database::GraphDbAccessor *")
                                  (data-manager "distributed::DataManager *")))
               (:capnp :save-args '((worker-id :int16_t))
                       :load-args '((dba "database::GraphDbAccessor *")
                                    (data-manager "distributed::DataManager *"))))))

;; set-source: request carries a subcursor id and a vertex address, response
;; is empty.
(lcp:define-rpc set-source
    (:request
     ((subcursor-id :int64_t)
      (source "storage::VertexAddress")))
  (:response ()))

;; expand-to-remote-vertex: request carries a subcursor id, an edge address and
;; a vertex address; response carries one bool `member`.
(lcp:define-rpc expand-to-remote-vertex
    (:request
     ((subcursor-id :int64_t)
      (edge "storage::EdgeAddress")
      (vertex "storage::VertexAddress")))
  (:response ((member :bool))))

;; reconstruct-path
;; Request: a subcursor id plus either a vertex address or an edge address;
;; each non-default constructor sets one of the two and leaves the other as
;; `nullopt`.
;; Response: a list of edge accessors plus optional `next_vertex` /
;; `next_edge` addresses, at most one of which may be set (enforced by the
;; CHECK in the constructor).
(lcp:define-rpc reconstruct-path
    (:request
     ((subcursor-id :int64_t)
      (vertex "std::experimental::optional<storage::VertexAddress>"
              :capnp-save (lcp:capnp-save-optional "storage::capnp::Address"
                                                   "storage::VertexAddress")
              :capnp-load (lcp:capnp-load-optional "storage::capnp::Address"
                                                   "storage::VertexAddress"))
      (edge "std::experimental::optional<storage::EdgeAddress>"
            :capnp-save (lcp:capnp-save-optional "storage::capnp::Address"
                                                 "storage::EdgeAddress")
            :capnp-load (lcp:capnp-load-optional "storage::capnp::Address"
                                                 "storage::EdgeAddress")))
     (:public
      #>cpp
      using Capnp = capnp::ReconstructPathReq;

      ReconstructPathReq() {}

      ReconstructPathReq(int64_t subcursor_id, storage::VertexAddress vertex)
          : subcursor_id(subcursor_id),
            vertex(vertex),
            edge(std::experimental::nullopt) {}

      ReconstructPathReq(int64_t subcursor_id, storage::EdgeAddress edge)
          : subcursor_id(subcursor_id),
            vertex(std::experimental::nullopt),
            edge(edge) {}
      cpp<#))
  (:response
   ;; SLK encodes `edges` as an element count followed by the elements.
   ((edges "std::vector<EdgeAccessor>"
           :slk-save (lambda (member)
                       #>cpp
                       size_t size = self.${member}.size();
                       slk::Save(size, builder);
                       for (const auto &v : self.${member}) {
                         slk::Save(v, builder, storage::SendVersions::BOTH,
                                   worker_id);
                       }
                       cpp<#)
           :slk-load (lambda (member)
                       #>cpp
                       size_t size;
                       slk::Load(&size, reader);
                       self->${member}.reserve(size);
                       for (size_t i = 0; i < size; ++i) {
                         self->${member}.push_back(
                             slk::LoadEdgeAccessor(reader, dba, data_manager));
                       }
                       cpp<#)
           :capnp-type "List(Storage.EdgeAccessor)"
           :capnp-save (lcp:capnp-save-vector
                        "storage::capnp::EdgeAccessor"
                        "EdgeAccessor"
                        "[worker_id](auto *builder, const auto &edge) {
                           storage::SaveEdgeAccessor(edge, builder, storage::SendVersions::BOTH, worker_id);
                         }")
           :capnp-load (lcp:capnp-load-vector
                        "storage::capnp::EdgeAccessor"
                        "EdgeAccessor"
                        "[dba, data_manager](const auto &reader) {
                           return storage::LoadEdgeAccessor(reader, dba, data_manager);
                         }"))
    (next-vertex "std::experimental::optional<storage::VertexAddress>"
                 :capnp-save (lcp:capnp-save-optional "storage::capnp::Address"
                                                      "storage::VertexAddress")
                 :capnp-load (lcp:capnp-load-optional "storage::capnp::Address"
                                                      "storage::VertexAddress"))
    (next-edge "std::experimental::optional<storage::EdgeAddress>"
               :capnp-save (lcp:capnp-save-optional "storage::capnp::Address"
                                                    "storage::EdgeAddress")
               :capnp-load (lcp:capnp-load-optional "storage::capnp::Address"
                                                    "storage::EdgeAddress")))
   (:serialize (:slk :save-args '((worker-id :int16_t))
                     :load-args '((dba "database::GraphDbAccessor *")
                                  (data-manager "distributed::DataManager *")))
               (:capnp :save-args '((worker-id :int16_t))
                       :load-args '((dba "database::GraphDbAccessor *")
                                    (data-manager "distributed::DataManager *"))))
   (:public
    #>cpp
    using Capnp = capnp::ReconstructPathRes;

    ReconstructPathRes() {}

    ReconstructPathRes(
        const std::vector<EdgeAccessor> &edges,
        std::experimental::optional<storage::VertexAddress> next_vertex,
        std::experimental::optional<storage::EdgeAddress> next_edge)
        : edges(edges),
          next_vertex(std::move(next_vertex)),
          next_edge(std::move(next_edge)) {
      // The parameters shadow the members and have been moved from above, so
      // validate the members rather than the moved-from parameters.
      CHECK(!static_cast<bool>(this->next_vertex) ||
            !static_cast<bool>(this->next_edge))
          << "At most one of `next_vertex` and `next_edge` should be set";
    }
    cpp<#)))

;; prepare-for-expand
;; Request: a subcursor id, a `clear` flag and a frame of typed values. The
;; `worker-id` member is marked `:dont-save`, so it is not serialized; it is
;; only read by the save hooks through `self.worker_id`.
;; Response: empty.
(lcp:define-rpc prepare-for-expand
    (:request
     ((subcursor-id :int64_t)
      (clear :bool)
      ;; SLK encodes `frame` as an element count followed by the elements.
      (frame "std::vector<query::TypedValue>"
             :slk-save (lambda (member)
                         #>cpp
                         size_t size = self.${member}.size();
                         slk::Save(size, builder);
                         for (const auto &v : self.${member}) {
                           slk::Save(v, builder, storage::SendVersions::ONLY_OLD,
                                     self.worker_id);
                         }
                         cpp<#)
             :slk-load (lambda (member)
                         #>cpp
                         size_t size;
                         slk::Load(&size, reader);
                         self->${member}.resize(size);
                         for (size_t i = 0; i < size; ++i) {
                           slk::Load(&self->${member}[i], reader, dba, data_manager);
                         }
                         cpp<#)
             :capnp-type "List(Query.TypedValue)"
             :capnp-save (lcp:capnp-save-vector
                          "query::capnp::TypedValue"
                          "query::TypedValue"
                          "[&self](auto *builder, const auto &value) {
                             query::SaveCapnpTypedValue(value, builder, storage::SendVersions::ONLY_OLD, self.worker_id);
                           }")
             :capnp-load (lcp:capnp-load-vector
                          "query::capnp::TypedValue"
                          "query::TypedValue"
                          "[dba, data_manager](const auto &reader) {
                             query::TypedValue value;
                             query::LoadCapnpTypedValue(reader, &value, dba, data_manager);
                             return value;
                           }"))
      (worker-id :int16_t :dont-save t))
     (:serialize (:slk :load-args '((dba "database::GraphDbAccessor *")
                                    (data-manager "distributed::DataManager *")))
                 (:capnp :load-args '((dba "database::GraphDbAccessor *")
                                      (data-manager "distributed::DataManager *")))))
  (:response ()))

(lcp:pop-namespace) ;; distributed