diff --git a/src/distributed/bfs_rpc_messages.lcp b/src/distributed/bfs_rpc_messages.lcp index e1ecbe690..844a17a13 100644 --- a/src/distributed/bfs_rpc_messages.lcp +++ b/src/distributed/bfs_rpc_messages.lcp @@ -45,6 +45,16 @@ cpp<# :capnp-load (lcp:capnp-load-vector "::storage::capnp::EdgeType" "storage::EdgeType")) (filter-lambda "query::plan::ExpansionLambda" + :slk-save (lambda (member) + #>cpp + std::vector saved_ast_uids; + slk::Save(self.${member}, builder, &saved_ast_uids); + cpp<#) + :slk-load (lambda (member) + #>cpp + std::vector loaded_ast_uids; + slk::Load(&self->${member}, reader, ast_storage, &loaded_ast_uids); + cpp<#) :capnp-type "DistOps.ExpansionLambda" :capnp-save (lambda (builder member capnp-name) #>cpp @@ -67,7 +77,8 @@ cpp<# #>cpp query::LoadEvaluationContext(${reader}, &${member}); cpp<#))) - (:serialize (:capnp :load-args '((ast-storage "query::AstStorage *"))))) + (:serialize (:slk :load-args '((ast-storage "query::AstStorage *"))) + (:capnp :load-args '((ast-storage "query::AstStorage *"))))) (:response ((member :int64_t)))) (lcp:define-rpc register-subcursors @@ -120,6 +131,22 @@ cpp<# (:request ((member :int64_t))) (:response ((vertex "std::experimental::optional" + :slk-save (lambda (member) + #>cpp + slk::Save(static_cast(self.${member}), builder); + if (self.${member}) { + slk::Save(*self.${member}, builder, + storage::SendVersions::BOTH, worker_id); + } + cpp<#) + :slk-load (lambda (member) + #>cpp + bool has_value; + slk::Load(&has_value, reader); + if (has_value) { + self->${member} = slk::LoadVertexAccessor(reader, dba, data_manager); + } + cpp<#) :capnp-type "Utils.Optional(Storage.VertexAccessor)" :capnp-save (lcp:capnp-save-optional "storage::capnp::VertexAccessor" @@ -133,8 +160,11 @@ cpp<# "[dba, data_manager](const auto &reader) { return storage::LoadVertexAccessor(reader, dba, data_manager); }"))) - (:serialize (:capnp - :save-args '((worker-id :int)) + (:serialize (:slk :save-args '((worker-id :int16_t)) + :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *"))) + (:capnp + :save-args '((worker-id :int16_t)) :load-args '((dba "database::GraphDbAccessor *") (data-manager "distributed::DataManager *")))))) @@ -177,7 +207,25 @@ cpp<# edge(edge) {} cpp<#)) (:response - ((edges "std::vector" :capnp-type "List(Storage.EdgeAccessor)" + ((edges "std::vector" + :slk-save (lambda (member) + #>cpp + size_t size = self.${member}.size(); + slk::Save(size, builder); + for (const auto &v : self.${member}) { + slk::Save(v, builder, storage::SendVersions::BOTH, worker_id); + } + cpp<#) + :slk-load (lambda (member) + #>cpp + size_t size; + slk::Load(&size, reader); + self->${member}.reserve(size); + for (size_t i = 0; i < size; ++i) { + self->${member}.push_back(slk::LoadEdgeAccessor(reader, dba, data_manager)); + } + cpp<#) + :capnp-type "List(Storage.EdgeAccessor)" :capnp-save (lcp:capnp-save-vector "storage::capnp::EdgeAccessor" "EdgeAccessor" "[worker_id](auto *builder, const auto &edge) { @@ -194,8 +242,11 @@ cpp<# (next-edge "std::experimental::optional" :capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress") :capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress"))) - (:serialize (:capnp - :save-args '((worker-id :int)) + (:serialize (:slk :save-args '((worker-id :int16_t)) + :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *"))) + (:capnp + :save-args '((worker-id :int16_t)) :load-args '((dba "database::GraphDbAccessor *") (data-manager "distributed::DataManager *")))) (:public @@ -219,6 +270,23 @@ cpp<# ((subcursor-id :int64_t) (clear :bool) (frame "std::vector" + :slk-save (lambda (member) + #>cpp + size_t size = self.${member}.size(); + slk::Save(size, builder); + for (const auto &v : self.${member}) { + slk::Save(v, builder, storage::SendVersions::ONLY_OLD, self.worker_id); + } + cpp<#) + :slk-load (lambda (member) + #>cpp + size_t size; + slk::Load(&size, reader); + self->${member}.resize(size); + for (size_t i = 0; i < size; ++i) { + slk::Load(&self->${member}[i], reader, dba, data_manager); + } + cpp<#) :capnp-type "List(Query.TypedValue)" :capnp-save (lcp:capnp-save-vector "query::capnp::TypedValue" @@ -235,8 +303,10 @@ cpp<# query::LoadCapnpTypedValue(reader, &value, dba, data_manager); return value; }")) - (worker-id :int :dont-save t)) - (:serialize (:capnp + (worker-id :int16_t :dont-save t)) + (:serialize (:slk :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *"))) + (:capnp :load-args '((dba "database::GraphDbAccessor *") (data-manager "distributed::DataManager *"))))) (:response ())) diff --git a/src/distributed/data_rpc_messages.lcp b/src/distributed/data_rpc_messages.lcp index 81d92011b..32e230e98 100644 --- a/src/distributed/data_rpc_messages.lcp +++ b/src/distributed/data_rpc_messages.lcp @@ -23,7 +23,7 @@ cpp<# (lcp:define-struct tx-gid-pair () ((tx-id "tx::TransactionId" :capnp-type "UInt64") (gid "gid::Gid" :capnp-type "UInt64")) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc vertex (:request ((member "TxGidPair"))) @@ -37,11 +37,23 @@ cpp<# #>cpp storage::SaveVertex(*${member}, &${builder}, self.worker_id); cpp<#) + :slk-save + (lambda (member) + #>cpp + slk::Save(*self.${member}, builder, self.worker_id); + cpp<#) :capnp-load (lambda (reader member capnp-name) (declare (ignore member capnp-name)) #>cpp self->vertex_output = storage::LoadVertex(${reader}); + cpp<#) + :slk-load + (lambda (member) + (declare (ignore member)) + #>cpp + self->vertex_output = std::make_unique(); + slk::Load(self->vertex_output.get(), reader); cpp<#)) (worker-id :int64_t :dont-save t) (vertex-output "std::unique_ptr" :initarg nil :dont-save t)))) @@ -58,11 +70,21 @@ cpp<# #>cpp storage::SaveEdge(*${member}, &${builder}, self.worker_id); cpp<#) + :slk-save + (lambda (member) + #>cpp + slk::Save(*self.${member}, builder, self.worker_id); + cpp<#) :capnp-load (lambda (reader member capnp-name) (declare (ignore member capnp-name)) #>cpp self->edge_output = storage::LoadEdge(${reader}); + cpp<#) + :slk-load + (lambda (member) + #>cpp + slk::Load(&self->edge_output, reader); cpp<#)) (worker-id :int64_t :dont-save t) (edge-output "std::unique_ptr" :initarg nil :dont-save t)))) diff --git a/src/distributed/plan_rpc_messages.lcp b/src/distributed/plan_rpc_messages.lcp index 41b2d30e3..d25ba4c7f 100644 --- a/src/distributed/plan_rpc_messages.lcp +++ b/src/distributed/plan_rpc_messages.lcp @@ -30,6 +30,26 @@ cpp<# self->storage = std::move(helper.ast_storage); cpp<#) +(defun slk-save-plan (member) + #>cpp + query::plan::LogicalOperator::SaveHelper helper; + slk::Save( + self.${member}, builder, &helper.saved_ops, + [&helper](const auto &val, auto *builder) { + slk::Save(val, builder, &helper); + }); + cpp<#) + +(defun slk-load-plan (member) + #>cpp + query::plan::LogicalOperator::SlkLoadHelper helper; + slk::Load(&self->${member}, reader, &helper.loaded_ops, + [&helper](auto *op, auto *reader) { + slk::ConstructAndLoad(op, reader, &helper); + }); + self->storage = std::move(helper.ast_storage); + cpp<#) + (defun save-plan (builder member capnp-name) (declare (ignore capnp-name)) #>cpp @@ -46,6 +66,8 @@ cpp<# ((plan-id :int64_t) (plan "std::shared_ptr" :capnp-type "Utils.SharedPtr(Plan.LogicalOperator)" + :slk-save #'slk-save-plan + :slk-load #'slk-load-plan :capnp-save #'save-plan :capnp-load #'load-plan) (symbol-table "query::SymbolTable" :capnp-type "Query.SymbolTable") (storage "query::AstStorage" :initarg nil :dont-save t))) diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp index 861e5502d..5f343327a 100644 --- a/src/distributed/pull_produce_rpc_messages.lcp +++ b/src/distributed/pull_produce_rpc_messages.lcp @@ -67,9 +67,40 @@ RPC. Indicates the state of execution on the worker.") "The data returned to the end consumer (the Pull operator). Contains only the relevant parts of the response, ready for use.")) +(defun slk-save-frames (member) + #>cpp + size_t frame_count = self.${member}.size(); + slk::Save(frame_count, builder); + for (const auto &frame : self.${member}) { + size_t frame_size = frame.size(); + slk::Save(frame_size, builder); + for (const auto &value : frame) { + slk::Save(value, builder, self.send_versions, self.worker_id); + } + } + cpp<#) + +(defun slk-load-frames (member) + #>cpp + size_t frame_count = 0; + slk::Load(&frame_count, reader); + self->${member}.reserve(frame_count); + for (size_t frame_i = 0; frame_i < frame_count; ++frame_i) { + size_t frame_size = 0; + slk::Load(&frame_size, reader); + std::vector frame(frame_size); + for (size_t val_i = 0; val_i < frame_size; ++val_i) { + slk::Load(&frame[val_i], reader, dba, data_manager); + } + self->${member}.emplace_back(std::move(frame)); + } + cpp<#) + (lcp:define-struct pull-res-data () ((pull-state "PullState") (frames "std::vector>" + :slk-save #'slk-save-frames + :slk-load #'slk-load-frames :capnp-type "List(List(Query.TypedValue))" :capnp-save (lambda (builder member capnp-name) @@ -174,13 +205,17 @@ to the appropriate value. Not used on side that generates the response.") PullResData(PullResData &&) = default; PullResData &operator=(PullResData &&) = default; cpp<#) - (:serialize (:capnp :load-args '((dba "database::GraphDbAccessor *") + (:serialize (:slk :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *"))) + (:capnp :load-args '((dba "database::GraphDbAccessor *") (data-manager "distributed::DataManager *"))))) (lcp:define-rpc pull (:request ((tx-id "tx::TransactionId") (tx-snapshot "tx::Snapshot" + :slk-save #'slk-save-snapshot + :slk-load #'slk-load-snapshot :capnp-type "List(UInt64)" :capnp-init nil :capnp-save #'save-snapshot @@ -210,9 +245,15 @@ to the appropriate value. Not used on side that generates the response.") "storage::SendVersions" '(both only-old only-new))))) (:response - ((data "PullResData" :initarg :move)) - (:serialize (:capnp :base t :load-args '((dba "database::GraphDbAccessor *") - (data-manager "distributed::DataManager *")))))) + ((data "PullResData" :initarg :move + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, dba, data_manager); + cpp<#))) + (:serialize (:slk :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *"))) + (:capnp :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *")))))) ;; TODO make a separate RPC for the continuation of an existing pull, as an ;; optimization not to have to send the full PullReqData pack every time. diff --git a/src/distributed/updates_rpc_messages.lcp b/src/distributed/updates_rpc_messages.lcp index a3ab71aab..3ba795266 100644 --- a/src/distributed/updates_rpc_messages.lcp +++ b/src/distributed/updates_rpc_messages.lcp @@ -51,7 +51,7 @@ cpp<# ((result "UpdateResult") (cypher-id :int64_t :documentation "Only valid if creation was successful.") (gid "gid::Gid" :documentation "Only valid if creation was successful.")) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-struct create-vertex-req-data () ((tx-id "tx::TransactionId") @@ -107,7 +107,7 @@ cpp<# return reader.getValue(); }); cpp<#))) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc create-vertex (:request ((member "CreateVertexReqData"))) @@ -138,7 +138,7 @@ cpp<# return reader.getValue(); }); cpp<#))) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc create-edge (:request ((member "CreateEdgeReqData"))) @@ -150,7 +150,7 @@ cpp<# (to "gid::Gid") (edge-type "storage::EdgeType") (tx-id "tx::TransactionId")) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc add-in-edge (:request ((member "AddInEdgeReqData"))) @@ -160,7 +160,7 @@ cpp<# ((gid "gid::Gid") (tx-id "tx::TransactionId") (check-empty :bool)) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc remove-vertex (:request ((member "RemoveVertexReqData"))) @@ -171,7 +171,7 @@ cpp<# (edge-id "gid::Gid") (vertex-from-id "gid::Gid") (vertex-to-address "storage::VertexAddress")) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc remove-edge (:request ((member "RemoveEdgeData"))) @@ -181,7 +181,7 @@ cpp<# ((tx-id "tx::TransactionId") (vertex "gid::Gid") (edge-address "storage::EdgeAddress")) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc remove-in-edge (:request ((member "RemoveInEdgeData"))) diff --git a/src/lisp/lcp.lisp b/src/lisp/lcp.lisp index 597db933c..61141f3d4 100644 --- a/src/lisp/lcp.lisp +++ b/src/lisp/lcp.lisp @@ -1333,13 +1333,13 @@ enums which aren't defined in LCP." (:public ,(decl-type-info req-name) ,(def-constructor req-name (second request))) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (define-struct ,res-sym () ,@(cdr response) (:public ,(decl-type-info res-name) ,(def-constructor res-name (second response))) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) ,rpc-decl)))) (defun read-lcp (filepath) diff --git a/src/transactions/distributed/engine_rpc_messages.lcp b/src/transactions/distributed/engine_rpc_messages.lcp index 83cf0602e..3a95a06dc 100644 --- a/src/transactions/distributed/engine_rpc_messages.lcp +++ b/src/transactions/distributed/engine_rpc_messages.lcp @@ -20,11 +20,13 @@ cpp<# (lcp:define-struct tx-and-snapshot () ((tx-id "TransactionId") (snapshot "Snapshot" + :slk-save #'slk-save-snapshot + :slk-load #'slk-load-snapshot :capnp-type "List(UInt64)" :capnp-init nil :capnp-save #'save-snapshot :capnp-load #'load-snapshot)) - (:serialize (:capnp))) + (:serialize (:slk) (:capnp))) (lcp:define-rpc begin (:request ()) @@ -45,6 +47,8 @@ cpp<# (lcp:define-rpc snapshot (:request ((member "TransactionId"))) (:response ((member "Snapshot" + :slk-save #'slk-save-snapshot + :slk-load #'slk-load-snapshot :capnp-type "List(UInt64)" :capnp-init nil :capnp-save #'save-snapshot @@ -57,6 +61,8 @@ cpp<# (lcp:define-rpc gc-snapshot (:request ()) (:response ((member "Snapshot" + :slk-save #'slk-save-snapshot + :slk-load #'slk-load-snapshot :capnp-type "List(UInt64)" :capnp-init nil :capnp-save #'save-snapshot @@ -65,6 +71,8 @@ cpp<# (lcp:define-rpc clog-info (:request ((member "TransactionId"))) (:response ((member "CommitLog::Info" + :slk-save #'slk-save-commitlog-info + :slk-load #'slk-load-commitlog-info :capnp-type "UInt8" :capnp-init nil :capnp-save #'save-commitlog-info @@ -73,6 +81,8 @@ cpp<# (lcp:define-rpc active-transactions (:request ()) (:response ((member "Snapshot" + :slk-save #'slk-save-snapshot + :slk-load #'slk-load-snapshot :capnp-type "List(UInt64)" :capnp-init nil :capnp-save #'save-snapshot diff --git a/src/transactions/distributed/serialization.lcp b/src/transactions/distributed/serialization.lcp index 685e323b9..bc4c57868 100644 --- a/src/transactions/distributed/serialization.lcp +++ b/src/transactions/distributed/serialization.lcp @@ -1,5 +1,17 @@ ;; This file doesn't need to be preprocessed. It only holds helper functions. +(defun slk-save-commitlog-info (member) + #>cpp + slk::Save(static_cast(self.${member}), builder); + cpp<#) + +(defun slk-load-commitlog-info (member) + #>cpp + uint8_t info_flags; + slk::Load(&info_flags, reader); + self->${member} = tx::CommitLog::Info(info_flags); + cpp<#) + (defun save-commitlog-info (builder member capnp-name) #>cpp ${builder}->set${capnp-name}(${member}); @@ -10,6 +22,18 @@ ${member} = CommitLog::Info(${reader}.get${capnp-name}()); cpp<#) +(defun slk-save-snapshot (member) + #>cpp + slk::Save(self.${member}.transaction_ids(), builder); + cpp<#) + +(defun slk-load-snapshot (member) + #>cpp + std::vector transaction_ids; + slk::Load(&transaction_ids, reader); + self->${member} = tx::Snapshot(std::move(transaction_ids)); + cpp<#) + (defun save-snapshot (builder member capnp-name) #>cpp auto list_builder = builder->init${capnp-name}(${member}.transaction_ids().size());