memgraph/src/distributed/updates_rpc_messages.lcp
Teon Banek e0474a8e92 Replace boost with capnp in RPC
Summary:
Converts the RPC stack to use Cap'n Proto for serialization instead of
boost. There are still some traces of boost in other places in the code,
but most of it is removed. A future diff should cleanup boost for good.

The RPC API is now changed to be more flexible with regards to how
serialize data. This makes the simplest cases a bit more verbose, but
allows complex serialization code to be correctly written instead of
relying on hacks. (For reference, look for the old serialization of
`PullRpc` which had a nasty pointer hacks to inject accessors in
`TypedValue`.)

Since RPC messages were uselessly modeled via inheritance of Message
base class, that class is now removed. Furthermore, that approach
doesn't really work with Cap'n Proto. Instead, each message type is
required to have some type information. This can be automated, so
`define-rpc` has been added to LCP, which hopefully simplifies defining
new RPC request and response messages.

Specify Cap'n Proto schema ID in cmake

This preserves Cap'n Proto generated typeIds across multiple generations
of capnp schemas through LCP. It is imperative that typeId stays the
same to ensure that different compilations of Memgraph may communicate
via RPC in a distributed cluster.

Use CLOS for meta information on C++ types in LCP

Since some structure slots and functions have started to repeat
themselves, it makes sense to model C++ meta information via Common Lisp
Object System.

Depends on D1391

Reviewers: buda, dgleich, mferencevic, mtomic, mculinovic, msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1407
2018-06-04 10:45:12 +02:00

188 lines
7.0 KiB
Plaintext

#>cpp
#pragma once
#include <unordered_map>
#include "communication/rpc/messages.hpp"
#include "database/state_delta.hpp"
#include "distributed/updates_rpc_messages.capnp.h"
#include "storage/address_types.hpp"
#include "storage/gid.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
cpp<#
(lcp:namespace distributed)
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'db "/database/state_delta.capnp")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-type-conversion "tx::TransactionId" "UInt64")
(lcp:capnp-type-conversion "gid::Gid" "UInt64")
(lcp:capnp-type-conversion "storage::Label" "Storage.Common")
(lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common")
(lcp:capnp-type-conversion "storage::Property" "Storage.Common")
(lcp:capnp-type-conversion "storage::EdgeAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::VertexAddress" "Storage.Address")
(lcp:define-enum update-result
(done
serialization-error
lock-timeout-error
update-deleted-error
unable-to-delete-vertex-error)
(:documentation "The result of sending or applying a deferred update to a worker.")
(:serialize))
(lcp:define-rpc update
(:request ((member "database::StateDelta" :capnp-type "Db.StateDelta")))
(:response ((member "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult")))))
(lcp:define-rpc update-apply
(:request ((member "tx::TransactionId")))
(:response ((member "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult")))))
(lcp:define-struct create-result ()
((result "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult"))
(gid "gid::Gid" :documentation "Only valid if creation was successful."))
(:serialize :boost :capnp))
(lcp:define-struct create-vertex-req-data ()
((tx-id "tx::TransactionId")
(labels "std::vector<storage::Label>"
:capnp-save (lcp:capnp-save-vector "storage::capnp::Common" "storage::Label")
:capnp-load (lcp:capnp-load-vector "storage::capnp::Common" "storage::Label"))
(properties "std::unordered_map<storage::Property, query::TypedValue>"
:save-fun
#>cpp
ar << properties.size();
for (auto &kv : properties) {
ar << kv.first;
utils::SaveTypedValue(ar, kv.second);
}
cpp<#
:load-fun
#>cpp
size_t props_size;
ar >> props_size;
for (size_t i = 0; i < props_size; ++i) {
storage::Property p;
ar >> p;
query::TypedValue tv;
utils::LoadTypedValue(ar, tv);
properties.emplace(p, std::move(tv));
}
cpp<#
:capnp-type "Utils.Map(Storage.Common, Dis.TypedValue)"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveMap<storage::capnp::Common, capnp::TypedValue>(
${member}, &${builder},
[](auto *builder, const auto &entry) {
auto key_builder = builder->initKey();
entry.first.Save(&key_builder);
auto value_builder = builder->initValue();
utils::SaveCapnpTypedValue(entry.second, &value_builder);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
utils::LoadMap<storage::capnp::Common, capnp::TypedValue>(
&${member}, ${reader},
[](const auto &reader) {
storage::Property prop;
prop.Load(reader.getKey());
query::TypedValue value;
utils::LoadCapnpTypedValue(reader.getValue(), &value);
return std::make_pair(prop, value);
});
cpp<#)))
(:serialize :capnp))
(lcp:define-rpc create-vertex
(:request ((member "CreateVertexReqData")))
(:response ((member "CreateResult"))))
(lcp:define-struct create-edge-req-data ()
((from "gid::Gid")
(to "storage::VertexAddress")
(edge-type "storage::EdgeType")
(tx-id "tx::TransactionId"))
(:serialize :capnp))
(lcp:define-rpc create-edge
(:request ((member "CreateEdgeReqData")))
(:response ((member "CreateResult"))))
(lcp:define-struct add-in-edge-req-data ()
((from "storage::VertexAddress")
(edge-address "storage::EdgeAddress")
(to "gid::Gid")
(edge-type "storage::EdgeType")
(tx-id "tx::TransactionId"))
(:serialize :capnp))
(lcp:define-rpc add-in-edge
(:request ((member "AddInEdgeReqData")))
(:response ((member "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult")))))
(lcp:define-struct remove-vertex-req-data ()
((gid "gid::Gid")
(tx-id "tx::TransactionId")
(check-empty :bool))
(:serialize :capnp))
(lcp:define-rpc remove-vertex
(:request ((member "RemoveVertexReqData")))
(:response ((member "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult")))))
(lcp:define-struct remove-edge-data ()
((tx-id "tx::TransactionId")
(edge-id "gid::Gid")
(vertex-from-id "gid::Gid")
(vertex-to-address "storage::VertexAddress"))
(:serialize :capnp))
(lcp:define-rpc remove-edge
(:request ((member "RemoveEdgeData")))
(:response ((member "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult")))))
(lcp:define-struct remove-in-edge-data ()
((tx-id "tx::TransactionId")
(vertex "gid::Gid")
(edge-address "storage::EdgeAddress"))
(:serialize :capnp))
(lcp:define-rpc remove-in-edge
(:request ((member "RemoveInEdgeData")))
(:response ((member "UpdateResult"
:capnp-init nil
:capnp-save (lcp:capnp-save-enum "capnp::UpdateResult" "UpdateResult")
:capnp-load (lcp:capnp-load-enum "capnp::UpdateResult" "UpdateResult")))))
(lcp:pop-namespace) ;; distributed