Serialize StateDelta for HA

Summary:
Since we need to send `StateDelta`s over the wire in HA, we need to be
able to serialize those bad boys.
This diff hopefully does this the right way.

Reviewers: teon.banek, mferencevic, ipaljak

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1725
This commit is contained in:
Matija Santl 2018-11-07 13:41:39 +01:00
parent 6d21e58b09
commit 54fa46541e
14 changed files with 236 additions and 47 deletions

2
.gitignore vendored
View File

@ -51,6 +51,8 @@ src/database/distributed/counters_rpc_messages.capnp
src/database/distributed/counters_rpc_messages.hpp
src/database/distributed/serialization.capnp
src/database/distributed/serialization.hpp
src/database/single_node_ha/serialization.capnp
src/database/single_node_ha/serialization.hpp
src/distributed/bfs_rpc_messages.capnp
src/distributed/bfs_rpc_messages.hpp
src/distributed/coordination_rpc_messages.capnp

View File

@ -88,14 +88,15 @@ endfunction()
# Define `add_capnp` function for registering a capnp file for generation.
#
# The `define_add_capnp` expects 2 arguments:
# The `define_add_capnp` expects 3 arguments:
# * name -- name for the function, you usually want `add_capnp`
# * main_src_files -- variable to be updated with generated cpp files
# * generated_capnp_files -- variable to be updated with generated hpp and cpp files
#
# The `add_capnp` function expects a single argument, path to capnp file.
# Each added file is standalone and we avoid recompiling everything.
macro(define_add_capnp main_src_files generated_capnp_files)
function(add_capnp capnp_src_file)
macro(define_add_capnp name main_src_files generated_capnp_files)
function(${name} capnp_src_file)
set(cpp_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.c++)
set(h_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.h)
add_custom_command(OUTPUT ${cpp_file} ${h_file}
@ -106,5 +107,5 @@ macro(define_add_capnp main_src_files generated_capnp_files)
set(${generated_capnp_files} ${${generated_capnp_files}} ${cpp_file} ${h_file} PARENT_SCOPE)
# Update *global* main_src_files
set(${main_src_files} ${${main_src_files}} ${cpp_file} PARENT_SCOPE)
endfunction(add_capnp)
endfunction(${name})
endmacro(define_add_capnp)

View File

@ -161,66 +161,66 @@ set(mg_distributed_sources
# -----------------------------------------------------------------------------
define_add_capnp(mg_distributed_sources generated_capnp_files)
define_add_capnp(add_capnp_distributed mg_distributed_sources generated_capnp_files)
define_add_lcp(add_lcp_distributed mg_distributed_sources generated_lcp_distributed_files)
add_lcp_distributed(durability/distributed/state_delta.lcp)
add_lcp_distributed(database/distributed/counters_rpc_messages.lcp CAPNP_SCHEMA @0x95a2c3ea3871e945)
add_capnp(database/distributed/counters_rpc_messages.capnp)
add_capnp_distributed(database/distributed/counters_rpc_messages.capnp)
add_lcp_distributed(database/distributed/serialization.lcp CAPNP_SCHEMA @0xdea01657b3563887
DEPENDS durability/distributed/state_delta.lcp)
add_capnp(database/distributed/serialization.capnp)
add_capnp_distributed(database/distributed/serialization.capnp)
add_lcp_distributed(distributed/bfs_rpc_messages.lcp CAPNP_SCHEMA @0x8e508640b09b6d2a)
add_capnp(distributed/bfs_rpc_messages.capnp)
add_capnp_distributed(distributed/bfs_rpc_messages.capnp)
add_lcp_distributed(distributed/coordination_rpc_messages.lcp CAPNP_SCHEMA @0x93df0c4703cf98fb)
add_capnp(distributed/coordination_rpc_messages.capnp)
add_capnp_distributed(distributed/coordination_rpc_messages.capnp)
add_lcp_distributed(distributed/data_rpc_messages.lcp CAPNP_SCHEMA @0xc1c8a341ba37aaf5)
add_capnp(distributed/data_rpc_messages.capnp)
add_capnp_distributed(distributed/data_rpc_messages.capnp)
add_lcp_distributed(distributed/durability_rpc_messages.lcp CAPNP_SCHEMA @0xf5e53bc271e2163d)
add_capnp(distributed/durability_rpc_messages.capnp)
add_capnp_distributed(distributed/durability_rpc_messages.capnp)
add_lcp_distributed(distributed/index_rpc_messages.lcp CAPNP_SCHEMA @0xa8aab46862945bd6)
add_capnp(distributed/index_rpc_messages.capnp)
add_capnp_distributed(distributed/index_rpc_messages.capnp)
add_lcp_distributed(distributed/plan_rpc_messages.lcp CAPNP_SCHEMA @0xfcbc48dc9f106d28)
add_capnp(distributed/plan_rpc_messages.capnp)
add_capnp_distributed(distributed/plan_rpc_messages.capnp)
add_lcp_distributed(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd
DEPENDS transactions/distributed/serialization.lcp)
add_capnp(distributed/pull_produce_rpc_messages.capnp)
add_capnp_distributed(distributed/pull_produce_rpc_messages.capnp)
add_lcp_distributed(distributed/storage_gc_rpc_messages.lcp CAPNP_SCHEMA @0xd705663dfe36cf81)
add_capnp(distributed/storage_gc_rpc_messages.capnp)
add_capnp_distributed(distributed/storage_gc_rpc_messages.capnp)
add_lcp_distributed(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4caec)
add_capnp(distributed/token_sharing_rpc_messages.capnp)
add_capnp_distributed(distributed/token_sharing_rpc_messages.capnp)
add_lcp_distributed(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
add_capnp(distributed/updates_rpc_messages.capnp)
add_capnp_distributed(distributed/updates_rpc_messages.capnp)
add_lcp_distributed(distributed/dynamic_worker_rpc_messages.lcp CAPNP_SCHEMA @0x8c53f6c9a0c71b05)
add_capnp(distributed/dynamic_worker_rpc_messages.capnp)
add_capnp_distributed(distributed/dynamic_worker_rpc_messages.capnp)
# distributed_ops.lcp is leading the capnp code generation, so we don't need
# to generate any capnp for operator.lcp
add_lcp_distributed(query/frontend/ast/ast.lcp)
add_lcp_distributed(query/frontend/ast/ast_serialization.lcp CAPNP_SCHEMA @0xb107d3d6b4b1600b
DEPENDS query/frontend/ast/ast.lcp)
add_capnp(query/frontend/ast/ast_serialization.capnp)
add_capnp_distributed(query/frontend/ast/ast_serialization.capnp)
add_lcp_distributed(query/plan/operator.lcp)
add_lcp_distributed(query/plan/distributed_ops.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42
DEPENDS query/plan/operator.lcp)
add_capnp(query/plan/distributed_ops.capnp)
add_capnp_distributed(query/plan/distributed_ops.capnp)
add_lcp_distributed(storage/distributed/rpc/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd)
add_capnp(storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp)
add_capnp_distributed(storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp)
add_lcp_distributed(transactions/distributed/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5
DEPENDS transactions/distributed/serialization.lcp)
add_capnp(transactions/distributed/engine_rpc_messages.capnp)
add_capnp_distributed(transactions/distributed/engine_rpc_messages.capnp)
add_custom_target(generate_lcp_distributed DEPENDS ${generated_lcp_distributed_files})
# Registering capnp must come after registering lcp files.
add_capnp(communication/rpc/messages.capnp)
add_capnp(durability/distributed/serialization.capnp)
add_capnp(query/frontend/semantic/symbol.capnp)
add_capnp(query/serialization.capnp)
add_capnp(storage/distributed/rpc/serialization.capnp)
add_capnp_distributed(communication/rpc/messages.capnp)
add_capnp_distributed(durability/distributed/serialization.capnp)
add_capnp_distributed(query/frontend/semantic/symbol.capnp)
add_capnp_distributed(query/serialization.capnp)
add_capnp_distributed(storage/distributed/rpc/serialization.capnp)
add_custom_target(generate_capnp DEPENDS generate_lcp_distributed ${generated_capnp_files})
@ -275,19 +275,31 @@ set(mg_single_node_ha_sources
storage/common/locking/record_lock.cpp
storage/single_node_ha/edge_accessor.cpp
storage/single_node_ha/record_accessor.cpp
storage/single_node_ha/rpc/serialization.cpp
storage/single_node_ha/vertex_accessor.cpp
transactions/single_node_ha/engine.cpp
memgraph_init.cpp
)
define_add_capnp(add_capnp_single_node_ha mg_single_node_ha_sources generated_capnp_single_node_ha_files)
define_add_lcp(add_lcp_single_node_ha mg_single_node_ha_sources generated_lcp_single_node_ha_files)
add_lcp_single_node_ha(durability/single_node_ha/state_delta.lcp)
add_lcp_single_node_ha(database/single_node_ha/serialization.lcp CAPNP_SCHEMA @0xd0f4a502575fb6f7
DEPENDS durability/single_node_ha/state_delta.lcp)
add_capnp_single_node_ha(database/single_node_ha/serialization.capnp)
add_lcp_single_node_ha(query/frontend/ast/ast.lcp)
add_lcp_single_node_ha(query/plan/operator.lcp)
add_custom_target(generate_lcp_single_node_ha DEPENDS ${generated_lcp_single_node_ha_files})
# Registering capnp must come after registering lcp files.
add_capnp_single_node_ha(storage/single_node_ha/rpc/serialization.capnp)
add_custom_target(generate_capnp_single_node_ha DEPENDS generate_lcp_single_node_ha ${generated_capnp_single_node_ha_files})
set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags capnp kj
mg-utils mg-io mg-integrations-kafka mg-requests mg-communication mg-auth mg-stats)
@ -306,6 +318,7 @@ add_library(mg-single-node-ha STATIC ${mg_single_node_ha_sources})
target_link_libraries(mg-single-node-ha ${MG_SINGLE_NODE_HA_LIBS})
add_dependencies(mg-single-node-ha generate_opencypher_parser)
add_dependencies(mg-single-node-ha generate_lcp_single_node_ha)
add_dependencies(mg-single-node-ha generate_capnp_single_node_ha)
target_compile_definitions(mg-single-node-ha PUBLIC MG_SINGLE_NODE_HA)
# ----------------------------------------------------------------------------

View File

@ -9,7 +9,7 @@ set(communication_src_files
rpc/protocol.cpp
rpc/server.cpp)
define_add_capnp(communication_src_files communication_capnp_files)
define_add_capnp(add_capnp communication_src_files communication_capnp_files)
add_capnp(rpc/messages.capnp)

View File

@ -0,0 +1,10 @@
#>cpp
#pragma once
#include "database/single_node_ha/serialization.capnp.h"
#include "durability/single_node_ha/state_delta.hpp"
#include "storage/single_node_ha/rpc/serialization.hpp"
cpp<#
;; Generate serialization of state-delta
(load "durability/single_node_ha/state_delta.lcp")

View File

@ -68,8 +68,7 @@ cpp<#
(check-empty :bool))
(:documentation
"Describes single change to the database state. Used for durability (WAL) and
state communication over network in HA and for distributed remote storage
changes.
for distributed remote storage changes.
Labels, Properties and EdgeTypes are stored both as values (integers) and
strings (their names). The values are used when applying deltas in a running

View File

@ -45,9 +45,7 @@ cpp<#
(check-empty :bool)
(unique :bool))
(:documentation
"Describes single change to the database state. Used for durability (WAL) and
state communication over network in HA and for distributed remote storage
changes.
"Describes single change to the database state. Used for durability (WAL).
Labels, Properties and EdgeTypes are stored both as values (integers) and
strings (their names). The values are used when applying deltas in a running

View File

@ -20,34 +20,51 @@ cpp<#
class GraphDbAccessor;
cpp<#
(lcp:capnp-namespace "database")
(lcp:capnp-import 'storage "/storage/single_node_ha/rpc/serialization.capnp")
(lcp:capnp-type-conversion "tx::TransactionId" "UInt64")
(lcp:capnp-type-conversion "gid::Gid" "UInt64")
(lcp:capnp-type-conversion "storage::Label" "Storage.Common")
(lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common")
(lcp:capnp-type-conversion "storage::Property" "Storage.Common")
(lcp:define-struct state-delta ()
(
;; Members valid for every delta.
(type "Type")
(transaction-id "tx::TransactionId")
;; Members valid only for some deltas, see StateDelta::Type comments above.
;; TODO: when preparing the WAL for distributed, most likely remove Gids and
;; only keep addresses.
(vertex-id "gid::Gid")
(edge-id "gid::Gid")
(edge-address "mvcc::VersionList<Edge> *")
(vertex-from-id "gid::Gid")
(vertex-from-address "mvcc::VersionList<Vertex> *")
(vertex-to-id "gid::Gid")
(vertex-to-address "mvcc::VersionList<Vertex> *")
(edge-type "storage::EdgeType")
(edge-type-name "std::string")
(property "storage::Property")
(property-name "std::string")
(value "PropertyValue" :initval "PropertyValue::Null")
(value "PropertyValue" :initval "PropertyValue::Null"
:capnp-type "Storage.PropertyValue"
:capnp-save
(lambda (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
storage::SaveCapnpPropertyValue(${member}, &${builder});
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore capnp-name))
#>cpp
storage::LoadCapnpPropertyValue(${reader}, &${member});
cpp<#))
(label "storage::Label")
(label-name "std::string")
(check-empty :bool)
(unique :bool))
(:documentation
"Describes single change to the database state. Used for durability (WAL) and
state communication over network in HA and for distributed remote storage
changes.
"Describes single change to the database state. Used for state communication
over network in HA.
Labels, Properties and EdgeTypes are stored both as values (integers) and
strings (their names). The values are used when applying deltas in a running
@ -76,7 +93,8 @@ in StateDeltas.")
(:documentation
"Defines StateDelta type. For each type the comment indicates which values
need to be stored. All deltas have the transaction_id member, so that's
omitted in the comment."))
omitted in the comment.")
(:serialize :capnp))
#>cpp
StateDelta() = default;
StateDelta(const enum Type &type, tx::TransactionId tx_id)
@ -133,6 +151,7 @@ omitted in the comment."))
/// Applies CRUD delta to database accessor. Fails on other types of deltas
void Apply(GraphDbAccessor &dba) const;
cpp<#))
cpp<#)
(:serialize :capnp))
(lcp:pop-namespace) ;; database

View File

@ -4,7 +4,7 @@ set(io_src_files
network/socket.cpp
network/utils.cpp)
define_add_capnp(io_src_files io_capnp_files)
define_add_capnp(add_capnp io_src_files io_capnp_files)
add_capnp(network/endpoint.capnp)

View File

@ -2,7 +2,7 @@ set(stats_src_files
metrics.cpp
stats.cpp)
define_add_capnp(stats_src_files stats_capnp_files)
define_add_capnp(add_capnp stats_src_files stats_capnp_files)
define_add_lcp(add_lcp stats_src_files stats_lcp_files)
add_lcp(stats_rpc_messages.lcp CAPNP_SCHEMA @0xc19a87c81b9b4512)

View File

@ -0,0 +1,34 @@
@0x8424471a44ccd2df;
using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("storage::capnp");
struct Common {
storage @0 :UInt16;
union {
label @1 :Label;
edgeType @2 :EdgeType;
property @3 :Property;
}
}
struct Label {}
struct EdgeType {}
struct Property {}
struct PropertyValue {
union {
nullType @0 :Void;
bool @1 :Bool;
integer @2 :Int64;
double @3 :Float64;
string @4 :Text;
list @5 :List(PropertyValue);
map @6 :List(MapEntry);
}
struct MapEntry {
key @0 :Text;
value @1 :PropertyValue;
}
}

View File

@ -0,0 +1,88 @@
#include "storage/single_node_ha/rpc/serialization.hpp"
namespace storage {
void SaveCapnpPropertyValue(const PropertyValue &value,
capnp::PropertyValue::Builder *builder) {
switch (value.type()) {
case PropertyValue::Type::Null:
builder->setNullType();
return;
case PropertyValue::Type::Bool:
builder->setBool(value.Value<bool>());
return;
case PropertyValue::Type::Int:
builder->setInteger(value.Value<int64_t>());
return;
case PropertyValue::Type::Double:
builder->setDouble(value.Value<double>());
return;
case PropertyValue::Type::String:
builder->setString(value.Value<std::string>());
return;
case PropertyValue::Type::List: {
const auto &values = value.Value<std::vector<PropertyValue>>();
auto list_builder = builder->initList(values.size());
for (size_t i = 0; i < values.size(); ++i) {
auto value_builder = list_builder[i];
SaveCapnpPropertyValue(values[i], &value_builder);
}
return;
}
case PropertyValue::Type::Map: {
const auto &map = value.Value<std::map<std::string, PropertyValue>>();
auto map_builder = builder->initMap(map.size());
size_t i = 0;
for (const auto &kv : map) {
auto kv_builder = map_builder[i];
kv_builder.setKey(kv.first);
auto value_builder = kv_builder.initValue();
SaveCapnpPropertyValue(kv.second, &value_builder);
++i;
}
return;
}
}
}
void LoadCapnpPropertyValue(const capnp::PropertyValue::Reader &reader,
PropertyValue *value) {
switch (reader.which()) {
case capnp::PropertyValue::NULL_TYPE:
*value = PropertyValue::Null;
return;
case capnp::PropertyValue::BOOL:
*value = reader.getBool();
return;
case capnp::PropertyValue::INTEGER:
*value = reader.getInteger();
return;
case capnp::PropertyValue::DOUBLE:
*value = reader.getDouble();
return;
case capnp::PropertyValue::STRING:
*value = reader.getString().cStr();
return;
case capnp::PropertyValue::LIST: {
std::vector<PropertyValue> list;
list.reserve(reader.getList().size());
for (const auto &value_reader : reader.getList()) {
list.emplace_back();
LoadCapnpPropertyValue(value_reader, &list.back());
}
*value = list;
return;
}
case capnp::PropertyValue::MAP: {
std::map<std::string, PropertyValue> map;
for (const auto &kv_reader : reader.getMap()) {
auto key = kv_reader.getKey();
LoadCapnpPropertyValue(kv_reader.getValue(), &map[key]);
}
*value = map;
return;
}
}
}
} // namespace storage

View File

@ -0,0 +1,25 @@
#pragma once
#include "storage/common/types/property_value.hpp"
#include "storage/common/types/types.hpp"
#include "storage/single_node_ha/rpc/serialization.capnp.h"
namespace storage {
template <class Type>
void Save(const Common<Type> &common, capnp::Common::Builder *builder) {
builder->setStorage(common.id_);
}
template <class Type>
void Load(Common<Type> *common, const capnp::Common::Reader &reader) {
common->id_ = reader.getStorage();
}
void SaveCapnpPropertyValue(const PropertyValue &value,
capnp::PropertyValue::Builder *builder);
void LoadCapnpPropertyValue(const capnp::PropertyValue::Reader &reader,
PropertyValue *value);
} // namespace storage

View File

@ -7,7 +7,7 @@ set(utils_src_files
uuid.cpp
watchdog.cpp)
define_add_capnp(utils_src_files utils_capnp_files)
define_add_capnp(add_capnp utils_src_files utils_capnp_files)
add_capnp(serialization.capnp)