From 54fa46541e848e24c4432a088bb11e36012403e3 Mon Sep 17 00:00:00 2001 From: Matija Santl <matija.santl@memgraph.com> Date: Wed, 7 Nov 2018 13:41:39 +0100 Subject: [PATCH] Serialize `StateDelta` for HA Summary: Since we need to send `StateDelta`s over the wire in HA, we need to be able to serialize those bad boys. This diff hopefully does this the right way. Reviewers: teon.banek, mferencevic, ipaljak Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1725 --- .gitignore | 2 + cmake/functions.cmake | 9 +- src/CMakeLists.txt | 59 ++++++++----- src/communication/CMakeLists.txt | 2 +- src/database/single_node_ha/serialization.lcp | 10 +++ src/durability/distributed/state_delta.lcp | 3 +- src/durability/single_node/state_delta.lcp | 4 +- src/durability/single_node_ha/state_delta.lcp | 41 ++++++--- src/io/CMakeLists.txt | 2 +- src/stats/CMakeLists.txt | 2 +- .../single_node_ha/rpc/serialization.capnp | 34 +++++++ .../single_node_ha/rpc/serialization.cpp | 88 +++++++++++++++++++ .../single_node_ha/rpc/serialization.hpp | 25 ++++++ src/utils/CMakeLists.txt | 2 +- 14 files changed, 236 insertions(+), 47 deletions(-) create mode 100644 src/database/single_node_ha/serialization.lcp create mode 100644 src/storage/single_node_ha/rpc/serialization.capnp create mode 100644 src/storage/single_node_ha/rpc/serialization.cpp create mode 100644 src/storage/single_node_ha/rpc/serialization.hpp diff --git a/.gitignore b/.gitignore index 7640b7b97..b0877e428 100644 --- a/.gitignore +++ b/.gitignore @@ -51,6 +51,8 @@ src/database/distributed/counters_rpc_messages.capnp src/database/distributed/counters_rpc_messages.hpp src/database/distributed/serialization.capnp src/database/distributed/serialization.hpp +src/database/single_node_ha/serialization.capnp +src/database/single_node_ha/serialization.hpp src/distributed/bfs_rpc_messages.capnp src/distributed/bfs_rpc_messages.hpp src/distributed/coordination_rpc_messages.capnp diff --git a/cmake/functions.cmake b/cmake/functions.cmake index 2891a67ab..777ddd4f2 100644 --- a/cmake/functions.cmake +++ b/cmake/functions.cmake @@ -88,14 +88,15 @@ endfunction() # Define `add_capnp` function for registering a capnp file for generation. # -# The `define_add_capnp` expects 2 arguments: +# The `define_add_capnp` expects 3 arguments: +# * name -- name for the function, you usually want `add_capnp` # * main_src_files -- variable to be updated with generated cpp files # * generated_capnp_files -- variable to be updated with generated hpp and cpp files # # The `add_capnp` function expects a single argument, path to capnp file. # Each added file is standalone and we avoid recompiling everything. -macro(define_add_capnp main_src_files generated_capnp_files) - function(add_capnp capnp_src_file) +macro(define_add_capnp name main_src_files generated_capnp_files) + function(${name} capnp_src_file) set(cpp_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.c++) set(h_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.h) add_custom_command(OUTPUT ${cpp_file} ${h_file} @@ -106,5 +107,5 @@ macro(define_add_capnp main_src_files generated_capnp_files) set(${generated_capnp_files} ${${generated_capnp_files}} ${cpp_file} ${h_file} PARENT_SCOPE) # Update *global* main_src_files set(${main_src_files} ${${main_src_files}} ${cpp_file} PARENT_SCOPE) - endfunction(add_capnp) + endfunction(${name}) endmacro(define_add_capnp) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3b1dbcba8..e08bebb5a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -161,66 +161,66 @@ set(mg_distributed_sources # ----------------------------------------------------------------------------- -define_add_capnp(mg_distributed_sources generated_capnp_files) +define_add_capnp(add_capnp_distributed mg_distributed_sources generated_capnp_files) define_add_lcp(add_lcp_distributed mg_distributed_sources generated_lcp_distributed_files) add_lcp_distributed(durability/distributed/state_delta.lcp) add_lcp_distributed(database/distributed/counters_rpc_messages.lcp CAPNP_SCHEMA @0x95a2c3ea3871e945) -add_capnp(database/distributed/counters_rpc_messages.capnp) +add_capnp_distributed(database/distributed/counters_rpc_messages.capnp) add_lcp_distributed(database/distributed/serialization.lcp CAPNP_SCHEMA @0xdea01657b3563887 DEPENDS durability/distributed/state_delta.lcp) -add_capnp(database/distributed/serialization.capnp) +add_capnp_distributed(database/distributed/serialization.capnp) add_lcp_distributed(distributed/bfs_rpc_messages.lcp CAPNP_SCHEMA @0x8e508640b09b6d2a) -add_capnp(distributed/bfs_rpc_messages.capnp) +add_capnp_distributed(distributed/bfs_rpc_messages.capnp) add_lcp_distributed(distributed/coordination_rpc_messages.lcp CAPNP_SCHEMA @0x93df0c4703cf98fb) -add_capnp(distributed/coordination_rpc_messages.capnp) +add_capnp_distributed(distributed/coordination_rpc_messages.capnp) add_lcp_distributed(distributed/data_rpc_messages.lcp CAPNP_SCHEMA @0xc1c8a341ba37aaf5) -add_capnp(distributed/data_rpc_messages.capnp) +add_capnp_distributed(distributed/data_rpc_messages.capnp) add_lcp_distributed(distributed/durability_rpc_messages.lcp CAPNP_SCHEMA @0xf5e53bc271e2163d) -add_capnp(distributed/durability_rpc_messages.capnp) +add_capnp_distributed(distributed/durability_rpc_messages.capnp) add_lcp_distributed(distributed/index_rpc_messages.lcp CAPNP_SCHEMA @0xa8aab46862945bd6) -add_capnp(distributed/index_rpc_messages.capnp) +add_capnp_distributed(distributed/index_rpc_messages.capnp) add_lcp_distributed(distributed/plan_rpc_messages.lcp CAPNP_SCHEMA @0xfcbc48dc9f106d28) -add_capnp(distributed/plan_rpc_messages.capnp) +add_capnp_distributed(distributed/plan_rpc_messages.capnp) add_lcp_distributed(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd DEPENDS transactions/distributed/serialization.lcp) -add_capnp(distributed/pull_produce_rpc_messages.capnp) +add_capnp_distributed(distributed/pull_produce_rpc_messages.capnp) add_lcp_distributed(distributed/storage_gc_rpc_messages.lcp CAPNP_SCHEMA @0xd705663dfe36cf81) -add_capnp(distributed/storage_gc_rpc_messages.capnp) +add_capnp_distributed(distributed/storage_gc_rpc_messages.capnp) add_lcp_distributed(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4caec) -add_capnp(distributed/token_sharing_rpc_messages.capnp) +add_capnp_distributed(distributed/token_sharing_rpc_messages.capnp) add_lcp_distributed(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a) -add_capnp(distributed/updates_rpc_messages.capnp) +add_capnp_distributed(distributed/updates_rpc_messages.capnp) add_lcp_distributed(distributed/dynamic_worker_rpc_messages.lcp CAPNP_SCHEMA @0x8c53f6c9a0c71b05) -add_capnp(distributed/dynamic_worker_rpc_messages.capnp) +add_capnp_distributed(distributed/dynamic_worker_rpc_messages.capnp) # distributed_ops.lcp is leading the capnp code generation, so we don't need # to generate any capnp for operator.lcp add_lcp_distributed(query/frontend/ast/ast.lcp) add_lcp_distributed(query/frontend/ast/ast_serialization.lcp CAPNP_SCHEMA @0xb107d3d6b4b1600b DEPENDS query/frontend/ast/ast.lcp) -add_capnp(query/frontend/ast/ast_serialization.capnp) +add_capnp_distributed(query/frontend/ast/ast_serialization.capnp) add_lcp_distributed(query/plan/operator.lcp) add_lcp_distributed(query/plan/distributed_ops.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42 DEPENDS query/plan/operator.lcp) -add_capnp(query/plan/distributed_ops.capnp) +add_capnp_distributed(query/plan/distributed_ops.capnp) add_lcp_distributed(storage/distributed/rpc/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd) -add_capnp(storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp) +add_capnp_distributed(storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp) add_lcp_distributed(transactions/distributed/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5 DEPENDS transactions/distributed/serialization.lcp) -add_capnp(transactions/distributed/engine_rpc_messages.capnp) +add_capnp_distributed(transactions/distributed/engine_rpc_messages.capnp) add_custom_target(generate_lcp_distributed DEPENDS ${generated_lcp_distributed_files}) # Registering capnp must come after registering lcp files. -add_capnp(communication/rpc/messages.capnp) -add_capnp(durability/distributed/serialization.capnp) -add_capnp(query/frontend/semantic/symbol.capnp) -add_capnp(query/serialization.capnp) -add_capnp(storage/distributed/rpc/serialization.capnp) +add_capnp_distributed(communication/rpc/messages.capnp) +add_capnp_distributed(durability/distributed/serialization.capnp) +add_capnp_distributed(query/frontend/semantic/symbol.capnp) +add_capnp_distributed(query/serialization.capnp) +add_capnp_distributed(storage/distributed/rpc/serialization.capnp) add_custom_target(generate_capnp DEPENDS generate_lcp_distributed ${generated_capnp_files}) @@ -275,19 +275,31 @@ set(mg_single_node_ha_sources storage/common/locking/record_lock.cpp storage/single_node_ha/edge_accessor.cpp storage/single_node_ha/record_accessor.cpp + storage/single_node_ha/rpc/serialization.cpp storage/single_node_ha/vertex_accessor.cpp transactions/single_node_ha/engine.cpp memgraph_init.cpp ) +define_add_capnp(add_capnp_single_node_ha mg_single_node_ha_sources generated_capnp_single_node_ha_files) + define_add_lcp(add_lcp_single_node_ha mg_single_node_ha_sources generated_lcp_single_node_ha_files) add_lcp_single_node_ha(durability/single_node_ha/state_delta.lcp) +add_lcp_single_node_ha(database/single_node_ha/serialization.lcp CAPNP_SCHEMA @0xd0f4a502575fb6f7 + DEPENDS durability/single_node_ha/state_delta.lcp) +add_capnp_single_node_ha(database/single_node_ha/serialization.capnp) add_lcp_single_node_ha(query/frontend/ast/ast.lcp) add_lcp_single_node_ha(query/plan/operator.lcp) add_custom_target(generate_lcp_single_node_ha DEPENDS ${generated_lcp_single_node_ha_files}) +# Registering capnp must come after registering lcp files. + +add_capnp_single_node_ha(storage/single_node_ha/rpc/serialization.capnp) + +add_custom_target(generate_capnp_single_node_ha DEPENDS generate_lcp_single_node_ha ${generated_capnp_single_node_ha_files}) + set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools antlr_opencypher_parser_lib dl glog gflags capnp kj mg-utils mg-io mg-integrations-kafka mg-requests mg-communication mg-auth mg-stats) @@ -306,6 +318,7 @@ add_library(mg-single-node-ha STATIC ${mg_single_node_ha_sources}) target_link_libraries(mg-single-node-ha ${MG_SINGLE_NODE_HA_LIBS}) add_dependencies(mg-single-node-ha generate_opencypher_parser) add_dependencies(mg-single-node-ha generate_lcp_single_node_ha) +add_dependencies(mg-single-node-ha generate_capnp_single_node_ha) target_compile_definitions(mg-single-node-ha PUBLIC MG_SINGLE_NODE_HA) # ---------------------------------------------------------------------------- diff --git a/src/communication/CMakeLists.txt b/src/communication/CMakeLists.txt index d9cbfe6a8..c87219504 100644 --- a/src/communication/CMakeLists.txt +++ b/src/communication/CMakeLists.txt @@ -9,7 +9,7 @@ set(communication_src_files rpc/protocol.cpp rpc/server.cpp) -define_add_capnp(communication_src_files communication_capnp_files) +define_add_capnp(add_capnp communication_src_files communication_capnp_files) add_capnp(rpc/messages.capnp) diff --git a/src/database/single_node_ha/serialization.lcp b/src/database/single_node_ha/serialization.lcp new file mode 100644 index 000000000..51f1b07e0 --- /dev/null +++ b/src/database/single_node_ha/serialization.lcp @@ -0,0 +1,10 @@ +#>cpp +#pragma once + +#include "database/single_node_ha/serialization.capnp.h" +#include "durability/single_node_ha/state_delta.hpp" +#include "storage/single_node_ha/rpc/serialization.hpp" +cpp<# + +;; Generate serialization of state-delta +(load "durability/single_node_ha/state_delta.lcp") diff --git a/src/durability/distributed/state_delta.lcp b/src/durability/distributed/state_delta.lcp index 02c72c996..16cf79484 100644 --- a/src/durability/distributed/state_delta.lcp +++ b/src/durability/distributed/state_delta.lcp @@ -68,8 +68,7 @@ cpp<# (check-empty :bool)) (:documentation "Describes single change to the database state. Used for durability (WAL) and -state communication over network in HA and for distributed remote storage -changes. +for distributed remote storage changes. Labels, Properties and EdgeTypes are stored both as values (integers) and strings (their names). The values are used when applying deltas in a running diff --git a/src/durability/single_node/state_delta.lcp b/src/durability/single_node/state_delta.lcp index c7b6760a3..006ab5a82 100644 --- a/src/durability/single_node/state_delta.lcp +++ b/src/durability/single_node/state_delta.lcp @@ -45,9 +45,7 @@ cpp<# (check-empty :bool) (unique :bool)) (:documentation - "Describes single change to the database state. Used for durability (WAL) and -state communication over network in HA and for distributed remote storage -changes. + "Describes single change to the database state. Used for durability (WAL). Labels, Properties and EdgeTypes are stored both as values (integers) and strings (their names). The values are used when applying deltas in a running diff --git a/src/durability/single_node_ha/state_delta.lcp b/src/durability/single_node_ha/state_delta.lcp index f4f6167a2..6a7da2829 100644 --- a/src/durability/single_node_ha/state_delta.lcp +++ b/src/durability/single_node_ha/state_delta.lcp @@ -20,34 +20,51 @@ cpp<# class GraphDbAccessor; cpp<# +(lcp:capnp-namespace "database") + +(lcp:capnp-import 'storage "/storage/single_node_ha/rpc/serialization.capnp") + +(lcp:capnp-type-conversion "tx::TransactionId" "UInt64") +(lcp:capnp-type-conversion "gid::Gid" "UInt64") +(lcp:capnp-type-conversion "storage::Label" "Storage.Common") +(lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common") +(lcp:capnp-type-conversion "storage::Property" "Storage.Common") + (lcp:define-struct state-delta () ( ;; Members valid for every delta. (type "Type") (transaction-id "tx::TransactionId") ;; Members valid only for some deltas, see StateDelta::Type comments above. - ;; TODO: when preparing the WAL for distributed, most likely remove Gids and - ;; only keep addresses. (vertex-id "gid::Gid") (edge-id "gid::Gid") - (edge-address "mvcc::VersionList<Edge> *") (vertex-from-id "gid::Gid") - (vertex-from-address "mvcc::VersionList<Vertex> *") (vertex-to-id "gid::Gid") - (vertex-to-address "mvcc::VersionList<Vertex> *") (edge-type "storage::EdgeType") (edge-type-name "std::string") (property "storage::Property") (property-name "std::string") - (value "PropertyValue" :initval "PropertyValue::Null") + (value "PropertyValue" :initval "PropertyValue::Null" + :capnp-type "Storage.PropertyValue" + :capnp-save + (lambda (builder member capnp-name) + (declare (ignore capnp-name)) + #>cpp + storage::SaveCapnpPropertyValue(${member}, &${builder}); + cpp<#) + :capnp-load + (lambda (reader member capnp-name) + (declare (ignore capnp-name)) + #>cpp + storage::LoadCapnpPropertyValue(${reader}, &${member}); + cpp<#)) (label "storage::Label") (label-name "std::string") (check-empty :bool) (unique :bool)) (:documentation - "Describes single change to the database state. Used for durability (WAL) and -state communication over network in HA and for distributed remote storage -changes. + "Describes single change to the database state. Used for state communication +over network in HA. Labels, Properties and EdgeTypes are stored both as values (integers) and strings (their names). The values are used when applying deltas in a running @@ -76,7 +93,8 @@ in StateDeltas.") (:documentation "Defines StateDelta type. For each type the comment indicates which values need to be stored. All deltas have the transaction_id member, so that's -omitted in the comment.")) +omitted in the comment.") + (:serialize :capnp)) #>cpp StateDelta() = default; StateDelta(const enum Type &type, tx::TransactionId tx_id) @@ -133,6 +151,7 @@ omitted in the comment.")) /// Applies CRUD delta to database accessor. Fails on other types of deltas void Apply(GraphDbAccessor &dba) const; - cpp<#)) + cpp<#) + (:serialize :capnp)) (lcp:pop-namespace) ;; database diff --git a/src/io/CMakeLists.txt b/src/io/CMakeLists.txt index e6bd87933..5c3d683f1 100644 --- a/src/io/CMakeLists.txt +++ b/src/io/CMakeLists.txt @@ -4,7 +4,7 @@ set(io_src_files network/socket.cpp network/utils.cpp) -define_add_capnp(io_src_files io_capnp_files) +define_add_capnp(add_capnp io_src_files io_capnp_files) add_capnp(network/endpoint.capnp) diff --git a/src/stats/CMakeLists.txt b/src/stats/CMakeLists.txt index 514306ad7..5f49c7b56 100644 --- a/src/stats/CMakeLists.txt +++ b/src/stats/CMakeLists.txt @@ -2,7 +2,7 @@ set(stats_src_files metrics.cpp stats.cpp) -define_add_capnp(stats_src_files stats_capnp_files) +define_add_capnp(add_capnp stats_src_files stats_capnp_files) define_add_lcp(add_lcp stats_src_files stats_lcp_files) add_lcp(stats_rpc_messages.lcp CAPNP_SCHEMA @0xc19a87c81b9b4512) diff --git a/src/storage/single_node_ha/rpc/serialization.capnp b/src/storage/single_node_ha/rpc/serialization.capnp new file mode 100644 index 000000000..27d780db3 --- /dev/null +++ b/src/storage/single_node_ha/rpc/serialization.capnp @@ -0,0 +1,34 @@ +@0x8424471a44ccd2df; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("storage::capnp"); + +struct Common { + storage @0 :UInt16; + union { + label @1 :Label; + edgeType @2 :EdgeType; + property @3 :Property; + } +} + +struct Label {} +struct EdgeType {} +struct Property {} + +struct PropertyValue { + union { + nullType @0 :Void; + bool @1 :Bool; + integer @2 :Int64; + double @3 :Float64; + string @4 :Text; + list @5 :List(PropertyValue); + map @6 :List(MapEntry); + } + + struct MapEntry { + key @0 :Text; + value @1 :PropertyValue; + } +} diff --git a/src/storage/single_node_ha/rpc/serialization.cpp b/src/storage/single_node_ha/rpc/serialization.cpp new file mode 100644 index 000000000..2cc8ec97d --- /dev/null +++ b/src/storage/single_node_ha/rpc/serialization.cpp @@ -0,0 +1,88 @@ +#include "storage/single_node_ha/rpc/serialization.hpp" + +namespace storage { + +void SaveCapnpPropertyValue(const PropertyValue &value, + capnp::PropertyValue::Builder *builder) { + switch (value.type()) { + case PropertyValue::Type::Null: + builder->setNullType(); + return; + case PropertyValue::Type::Bool: + builder->setBool(value.Value<bool>()); + return; + case PropertyValue::Type::Int: + builder->setInteger(value.Value<int64_t>()); + return; + case PropertyValue::Type::Double: + builder->setDouble(value.Value<double>()); + return; + case PropertyValue::Type::String: + builder->setString(value.Value<std::string>()); + return; + case PropertyValue::Type::List: { + const auto &values = value.Value<std::vector<PropertyValue>>(); + auto list_builder = builder->initList(values.size()); + for (size_t i = 0; i < values.size(); ++i) { + auto value_builder = list_builder[i]; + SaveCapnpPropertyValue(values[i], &value_builder); + } + return; + } + case PropertyValue::Type::Map: { + const auto &map = value.Value<std::map<std::string, PropertyValue>>(); + auto map_builder = builder->initMap(map.size()); + size_t i = 0; + for (const auto &kv : map) { + auto kv_builder = map_builder[i]; + kv_builder.setKey(kv.first); + auto value_builder = kv_builder.initValue(); + SaveCapnpPropertyValue(kv.second, &value_builder); + ++i; + } + return; + } + } +} + +void LoadCapnpPropertyValue(const capnp::PropertyValue::Reader &reader, + PropertyValue *value) { + switch (reader.which()) { + case capnp::PropertyValue::NULL_TYPE: + *value = PropertyValue::Null; + return; + case capnp::PropertyValue::BOOL: + *value = reader.getBool(); + return; + case capnp::PropertyValue::INTEGER: + *value = reader.getInteger(); + return; + case capnp::PropertyValue::DOUBLE: + *value = reader.getDouble(); + return; + case capnp::PropertyValue::STRING: + *value = reader.getString().cStr(); + return; + case capnp::PropertyValue::LIST: { + std::vector<PropertyValue> list; + list.reserve(reader.getList().size()); + for (const auto &value_reader : reader.getList()) { + list.emplace_back(); + LoadCapnpPropertyValue(value_reader, &list.back()); + } + *value = list; + return; + } + case capnp::PropertyValue::MAP: { + std::map<std::string, PropertyValue> map; + for (const auto &kv_reader : reader.getMap()) { + auto key = kv_reader.getKey(); + LoadCapnpPropertyValue(kv_reader.getValue(), &map[key]); + } + *value = map; + return; + } + } +} + +} // namespace storage diff --git a/src/storage/single_node_ha/rpc/serialization.hpp b/src/storage/single_node_ha/rpc/serialization.hpp new file mode 100644 index 000000000..3e5d723fe --- /dev/null +++ b/src/storage/single_node_ha/rpc/serialization.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "storage/common/types/property_value.hpp" +#include "storage/common/types/types.hpp" +#include "storage/single_node_ha/rpc/serialization.capnp.h" + +namespace storage { + +template <class Type> +void Save(const Common<Type> &common, capnp::Common::Builder *builder) { + builder->setStorage(common.id_); +} + +template <class Type> +void Load(Common<Type> *common, const capnp::Common::Reader &reader) { + common->id_ = reader.getStorage(); +} + +void SaveCapnpPropertyValue(const PropertyValue &value, + capnp::PropertyValue::Builder *builder); + +void LoadCapnpPropertyValue(const capnp::PropertyValue::Reader &reader, + PropertyValue *value); + +} // namespace storage diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index d7b334a20..53e968948 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -7,7 +7,7 @@ set(utils_src_files uuid.cpp watchdog.cpp) -define_add_capnp(utils_src_files utils_capnp_files) +define_add_capnp(add_capnp utils_src_files utils_capnp_files) add_capnp(serialization.capnp)