Serialize RPC messages using SLK

Reviewers: msantl, mtomic, mferencevic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1771
This commit is contained in:
Teon Banek 2018-11-28 10:12:10 +01:00
parent b1e21489e9
commit 68b2dcc490
8 changed files with 212 additions and 23 deletions

View File

@ -45,6 +45,16 @@ cpp<#
:capnp-load (lcp:capnp-load-vector "::storage::capnp::EdgeType"
"storage::EdgeType"))
(filter-lambda "query::plan::ExpansionLambda"
:slk-save (lambda (member)
#>cpp
std::vector<int32_t> saved_ast_uids;
slk::Save(self.${member}, builder, &saved_ast_uids);
cpp<#)
:slk-load (lambda (member)
#>cpp
std::vector<int32_t> loaded_ast_uids;
slk::Load(&self->${member}, reader, ast_storage, &loaded_ast_uids);
cpp<#)
:capnp-type "DistOps.ExpansionLambda"
:capnp-save (lambda (builder member capnp-name)
#>cpp
@ -67,7 +77,8 @@ cpp<#
#>cpp
query::LoadEvaluationContext(${reader}, &${member});
cpp<#)))
(:serialize (:capnp :load-args '((ast-storage "query::AstStorage *")))))
(:serialize (:slk :load-args '((ast-storage "query::AstStorage *")))
(:capnp :load-args '((ast-storage "query::AstStorage *")))))
(:response ((member :int64_t))))
(lcp:define-rpc register-subcursors
@ -120,6 +131,22 @@ cpp<#
(:request ((member :int64_t)))
(:response
((vertex "std::experimental::optional<VertexAccessor>"
:slk-save (lambda (member)
#>cpp
slk::Save(static_cast<bool>(self.${member}), builder);
if (self.${member}) {
slk::Save(*self.${member}, builder,
storage::SendVersions::BOTH, worker_id);
}
cpp<#)
:slk-load (lambda (member)
#>cpp
bool has_value;
slk::Load(&has_value, reader);
if (has_value) {
self->${member} = slk::LoadVertexAccessor(reader, dba, data_manager);
}
cpp<#)
:capnp-type "Utils.Optional(Storage.VertexAccessor)"
:capnp-save (lcp:capnp-save-optional
"storage::capnp::VertexAccessor"
@ -133,8 +160,11 @@ cpp<#
"[dba, data_manager](const auto &reader) {
return storage::LoadVertexAccessor(reader, dba, data_manager);
}")))
(:serialize (:capnp
:save-args '((worker-id :int))
(:serialize (:slk :save-args '((worker-id :int16_t))
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:capnp
:save-args '((worker-id :int16_t))
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *"))))))
@ -177,7 +207,25 @@ cpp<#
edge(edge) {}
cpp<#))
(:response
((edges "std::vector<EdgeAccessor>" :capnp-type "List(Storage.EdgeAccessor)"
((edges "std::vector<EdgeAccessor>"
:slk-save (lambda (member)
#>cpp
size_t size = self.${member}.size();
slk::Save(size, builder);
for (const auto &v : self.${member}) {
slk::Save(v, builder, storage::SendVersions::BOTH, worker_id);
}
cpp<#)
:slk-load (lambda (member)
#>cpp
size_t size;
slk::Load(&size, reader);
self->${member}.reserve(size);
for (size_t i = 0; i < size; ++i) {
self->${member}.push_back(slk::LoadEdgeAccessor(reader, dba, data_manager));
}
cpp<#)
:capnp-type "List(Storage.EdgeAccessor)"
:capnp-save (lcp:capnp-save-vector "storage::capnp::EdgeAccessor"
"EdgeAccessor"
"[worker_id](auto *builder, const auto &edge) {
@ -194,8 +242,11 @@ cpp<#
(next-edge "std::experimental::optional<storage::EdgeAddress>"
:capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
:capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
(:serialize (:capnp
:save-args '((worker-id :int))
(:serialize (:slk :save-args '((worker-id :int16_t))
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:capnp
:save-args '((worker-id :int16_t))
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *"))))
(:public
@ -219,6 +270,23 @@ cpp<#
((subcursor-id :int64_t)
(clear :bool)
(frame "std::vector<query::TypedValue>"
:slk-save (lambda (member)
#>cpp
size_t size = self.${member}.size();
slk::Save(size, builder);
for (const auto &v : self.${member}) {
slk::Save(v, builder, storage::SendVersions::ONLY_OLD, self.worker_id);
}
cpp<#)
:slk-load (lambda (member)
#>cpp
size_t size;
slk::Load(&size, reader);
self->${member}.resize(size);
for (size_t i = 0; i < size; ++i) {
slk::Load(&self->${member}[i], reader, dba, data_manager);
}
cpp<#)
:capnp-type "List(Query.TypedValue)"
:capnp-save (lcp:capnp-save-vector
"query::capnp::TypedValue"
@ -235,8 +303,10 @@ cpp<#
query::LoadCapnpTypedValue(reader, &value, dba, data_manager);
return value;
}"))
(worker-id :int :dont-save t))
(:serialize (:capnp
(worker-id :int16_t :dont-save t))
(:serialize (:slk :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:capnp
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))))
(:response ()))

View File

@ -23,7 +23,7 @@ cpp<#
(lcp:define-struct tx-gid-pair ()
((tx-id "tx::TransactionId" :capnp-type "UInt64")
(gid "gid::Gid" :capnp-type "UInt64"))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc vertex
(:request ((member "TxGidPair")))
@ -37,11 +37,23 @@ cpp<#
#>cpp
storage::SaveVertex(*${member}, &${builder}, self.worker_id);
cpp<#)
:slk-save
(lambda (member)
#>cpp
slk::Save(*self.${member}, builder, self.worker_id);
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore member capnp-name))
#>cpp
self->vertex_output = storage::LoadVertex(${reader});
cpp<#)
:slk-load
(lambda (member)
(declare (ignore member))
#>cpp
self->vertex_output = std::make_unique<Vertex>();
slk::Load(self->vertex_output.get(), reader);
cpp<#))
(worker-id :int64_t :dont-save t)
(vertex-output "std::unique_ptr<Vertex>" :initarg nil :dont-save t))))
@ -58,11 +70,21 @@ cpp<#
#>cpp
storage::SaveEdge(*${member}, &${builder}, self.worker_id);
cpp<#)
:slk-save
(lambda (member)
#>cpp
slk::Save(*self.${member}, builder, self.worker_id);
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore member capnp-name))
#>cpp
self->edge_output = storage::LoadEdge(${reader});
cpp<#)
:slk-load
(lambda (member)
#>cpp
slk::Load(&self->edge_output, reader);
cpp<#))
(worker-id :int64_t :dont-save t)
(edge-output "std::unique_ptr<Edge>" :initarg nil :dont-save t))))

View File

@ -30,6 +30,26 @@ cpp<#
self->storage = std::move(helper.ast_storage);
cpp<#)
(defun slk-save-plan (member)
#>cpp
query::plan::LogicalOperator::SaveHelper helper;
slk::Save<query::plan::LogicalOperator>(
self.${member}, builder, &helper.saved_ops,
[&helper](const auto &val, auto *builder) {
slk::Save(val, builder, &helper);
});
cpp<#)
(defun slk-load-plan (member)
#>cpp
query::plan::LogicalOperator::SlkLoadHelper helper;
slk::Load<query::plan::LogicalOperator>(&self->${member}, reader, &helper.loaded_ops,
[&helper](auto *op, auto *reader) {
slk::ConstructAndLoad(op, reader, &helper);
});
self->storage = std::move(helper.ast_storage);
cpp<#)
(defun save-plan (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
@ -46,6 +66,8 @@ cpp<#
((plan-id :int64_t)
(plan "std::shared_ptr<query::plan::LogicalOperator>"
:capnp-type "Utils.SharedPtr(Plan.LogicalOperator)"
:slk-save #'slk-save-plan
:slk-load #'slk-load-plan
:capnp-save #'save-plan :capnp-load #'load-plan)
(symbol-table "query::SymbolTable" :capnp-type "Query.SymbolTable")
(storage "query::AstStorage" :initarg nil :dont-save t)))

View File

@ -67,9 +67,40 @@ RPC. Indicates the state of execution on the worker.")
"The data returned to the end consumer (the Pull operator). Contains only
the relevant parts of the response, ready for use."))
(defun slk-save-frames (member)
#>cpp
size_t frame_count = self.${member}.size();
slk::Save(frame_count, builder);
for (const auto &frame : self.${member}) {
size_t frame_size = frame.size();
slk::Save(frame_size, builder);
for (const auto &value : frame) {
slk::Save(value, builder, self.send_versions, self.worker_id);
}
}
cpp<#)
(defun slk-load-frames (member)
#>cpp
size_t frame_count = 0;
slk::Load(&frame_count, reader);
self->${member}.reserve(frame_count);
for (size_t frame_i = 0; frame_i < frame_count; ++frame_i) {
size_t frame_size = 0;
slk::Load(&frame_size, reader);
std::vector<query::TypedValue> frame(frame_size);
for (size_t val_i = 0; val_i < frame_size; ++val_i) {
slk::Load(&frame[val_i], reader, dba, data_manager);
}
self->${member}.emplace_back(std::move(frame));
}
cpp<#)
(lcp:define-struct pull-res-data ()
((pull-state "PullState")
(frames "std::vector<std::vector<query::TypedValue>>"
:slk-save #'slk-save-frames
:slk-load #'slk-load-frames
:capnp-type "List(List(Query.TypedValue))"
:capnp-save
(lambda (builder member capnp-name)
@ -174,13 +205,17 @@ to the appropriate value. Not used on side that generates the response.")
PullResData(PullResData &&) = default;
PullResData &operator=(PullResData &&) = default;
cpp<#)
(:serialize (:capnp :load-args '((dba "database::GraphDbAccessor *")
(:serialize (:slk :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))))
(lcp:define-rpc pull
(:request
((tx-id "tx::TransactionId")
(tx-snapshot "tx::Snapshot"
:slk-save #'slk-save-snapshot
:slk-load #'slk-load-snapshot
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
@ -210,9 +245,15 @@ to the appropriate value. Not used on side that generates the response.")
"storage::SendVersions"
'(both only-old only-new)))))
(:response
((data "PullResData" :initarg :move))
(:serialize (:capnp :base t :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *"))))))
((data "PullResData" :initarg :move
:slk-load (lambda (m)
#>cpp
slk::Load(&self->${m}, reader, dba, data_manager);
cpp<#)))
(:serialize (:slk :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *"))))))
;; TODO make a separate RPC for the continuation of an existing pull, as an
;; optimization not to have to send the full PullReqData pack every time.

View File

@ -51,7 +51,7 @@ cpp<#
((result "UpdateResult")
(cypher-id :int64_t :documentation "Only valid if creation was successful.")
(gid "gid::Gid" :documentation "Only valid if creation was successful."))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-struct create-vertex-req-data ()
((tx-id "tx::TransactionId")
@ -107,7 +107,7 @@ cpp<#
return reader.getValue();
});
cpp<#)))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc create-vertex
(:request ((member "CreateVertexReqData")))
@ -138,7 +138,7 @@ cpp<#
return reader.getValue();
});
cpp<#)))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc create-edge
(:request ((member "CreateEdgeReqData")))
@ -150,7 +150,7 @@ cpp<#
(to "gid::Gid")
(edge-type "storage::EdgeType")
(tx-id "tx::TransactionId"))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc add-in-edge
(:request ((member "AddInEdgeReqData")))
@ -160,7 +160,7 @@ cpp<#
((gid "gid::Gid")
(tx-id "tx::TransactionId")
(check-empty :bool))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc remove-vertex
(:request ((member "RemoveVertexReqData")))
@ -171,7 +171,7 @@ cpp<#
(edge-id "gid::Gid")
(vertex-from-id "gid::Gid")
(vertex-to-address "storage::VertexAddress"))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc remove-edge
(:request ((member "RemoveEdgeData")))
@ -181,7 +181,7 @@ cpp<#
((tx-id "tx::TransactionId")
(vertex "gid::Gid")
(edge-address "storage::EdgeAddress"))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc remove-in-edge
(:request ((member "RemoveInEdgeData")))

View File

@ -1333,13 +1333,13 @@ enums which aren't defined in LCP."
(:public
,(decl-type-info req-name)
,(def-constructor req-name (second request)))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(define-struct ,res-sym ()
,@(cdr response)
(:public
,(decl-type-info res-name)
,(def-constructor res-name (second response)))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
,rpc-decl))))
(defun read-lcp (filepath)

View File

@ -20,11 +20,13 @@ cpp<#
(lcp:define-struct tx-and-snapshot ()
((tx-id "TransactionId")
(snapshot "Snapshot"
:slk-save #'slk-save-snapshot
:slk-load #'slk-load-snapshot
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
:capnp-load #'load-snapshot))
(:serialize (:capnp)))
(:serialize (:slk) (:capnp)))
(lcp:define-rpc begin
(:request ())
@ -45,6 +47,8 @@ cpp<#
(lcp:define-rpc snapshot
(:request ((member "TransactionId")))
(:response ((member "Snapshot"
:slk-save #'slk-save-snapshot
:slk-load #'slk-load-snapshot
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
@ -57,6 +61,8 @@ cpp<#
(lcp:define-rpc gc-snapshot
(:request ())
(:response ((member "Snapshot"
:slk-save #'slk-save-snapshot
:slk-load #'slk-load-snapshot
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
@ -65,6 +71,8 @@ cpp<#
(lcp:define-rpc clog-info
(:request ((member "TransactionId")))
(:response ((member "CommitLog::Info"
:slk-save #'slk-save-commitlog-info
:slk-load #'slk-load-commitlog-info
:capnp-type "UInt8"
:capnp-init nil
:capnp-save #'save-commitlog-info
@ -73,6 +81,8 @@ cpp<#
(lcp:define-rpc active-transactions
(:request ())
(:response ((member "Snapshot"
:slk-save #'slk-save-snapshot
:slk-load #'slk-load-snapshot
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot

View File

@ -1,5 +1,17 @@
;; This file doesn't need to be preprocessed. It only holds helper functions.
(defun slk-save-commitlog-info (member)
#>cpp
slk::Save(static_cast<uint8_t>(self.${member}), builder);
cpp<#)
(defun slk-load-commitlog-info (member)
#>cpp
uint8_t info_flags;
slk::Load(&info_flags, reader);
self->${member} = tx::CommitLog::Info(info_flags);
cpp<#)
(defun save-commitlog-info (builder member capnp-name)
#>cpp
${builder}->set${capnp-name}(${member});
@ -10,6 +22,18 @@
${member} = CommitLog::Info(${reader}.get${capnp-name}());
cpp<#)
(defun slk-save-snapshot (member)
#>cpp
slk::Save(self.${member}.transaction_ids(), builder);
cpp<#)
(defun slk-load-snapshot (member)
#>cpp
std::vector<uint64_t> transaction_ids;
slk::Load(&transaction_ids, reader);
self->${member} = tx::Snapshot(std::move(transaction_ids));
cpp<#)
(defun save-snapshot (builder member capnp-name)
#>cpp
auto list_builder = builder->init${capnp-name}(${member}.transaction_ids().size());