diff --git a/src/communication/raft/raft.hpp b/src/communication/raft/raft.hpp index d84f36a9b..311b6dc23 100644 --- a/src/communication/raft/raft.hpp +++ b/src/communication/raft/raft.hpp @@ -13,7 +13,7 @@ #include "boost/serialization/vector.hpp" #include "glog/logging.h" -#include "utils/serialization_optional.hpp" +#include "utils/serialization.hpp" namespace communication::raft { diff --git a/src/database/graph_db_datatypes.hpp b/src/database/graph_db_datatypes.hpp index 8c136584d..c651aa311 100644 --- a/src/database/graph_db_datatypes.hpp +++ b/src/database/graph_db_datatypes.hpp @@ -9,12 +9,13 @@ namespace GraphDbTypes { template -class Common : TotalOrdering { +class Common : public TotalOrdering { public: using StorageT = uint16_t; Common() {} explicit Common(const StorageT storage) : storage_(storage) {} + friend bool operator==(const TSpecificType &a, const TSpecificType &b) { return a.storage_ == b.storage_; } @@ -22,6 +23,8 @@ class Common : TotalOrdering { return a.storage_ < b.storage_; } + StorageT storage() const { return storage_; } + struct Hash { std::hash hash{}; size_t operator()(const TSpecificType &t) const { return hash(t.storage_); } diff --git a/src/distributed/serialization.hpp b/src/distributed/serialization.hpp new file mode 100644 index 000000000..c5ec9c581 --- /dev/null +++ b/src/distributed/serialization.hpp @@ -0,0 +1,166 @@ +#pragma once + +#include +#include +#include + +#include "storage/edge.hpp" +#include "storage/vertex.hpp" +#include "utils/serialization.hpp" + +namespace distributed { + +namespace impl { + +// Saves the given address into the given archive. Converts a local address to a +// global one, using the given worker_id. +template +void SaveAddress(TArchive &ar, TAddress address, int worker_id) { + auto gid = address.is_remote() ? address.global_id() : address.local()->gid_; + ar << gid; + ar << worker_id; +}; + +// Saves the given properties into the given archive. +template +void SaveProperties(TArchive &ar, const PropertyValueStore &props) { + ar << props.size(); + for (auto &kv : props) { + ar << kv.first.storage(); + utils::SaveTypedValue(ar, kv.second); + } +} +} // namespace impl + +/** + * Saves the given vertex into the given Boost archive. + * + * @param ar - Archive into which to serialize. + * @param vertex - Getting serialized. + * @param worker_id - ID of the worker this is happening on. Necessary for local + * to global address conversion. + * @tparam TArchive - type of archive. + */ +template +void SaveVertex(TArchive &ar, const Vertex &vertex, int worker_id) { + auto save_edges = [&ar, worker_id](auto &edges) { + ar << edges.size(); + for (auto &edge_struct : edges) { + impl::SaveAddress(ar, edge_struct.vertex, worker_id); + impl::SaveAddress(ar, edge_struct.edge, worker_id); + ar << edge_struct.edge_type.storage(); + } + }; + save_edges(vertex.out_); + save_edges(vertex.in_); + + ar << vertex.labels_.size(); + for (auto &label : vertex.labels_) { + ar << label.storage(); + } + + impl::SaveProperties(ar, vertex.properties_); +} + +/** + * Saves the given edge into the given Boost archive. + * + * @param - Archive into which to serialize. + * @param edge - Getting serialized. + * @param worker_id - ID of the worker this is happening on. Necessary for local + * to global address conversion. + * @tparam TArchive - type of archive. + */ +template +void SaveEdge(TArchive &ar, const Edge &edge, int worker_id) { + impl::SaveAddress(ar, edge.from_, worker_id); + impl::SaveAddress(ar, edge.to_, worker_id); + ar << edge.edge_type_.storage(); + impl::SaveProperties(ar, edge.properties_); +} + +namespace impl { + +template +Edges::VertexAddress LoadVertexAddress(TArchive &ar) { + gid::Gid vertex_id; + ar >> vertex_id; + int worker_id; + ar >> worker_id; + return {vertex_id, worker_id}; +} + +template +void LoadProperties(TArchive &ar, PropertyValueStore &store) { + size_t count; + ar >> count; + for (size_t i = 0; i < count; ++i) { + GraphDbTypes::Property::StorageT prop; + ar >> prop; + query::TypedValue value; + utils::LoadTypedValue(ar, value); + store.set(GraphDbTypes::Property(prop), static_cast(value)); + } +} + +} // namespace impl + +/** + * Loads a Vertex from the given archive and returns it. + * + * @param ar - The archive to load from. + * @tparam TArchive - archive type. + */ +template +std::unique_ptr LoadVertex(TArchive &ar) { + auto vertex = std::make_unique(); + + auto decode_edges = [&ar](Edges &edges) { + size_t count; + ar >> count; + for (size_t i = 0; i < count; ++i) { + auto vertex_address = impl::LoadVertexAddress(ar); + GraphDbTypes::EdgeType::StorageT edge_type; + gid::Gid edge_id; + ar >> edge_id; + int edge_worker_id; + ar >> edge_worker_id; + ar >> edge_type; + edges.emplace(vertex_address, {edge_id, edge_worker_id}, + GraphDbTypes::EdgeType(edge_type)); + } + }; + decode_edges(vertex->out_); + decode_edges(vertex->in_); + + size_t count; + ar >> count; + for (size_t i = 0; i < count; ++i) { + GraphDbTypes::Label::StorageT label; + ar >> label; + vertex->labels_.emplace_back(label); + } + impl::LoadProperties(ar, vertex->properties_); + + return vertex; +} + +/** + * Loads an Edge from the given archive and returns it. + * + * @param ar - The archive to load from. + * @tparam TArchive - archive type. + */ +template +std::unique_ptr LoadEdge(TArchive &ar) { + auto from = impl::LoadVertexAddress(ar); + auto to = impl::LoadVertexAddress(ar); + GraphDbTypes::EdgeType::StorageT edge_type; + ar >> edge_type; + auto edge = + std::make_unique(from, to, GraphDbTypes::EdgeType{edge_type}); + impl::LoadProperties(ar, edge->properties_); + + return edge; +} +} // namespace distributed diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index ea58fbf09..e0e5a2918 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -18,6 +18,7 @@ #include "query/interpret/awesome_memgraph_functions.hpp" #include "query/parameters.hpp" #include "query/typed_value.hpp" +#include "utils/serialization.hpp" // Hash function for the key in pattern atom property maps. namespace std { @@ -971,123 +972,14 @@ class PrimitiveLiteral : public BaseLiteral { void save(TArchive &ar, const unsigned int) const { ar << boost::serialization::base_object(*this); ar << token_position_; - SaveTypedValue(ar, value_); - } - - template - static void SaveTypedValue(TArchive &ar, const TypedValue &value) { - ar << value.type(); - switch (value.type()) { - case TypedValue::Type::Null: - return; - case TypedValue::Type::Bool: - ar << value.Value(); - return; - case TypedValue::Type::Int: - ar << value.Value(); - return; - case TypedValue::Type::Double: - ar << value.Value(); - return; - case TypedValue::Type::String: - ar << value.Value(); - return; - case TypedValue::Type::List: { - const auto &values = value.Value>(); - ar << values.size(); - for (const auto &v : values) { - SaveTypedValue(ar, v); - } - return; - } - case TypedValue::Type::Map: { - const auto &map = value.Value>(); - ar << map.size(); - for (const auto &key_value : map) { - ar << key_value.first; - SaveTypedValue(ar, key_value.second); - } - return; - } - case TypedValue::Type::Vertex: - case TypedValue::Type::Edge: - case TypedValue::Type::Path: - throw utils::BasicException("Unexpected value type in literal: '{}'", - value.type()); - } + utils::SaveTypedValue(ar, value_); } template void load(TArchive &ar, const unsigned int) { ar >> boost::serialization::base_object(*this); ar >> token_position_; - LoadTypedValue(ar, value_); - } - - template - static void LoadTypedValue(TArchive &ar, TypedValue &value) { - TypedValue::Type type = TypedValue::Type::Null; - ar >> type; - switch (type) { - case TypedValue::Type::Null: - return; - case TypedValue::Type::Bool: { - bool v; - ar >> v; - value = v; - return; - } - case TypedValue::Type::Int: { - int64_t v; - ar >> v; - value = v; - return; - } - case TypedValue::Type::Double: { - double v; - ar >> v; - value = v; - return; - } - case TypedValue::Type::String: { - std::string v; - ar >> v; - value = v; - return; - } - case TypedValue::Type::List: { - std::vector values; - size_t size; - ar >> size; - values.reserve(size); - for (size_t i = 0; i < size; ++i) { - TypedValue tv; - LoadTypedValue(ar, tv); - values.emplace_back(tv); - } - value = values; - return; - } - case TypedValue::Type::Map: { - std::map map; - size_t size; - ar >> size; - for (size_t i = 0; i < size; ++i) { - std::string key; - ar >> key; - TypedValue v; - LoadTypedValue(ar, v); - map.emplace(key, v); - } - value = map; - return; - } - case TypedValue::Type::Vertex: - case TypedValue::Type::Edge: - case TypedValue::Type::Path: - throw utils::BasicException("Unexpected value type in literal: '{}'", - type); - } + utils::LoadTypedValue(ar, value_); } template diff --git a/src/storage/edges.hpp b/src/storage/edges.hpp index e506b83d6..7c4b56c5c 100644 --- a/src/storage/edges.hpp +++ b/src/storage/edges.hpp @@ -19,9 +19,11 @@ class Vertex; * vertex (and consequently that edge Addresses are unique in it). */ class Edges { + public: using VertexAddress = storage::Address>; using EdgeAddress = storage::Address>; + private: struct Element { VertexAddress vertex; EdgeAddress edge; @@ -68,6 +70,7 @@ class Edges { } const Element &operator*() const { return *position_; } + const Element *operator->() const { return &(*position_); } bool operator==(const Iterator &other) const { return position_ == other.position_; @@ -90,10 +93,9 @@ class Edges { * present in this iterator. */ void update_position() { if (vertex_.local()) { - position_ = std::find_if(position_, - end_, [v = this->vertex_](const Element &e) { - return e.vertex == v; - }); + position_ = std::find_if( + position_, end_, + [v = this->vertex_](const Element &e) { return e.vertex == v; }); } if (edge_types_) { position_ = std::find_if(position_, end_, [this](const Element &e) { diff --git a/src/utils/serialization.hpp b/src/utils/serialization.hpp new file mode 100644 index 000000000..ed497fb99 --- /dev/null +++ b/src/utils/serialization.hpp @@ -0,0 +1,153 @@ +#include + +#include "boost/serialization/split_free.hpp" +#include "query/typed_value.hpp" +#include "storage/edge.hpp" +#include "storage/vertex.hpp" +#include "utils/exceptions.hpp" + +namespace boost::serialization { + +template +inline void serialize(TArchive &ar, std::experimental::optional &opt, + unsigned int version) { + split_free(ar, opt, version); +} + +template +void save(TArchive &ar, const std::experimental::optional &opt, + unsigned int) { + ar << static_cast(opt); + if (opt) { + ar << *opt; + } +} + +template +void load(TArchive &ar, std::experimental::optional &opt, unsigned int) { + bool has_value; + ar >> has_value; + if (has_value) { + T tmp; + ar >> tmp; + opt = std::move(tmp); + } else { + opt = std::experimental::nullopt; + } +} + +} // boost::serialization + +namespace utils { + +/** Saves the given value into the given Boost archive. */ +template +void SaveTypedValue(TArchive &ar, const query::TypedValue &value) { + ar << value.type(); + switch (value.type()) { + case query::TypedValue::Type::Null: + return; + case query::TypedValue::Type::Bool: + ar << value.Value(); + return; + case query::TypedValue::Type::Int: + ar << value.Value(); + return; + case query::TypedValue::Type::Double: + ar << value.Value(); + return; + case query::TypedValue::Type::String: + ar << value.Value(); + return; + case query::TypedValue::Type::List: { + const auto &values = value.Value>(); + ar << values.size(); + for (const auto &v : values) { + SaveTypedValue(ar, v); + } + return; + } + case query::TypedValue::Type::Map: { + const auto &map = value.Value>(); + ar << map.size(); + for (const auto &key_value : map) { + ar << key_value.first; + SaveTypedValue(ar, key_value.second); + } + return; + } + case query::TypedValue::Type::Vertex: + case query::TypedValue::Type::Edge: + case query::TypedValue::Type::Path: + throw utils::BasicException("Unable to archive TypedValue of type: {}", + value.type()); + } +} + +/** Loads a typed value into the given reference from the given archive. */ +template +void LoadTypedValue(TArchive &ar, query::TypedValue &value) { + query::TypedValue::Type type = query::TypedValue::Type::Null; + ar >> type; + switch (type) { + case query::TypedValue::Type::Null: + return; + case query::TypedValue::Type::Bool: { + bool v; + ar >> v; + value = v; + return; + } + case query::TypedValue::Type::Int: { + int64_t v; + ar >> v; + value = v; + return; + } + case query::TypedValue::Type::Double: { + double v; + ar >> v; + value = v; + return; + } + case query::TypedValue::Type::String: { + std::string v; + ar >> v; + value = v; + return; + } + case query::TypedValue::Type::List: { + std::vector values; + size_t size; + ar >> size; + values.reserve(size); + for (size_t i = 0; i < size; ++i) { + query::TypedValue tv; + LoadTypedValue(ar, tv); + values.emplace_back(tv); + } + value = values; + return; + } + case query::TypedValue::Type::Map: { + std::map map; + size_t size; + ar >> size; + for (size_t i = 0; i < size; ++i) { + std::string key; + ar >> key; + query::TypedValue v; + LoadTypedValue(ar, v); + map.emplace(key, v); + } + value = map; + return; + } + case query::TypedValue::Type::Vertex: + case query::TypedValue::Type::Edge: + case query::TypedValue::Type::Path: + throw utils::BasicException( + "Unexpected TypedValue type '{}' when loading from archive", type); + } +} +} // namespace utils diff --git a/src/utils/serialization_optional.hpp b/src/utils/serialization_optional.hpp deleted file mode 100644 index 554509a69..000000000 --- a/src/utils/serialization_optional.hpp +++ /dev/null @@ -1,35 +0,0 @@ -#include - -#include "boost/serialization/split_free.hpp" - -namespace boost::serialization { - -template -inline void serialize(TArchive &ar, std::experimental::optional &opt, - unsigned int version) { - split_free(ar, opt, version); -} - -template -void save(TArchive &ar, const std::experimental::optional &opt, - unsigned int) { - ar << static_cast(opt); - if (opt) { - ar << *opt; - } -} - -template -void load(TArchive &ar, std::experimental::optional &opt, unsigned int) { - bool has_value; - ar >> has_value; - if (has_value) { - T tmp; - ar >> tmp; - opt = std::move(tmp); - } else { - opt = std::experimental::nullopt; - } -} - -} // boost::serialization diff --git a/tests/unit/distributed_serialization.cpp b/tests/unit/distributed_serialization.cpp new file mode 100644 index 000000000..f300d2537 --- /dev/null +++ b/tests/unit/distributed_serialization.cpp @@ -0,0 +1,161 @@ +#include +#include + +#include "boost/archive/binary_iarchive.hpp" +#include "boost/archive/binary_oarchive.hpp" + +#include "distributed/serialization.hpp" +#include "mvcc/version_list.hpp" +#include "query/typed_value.hpp" +#include "storage/edge.hpp" +#include "storage/property_value_store.hpp" +#include "storage/vertex.hpp" +#include "transactions/engine_master.hpp" + +using namespace GraphDbTypes; + +template +TAddress ToGlobal(const TAddress &address, int worker_id) { + if (address.is_remote()) return address; + return TAddress{address.local()->gid_, worker_id}; +} + +#define CHECK_RETURN(condition) \ + { \ + if (!(condition)) return false; \ + } + +bool CheckEdges(const Edges &e1, int w1, const Edges &e2, int w2) { + CHECK_RETURN(e1.size() == e2.size()); + auto e1_it = e1.begin(); + for (auto e2_it = e2.begin(); e2_it != e2.end(); ++e1_it, ++e2_it) { + CHECK_RETURN(ToGlobal(e1_it->vertex, w1) == ToGlobal(e2_it->vertex, w2)); + CHECK_RETURN(ToGlobal(e1_it->edge, w1) == ToGlobal(e2_it->edge, w2)); + CHECK_RETURN(e1_it->edge_type == e2_it->edge_type); + } + return true; +} + +bool CheckProperties(const PropertyValueStore &p1, + const PropertyValueStore &p2) { + CHECK_RETURN(p1.size() == p2.size()); + auto p1_it = p1.begin(); + for (auto p2_it = p2.begin(); p2_it != p2.end(); ++p1_it, ++p2_it) { + CHECK_RETURN(p1_it->first == p2_it->first); + auto tv = + query::TypedValue(p1_it->second) == query::TypedValue(p2_it->second); + CHECK_RETURN(tv.IsBool()); + CHECK_RETURN(tv.ValueBool()); + } + return true; +} + +bool CheckVertex(const Vertex &v1, int w1, const Vertex &v2, int w2) { + CHECK_RETURN(CheckEdges(v1.in_, w1, v2.in_, w2)); + CHECK_RETURN(CheckEdges(v1.out_, w1, v2.out_, w2)); + CHECK_RETURN(v1.labels_ == v2.labels_); + CHECK_RETURN(CheckProperties(v1.properties_, v2.properties_)); + return true; +} + +bool CheckEdge(const Edge &e1, int w1, const Edge &e2, int w2) { + CHECK_RETURN(ToGlobal(e1.from_, w1) == ToGlobal(e2.from_, w2)); + CHECK_RETURN(ToGlobal(e1.to_, w1) == ToGlobal(e2.to_, w2)); + CHECK_RETURN(e1.edge_type_ == e2.edge_type_); + CHECK_RETURN(CheckProperties(e1.properties_, e2.properties_)); + return true; +} + +#undef CHECK_RETURN + +#define SAVE_AND_LOAD(type, name, element) \ + std::unique_ptr name; \ + { \ + std::ostringstream ostream; \ + boost::archive::binary_oarchive oar{ostream}; \ + distributed::Save##type(oar, element, 0); \ + std::istringstream istream{ostream.str()}; \ + boost::archive::binary_iarchive iar{istream}; \ + name = distributed::Load##type(iar); \ + } + +TEST(DistributedSerialization, Empty) { + Vertex v; + int w_id{0}; + SAVE_AND_LOAD(Vertex, v_recovered, v) + EXPECT_TRUE(CheckVertex(v, w_id, *v_recovered, w_id)); +} + +#define UPDATE_AND_CHECK(type, x, action) \ + { \ + SAVE_AND_LOAD(type, before, x) \ + EXPECT_TRUE(Check##type(x, 0, *before, 0)); \ + action; \ + EXPECT_FALSE(Check##type(x, 0, *before, 0)); \ + SAVE_AND_LOAD(type, after, x) \ + EXPECT_TRUE(Check##type(x, 0, *after, 0)); \ + } + +#define UPDATE_AND_CHECK_V(v, action) UPDATE_AND_CHECK(Vertex, v, action) +#define UPDATE_AND_CHECK_E(e, action) UPDATE_AND_CHECK(Edge, e, action) + +TEST(DistributedSerialization, VertexLabels) { + Vertex v; + UPDATE_AND_CHECK_V(v, v.labels_.emplace_back(Label(1))); + UPDATE_AND_CHECK_V(v, v.labels_.emplace_back(Label(2))); + UPDATE_AND_CHECK_V(v, v.labels_.resize(1)); + UPDATE_AND_CHECK_V(v, v.labels_.clear()); +} + +TEST(DistributedSerialization, VertexProperties) { + Vertex v; + UPDATE_AND_CHECK_V(v, v.properties_.set(Property(1), true)); + UPDATE_AND_CHECK_V(v, v.properties_.set(Property(1), "string")); + UPDATE_AND_CHECK_V(v, v.properties_.set(Property(2), 42)); + UPDATE_AND_CHECK_V(v, v.properties_.erase(Property(1))); + UPDATE_AND_CHECK_V(v, v.properties_.clear()); +} + +class DistributedSerializationMvcc : public ::testing::Test { + protected: + tx::MasterEngine engine; + tx::Transaction *tx = engine.Begin(); + mvcc::VersionList v1_vlist{*tx, 0}; + Vertex &v1 = *v1_vlist.Oldest(); + mvcc::VersionList v2_vlist{*tx, 1}; + Vertex &v2 = *v2_vlist.Oldest(); + mvcc::VersionList e1_vlist{*tx, 0, &v1_vlist, &v2_vlist, EdgeType(0)}; + Edge &e1 = *e1_vlist.Oldest(); + mvcc::VersionList e2_vlist{*tx, 1, &v2_vlist, &v1_vlist, EdgeType(2)}; + Edge &e2 = *e2_vlist.Oldest(); +}; + +TEST_F(DistributedSerializationMvcc, VertexEdges) { + UPDATE_AND_CHECK_V(v1, v1.out_.emplace(&v2_vlist, &e1_vlist, EdgeType(0))); + UPDATE_AND_CHECK_V(v2, v2.in_.emplace(&v1_vlist, &e1_vlist, EdgeType(0))); + UPDATE_AND_CHECK_V(v1, v1.in_.emplace(&v2_vlist, &e2_vlist, EdgeType(2))); + UPDATE_AND_CHECK_V(v2, v2.out_.emplace(&v1_vlist, &e2_vlist, EdgeType(2))); +} + +TEST_F(DistributedSerializationMvcc, EdgeFromAndTo) { + UPDATE_AND_CHECK_E(e1, e1.from_ = &v2_vlist); + UPDATE_AND_CHECK_E(e1, e1.to_ = &v1_vlist); +} + +TEST_F(DistributedSerializationMvcc, EdgeType) { + UPDATE_AND_CHECK_E(e1, e1.edge_type_ = EdgeType(123)); + UPDATE_AND_CHECK_E(e1, e1.edge_type_ = EdgeType(55)); +} + +TEST_F(DistributedSerializationMvcc, EdgeProperties) { + UPDATE_AND_CHECK_E(e1, e1.properties_.set(Property(1), true)); + UPDATE_AND_CHECK_E(e1, e1.properties_.set(Property(1), "string")); + UPDATE_AND_CHECK_E(e1, e1.properties_.set(Property(2), 42)); + UPDATE_AND_CHECK_E(e1, e1.properties_.erase(Property(1))); + UPDATE_AND_CHECK_E(e1, e1.properties_.clear()); +} + +#undef UPDATE_AND_CHECK_E +#undef UPDATE_AND_CHECK_V +#undef UPDATE_AND_CHECK +#undef SAVE_AND_LOAD diff --git a/tests/unit/serialization_optional.cpp b/tests/unit/serialization_optional.cpp index 85a6e0388..f02d8428b 100644 --- a/tests/unit/serialization_optional.cpp +++ b/tests/unit/serialization_optional.cpp @@ -5,7 +5,7 @@ #include "boost/archive/binary_iarchive.hpp" #include "boost/archive/binary_oarchive.hpp" -#include "utils/serialization_optional.hpp" +#include "utils/serialization.hpp" using std::experimental::optional;