From d7a9c5bab8723d7f315733a897f88b230442f80c Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Mon, 2 Jul 2018 15:34:33 +0200 Subject: [PATCH] Extract TypedValue/DecodedValue conversion to higher component Summary: This is the first step in cutting the crazy dependencies of communication module to the whole database. Includes have been reorganized and conversion between DecodedValue and other Memgraph types (TypedValue and PropertyValue) has been extracted to a higher level component called `communication/conversion`. Encoder, like Decoder, now relies only on DecodedValue. Hopefully the conversion operations will not significantly slow down streaming Bolt data. Additionally, Bolt ID is now wrapped in a class. Our storage model uses *unsigned* int64, while Bolt expects *signed* int64. The implicit conversions may lead to encode/decode errors, so the wrapper should enforce some type safety to prevent such errors. Reviewers: mferencevic, buda, msantl, mtomic Reviewed By: mferencevic, mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1453 --- src/CMakeLists.txt | 13 +- src/communication/bolt/client.hpp | 9 +- src/communication/bolt/v1/codes.hpp | 1 - .../bolt/v1/decoder/decoded_value.cpp | 67 +----- .../bolt/v1/decoder/decoded_value.hpp | 92 +++++++-- src/communication/bolt/v1/decoder/decoder.hpp | 10 +- .../bolt/v1/encoder/base_encoder.hpp | 149 ++++++-------- .../bolt/v1/encoder/client_encoder.hpp | 6 +- src/communication/bolt/v1/encoder/encoder.hpp | 12 +- .../bolt/v1/encoder/primitive_encoder.hpp | 43 +--- .../bolt/v1/encoder/result_stream.hpp | 24 ++- src/communication/bolt/v1/states/error.hpp | 2 +- .../bolt/v1/states/executing.hpp | 11 +- src/communication/bolt/v1/states/init.hpp | 1 - src/communication/conversion.cpp | 192 ++++++++++++++++++ src/communication/conversion.hpp | 26 +++ src/communication/result_stream_faker.hpp | 20 +- src/database/state_delta.cpp | 14 +- src/database/state_delta.lcp | 4 +- src/durability/recovery.cpp | 12 +- src/durability/snapshooter.cpp | 6 +- src/durability/snapshot_decoder.hpp | 2 +- src/durability/snapshot_encoder.hpp | 5 +- src/durability/wal.cpp | 1 - src/durability/wal.hpp | 5 +- src/query/interpreter.cpp | 2 +- src/query/interpreter.hpp | 7 +- src/query/repl.cpp | 2 +- src/storage/property_value_store.cpp | 8 +- tests/benchmark/expansion.cpp | 4 +- tests/macro_benchmark/clients/common.hpp | 2 + .../clients/long_running_common.hpp | 12 ++ .../macro_benchmark/clients/pokec_client.cpp | 1 + .../macro_benchmark/clients/query_client.cpp | 1 + tests/manual/distributed_common.hpp | 3 +- tests/manual/single_query.cpp | 2 +- .../snapshot_generation/snapshot_writer.hpp | 22 +- tests/stress/long_running.cpp | 5 + tests/unit/bolt_decoder.cpp | 8 +- tests/unit/bolt_encoder.cpp | 66 +++--- tests/unit/bolt_result_stream.cpp | 11 +- tests/unit/database_transaction_timeout.cpp | 2 +- tests/unit/distributed_interpretation.cpp | 3 +- tests/unit/durability.cpp | 10 +- tests/unit/interpreter.cpp | 15 +- tests/unit/query_plan_common.hpp | 25 +-- tests/unit/query_plan_edge_cases.cpp | 2 +- tools/src/mg_import_csv/main.cpp | 40 ++-- 48 files changed, 589 insertions(+), 391 deletions(-) create mode 100644 src/communication/conversion.cpp create mode 100644 src/communication/conversion.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 486725bcd..925191344 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,12 +8,13 @@ add_subdirectory(telemetry) # all memgraph src files set(memgraph_src_files + communication/bolt/v1/decoder/decoded_value.cpp communication/buffer.cpp communication/client.cpp communication/context.cpp + communication/conversion.cpp communication/helpers.cpp communication/init.cpp - communication/bolt/v1/decoder/decoded_value.cpp communication/rpc/client.cpp communication/rpc/protocol.cpp communication/rpc/server.cpp @@ -25,20 +26,20 @@ set(memgraph_src_files database/state_delta.cpp distributed/bfs_rpc_clients.cpp distributed/bfs_subcursor.cpp + distributed/cache.cpp distributed/cluster_discovery_master.cpp distributed/cluster_discovery_worker.cpp distributed/coordination.cpp distributed/coordination_master.cpp distributed/coordination_worker.cpp + distributed/data_manager.cpp + distributed/data_rpc_clients.cpp + distributed/data_rpc_server.cpp distributed/durability_rpc_clients.cpp distributed/durability_rpc_server.cpp distributed/index_rpc_server.cpp distributed/plan_consumer.cpp distributed/plan_dispatcher.cpp - distributed/cache.cpp - distributed/data_manager.cpp - distributed/data_rpc_clients.cpp - distributed/data_rpc_server.cpp distributed/produce_rpc_server.cpp distributed/pull_rpc_clients.cpp distributed/serialization.cpp @@ -49,7 +50,6 @@ set(memgraph_src_files durability/snapshooter.cpp durability/wal.cpp query/common.cpp - query/repl.cpp query/frontend/ast/ast.cpp query/frontend/ast/cypher_main_visitor.cpp query/frontend/semantic/symbol_generator.cpp @@ -61,6 +61,7 @@ set(memgraph_src_files query/plan/preprocess.cpp query/plan/rule_based_planner.cpp query/plan/variable_start_planner.cpp + query/repl.cpp query/typed_value.cpp stats/metrics.cpp stats/stats.cpp diff --git a/src/communication/bolt/client.hpp b/src/communication/bolt/client.hpp index 70aa7a396..c75e0ae2a 100644 --- a/src/communication/bolt/client.hpp +++ b/src/communication/bolt/client.hpp @@ -6,8 +6,9 @@ #include "communication/bolt/v1/decoder/decoder.hpp" #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" #include "communication/bolt/v1/encoder/client_encoder.hpp" - -#include "query/typed_value.hpp" +#include "communication/client.hpp" +#include "communication/context.hpp" +#include "io/network/endpoint.hpp" #include "utils/exceptions.hpp" namespace communication::bolt { @@ -98,9 +99,7 @@ class Client final { DLOG(INFO) << "Sending run message with statement: '" << query << "'; parameters: " << parameters; - std::map params_tv(parameters.begin(), - parameters.end()); - encoder_.MessageRun(query, params_tv, false); + encoder_.MessageRun(query, parameters, false); encoder_.MessagePullAll(); DLOG(INFO) << "Reading run message response"; diff --git a/src/communication/bolt/v1/codes.hpp b/src/communication/bolt/v1/codes.hpp index be769ff5d..83a2ce60e 100644 --- a/src/communication/bolt/v1/codes.hpp +++ b/src/communication/bolt/v1/codes.hpp @@ -1,7 +1,6 @@ #pragma once #include -#include "utils/cast.hpp" namespace communication::bolt { diff --git a/src/communication/bolt/v1/decoder/decoded_value.cpp b/src/communication/bolt/v1/decoder/decoded_value.cpp index c16ccf0cf..d03712e0d 100644 --- a/src/communication/bolt/v1/decoder/decoded_value.cpp +++ b/src/communication/bolt/v1/decoder/decoded_value.cpp @@ -1,7 +1,9 @@ -#include "glog/logging.h" - #include "communication/bolt/v1/decoder/decoded_value.hpp" +#include + +#include "utils/algorithm.hpp" + namespace communication::bolt { #define DEF_GETTER_BY_VAL(type, value_type, field) \ @@ -165,67 +167,6 @@ DecodedValue::~DecodedValue() { LOG(FATAL) << "Unsupported DecodedValue::Type"; } -DecodedValue::operator query::TypedValue() const { - switch (type_) { - case Type::Null: - return query::TypedValue::Null; - case Type::Bool: - return query::TypedValue(bool_v); - case Type::Int: - return query::TypedValue(int_v); - case Type::Double: - return query::TypedValue(double_v); - case Type::String: - return query::TypedValue(string_v); - case Type::List: - return query::TypedValue( - std::vector(list_v.begin(), list_v.end())); - case Type::Map: - return query::TypedValue( - std::map(map_v.begin(), map_v.end())); - case Type::Vertex: - case Type::Edge: - case Type::UnboundedEdge: - case Type::Path: - throw DecodedValueException( - "Unsupported conversion from DecodedValue to TypedValue"); - } -} - -DecodedValue::operator PropertyValue() const { - switch (type_) { - case Type::Null: - return PropertyValue::Null; - case Type::Bool: - return PropertyValue(bool_v); - case Type::Int: - return PropertyValue(int_v); - case Type::Double: - return PropertyValue(double_v); - case Type::String: - return PropertyValue(string_v); - case Type::List: { - std::vector vec; - vec.reserve(list_v.size()); - for (const auto &value : list_v) - vec.emplace_back(static_cast(value)); - return PropertyValue(std::move(vec)); - } - case Type::Map: { - std::map map; - for (const auto &kv : map_v) - map.emplace(kv.first, static_cast(kv.second)); - return PropertyValue(std::move(map)); - } - case Type::Vertex: - case Type::Edge: - case Type::UnboundedEdge: - case Type::Path: - throw DecodedValueException( - "Unsupported conversion from DecodedValue to PropertyValue"); - } -} - std::ostream &operator<<(std::ostream &os, const DecodedVertex &vertex) { os << "V("; utils::PrintIterable(os, vertex.labels, ":", diff --git a/src/communication/bolt/v1/decoder/decoded_value.hpp b/src/communication/bolt/v1/decoder/decoded_value.hpp index 56a2f5f68..a5d1160c4 100644 --- a/src/communication/bolt/v1/decoder/decoded_value.hpp +++ b/src/communication/bolt/v1/decoder/decoded_value.hpp @@ -1,12 +1,11 @@ #pragma once +#include #include #include #include -#include "query/typed_value.hpp" -#include "storage/property_value.hpp" -#include "utils/algorithm.hpp" +#include "utils/cast.hpp" #include "utils/exceptions.hpp" namespace communication::bolt { @@ -14,12 +13,38 @@ namespace communication::bolt { /** Forward declaration of DecodedValue class. */ class DecodedValue; +/** Wraps int64_t to prevent dangerous implicit conversions. */ +class Id { + public: + Id() = default; + + /** Construct Id from uint64_t */ + static Id FromUint(uint64_t id) { return Id(utils::MemcpyCast(id)); } + + /** Construct Id from int64_t */ + static Id FromInt(int64_t id) { return Id(id); } + + int64_t AsInt() const { return id_; } + uint64_t AsUint() const { return utils::MemcpyCast(id_); } + + private: + explicit Id(int64_t id) : id_(id) {} + + int64_t id_; +}; + +inline bool operator==(const Id &id1, const Id &id2) { + return id1.AsInt() == id2.AsInt(); +} + +inline bool operator!=(const Id &id1, const Id &id2) { return !(id1 == id2); } + /** * Structure used when reading a Vertex with the decoder. * The decoder writes data into this structure. */ struct DecodedVertex { - int64_t id; + Id id; std::vector labels; std::map properties; }; @@ -29,9 +54,9 @@ struct DecodedVertex { * The decoder writes data into this structure. */ struct DecodedEdge { - int64_t id; - int64_t from; - int64_t to; + Id id; + Id from; + Id to; std::string type; std::map properties; }; @@ -41,7 +66,7 @@ struct DecodedEdge { * The decoder writes data into this structure. */ struct DecodedUnboundedEdge { - int64_t id; + Id id; std::string type; std::map properties; }; @@ -51,25 +76,56 @@ struct DecodedUnboundedEdge { * The decoder writes data into this structure. */ struct DecodedPath { + DecodedPath() {} + + DecodedPath(const std::vector &vertices, + const std::vector &edges) { + // Helper function. Looks for the given element in the collection. If found, + // puts its index into `indices`. Otherwise emplaces the given element + // into the collection and puts that index into `indices`. A multiplier is + // added to switch between positive and negative indices (that define edge + // direction). + auto add_element = [this](auto &collection, const auto &element, + int multiplier, int offset) { + auto found = + std::find_if(collection.begin(), collection.end(), + [&](const auto &e) { return e.id == element.id; }); + indices.emplace_back(multiplier * + (std::distance(collection.begin(), found) + offset)); + if (found == collection.end()) collection.push_back(element); + }; + + this->vertices.reserve(vertices.size()); + this->edges.reserve(edges.size()); + this->vertices.emplace_back(vertices[0]); + for (uint i = 0; i < edges.size(); i++) { + const auto &e = edges[i]; + const auto &v = vertices[i + 1]; + DecodedUnboundedEdge unbounded_edge{e.id, e.type, e.properties}; + add_element(this->edges, unbounded_edge, e.to == v.id ? 1 : -1, 1); + add_element(this->vertices, v, 1, 0); + } + } + + /** Unique vertices in the path. */ std::vector vertices; + /** Unique edges in the path. */ std::vector edges; + /** + * Indices that map path positions to vertices/edges. + * Positive indices for left-to-right directionality and negative for + * right-to-left. + */ std::vector indices; }; -/** - * DecodedValue provides an encapsulation arround TypedValue, DecodedVertex - * and DecodedEdge. This is necessary because TypedValue stores vertices and - * edges as our internal accessors. Because of that the Bolt decoder can't - * decode vertices and edges directly into a TypedValue so a DecodedValue is - * used instead. - */ +/** DecodedValue represents supported values in the Bolt protocol. */ class DecodedValue { public: /** Default constructor, makes Null */ DecodedValue() : type_(Type::Null) {} /** Types that can be stored in a DecodedValue. */ - // TODO: Path isn't supported yet! enum class Type : unsigned { Null, Bool, @@ -161,10 +217,6 @@ class DecodedValue { #undef TYPE_CHECKER - operator query::TypedValue() const; - // PropertyValue operator must be explicit to prevent ambiguity. - explicit operator PropertyValue() const; - friend std::ostream &operator<<(std::ostream &os, const DecodedValue &value); private: diff --git a/src/communication/bolt/v1/decoder/decoder.hpp b/src/communication/bolt/v1/decoder/decoder.hpp index a7bb44c39..345f618d7 100644 --- a/src/communication/bolt/v1/decoder/decoder.hpp +++ b/src/communication/bolt/v1/decoder/decoder.hpp @@ -335,7 +335,7 @@ class Decoder { if (!ReadValue(&dv, DecodedValue::Type::Int)) { return false; } - vertex.id = dv.ValueInt(); + vertex.id = Id::FromInt(dv.ValueInt()); // read labels if (!ReadValue(&dv, DecodedValue::Type::List)) { @@ -382,19 +382,19 @@ class Decoder { if (!ReadValue(&dv, DecodedValue::Type::Int)) { return false; } - edge.id = dv.ValueInt(); + edge.id = Id::FromInt(dv.ValueInt()); // read from if (!ReadValue(&dv, DecodedValue::Type::Int)) { return false; } - edge.from = dv.ValueInt(); + edge.from = Id::FromInt(dv.ValueInt()); // read to if (!ReadValue(&dv, DecodedValue::Type::Int)) { return false; } - edge.to = dv.ValueInt(); + edge.to = Id::FromInt(dv.ValueInt()); // read type if (!ReadValue(&dv, DecodedValue::Type::String)) { @@ -421,7 +421,7 @@ class Decoder { if (!ReadValue(&dv, DecodedValue::Type::Int)) { return false; } - edge.id = dv.ValueInt(); + edge.id = Id::FromInt(dv.ValueInt()); // read type if (!ReadValue(&dv, DecodedValue::Type::String)) { diff --git a/src/communication/bolt/v1/encoder/base_encoder.hpp b/src/communication/bolt/v1/encoder/base_encoder.hpp index f13a5aec3..16329133a 100644 --- a/src/communication/bolt/v1/encoder/base_encoder.hpp +++ b/src/communication/bolt/v1/encoder/base_encoder.hpp @@ -1,15 +1,14 @@ #pragma once +#include "communication/bolt/v1/decoder/decoded_value.hpp" #include "communication/bolt/v1/encoder/primitive_encoder.hpp" -#include "database/graph_db_accessor.hpp" -#include "query/typed_value.hpp" namespace communication::bolt { /** * Bolt BaseEncoder. Subclass of PrimitiveEncoder. Extends it with the - * capability to encode TypedValues (as well as lists and maps of TypedValues), - * Edges, Vertices and Paths. + * capability to encode DecodedValues (as well as lists and maps of + * DecodedValues), Edges, Vertices and Paths. * * @tparam Buffer the output buffer that should be used */ @@ -18,150 +17,130 @@ class BaseEncoder : public PrimitiveEncoder { public: explicit BaseEncoder(Buffer &buffer) : PrimitiveEncoder(buffer) {} - void WriteList(const std::vector &value) { + void WriteList(const std::vector &value) { this->WriteTypeSize(value.size(), MarkerList); - for (auto &x : value) WriteTypedValue(x); + for (auto &x : value) WriteDecodedValue(x); } /** * Writes a map value. * - * @tparam TMap - an iterable of (std::string, TypedValue) pairs. + * @tparam TMap - an iterable of (std::string, DecodedValue) pairs. */ template void WriteMap(const TMap &value) { this->WriteTypeSize(value.size(), MarkerMap); for (auto &x : value) { this->WriteString(x.first); - WriteTypedValue(x.second); + WriteDecodedValue(x.second); } } - void WriteVertex(const VertexAccessor &vertex) { + void WriteVertex(const DecodedVertex &vertex) { this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + 3); this->WriteRAW(utils::UnderlyingCast(Signature::Node)); - WriteUInt(vertex.gid()); + this->WriteInt(vertex.id.AsInt()); // write labels - const auto &labels = vertex.labels(); + const auto &labels = vertex.labels; this->WriteTypeSize(labels.size(), MarkerList); - for (const auto &label : labels) - this->WriteString(vertex.db_accessor().LabelName(label)); + for (const auto &label : labels) this->WriteString(label); // write properties - const auto &props = vertex.Properties(); + const auto &props = vertex.properties; this->WriteTypeSize(props.size(), MarkerMap); for (const auto &prop : props) { - this->WriteString(vertex.db_accessor().PropertyName(prop.first)); - WriteTypedValue(prop.second); + this->WriteString(prop.first); + WriteDecodedValue(prop.second); } } - void WriteEdge(const EdgeAccessor &edge, bool unbound = false) { + void WriteEdge(const DecodedEdge &edge, bool unbound = false) { this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + (unbound ? 3 : 5)); this->WriteRAW(utils::UnderlyingCast( unbound ? Signature::UnboundRelationship : Signature::Relationship)); - WriteUInt(edge.gid()); + this->WriteInt(edge.id.AsInt()); if (!unbound) { - WriteUInt(edge.from().gid()); - WriteUInt(edge.to().gid()); + this->WriteInt(edge.from.AsInt()); + this->WriteInt(edge.to.AsInt()); } - // write type - this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType())); + this->WriteString(edge.type); - // write properties - const auto &props = edge.Properties(); + const auto &props = edge.properties; this->WriteTypeSize(props.size(), MarkerMap); for (const auto &prop : props) { - this->WriteString(edge.db_accessor().PropertyName(prop.first)); - WriteTypedValue(prop.second); + this->WriteString(prop.first); + WriteDecodedValue(prop.second); } } - void WritePath(const query::Path &path) { - // Prepare the data structures to be written. - // - // Unique vertices in the path. - std::vector vertices; - // Unique edges in the path. - std::vector edges; - // Indices that map path positions to vertices/edges elements. Positive - // indices for left-to-right directionality and negative for right-to-left. - std::vector indices; + void WriteEdge(const DecodedUnboundedEdge &edge) { + this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + 3); + this->WriteRAW(utils::UnderlyingCast(Signature::UnboundRelationship)); - // Helper function. Looks for the given element in the collection. If found - // it puts it's index into `indices`. Otherwise emplaces the given element - // into the collection and puts that index into `indices`. A multiplier is - // added to switch between positive and negative indices (that define edge - // direction). - auto add_element = [&indices](auto &collection, const auto &element, - int multiplier, int offset) { - auto found = std::find(collection.begin(), collection.end(), element); - indices.emplace_back(multiplier * - (std::distance(collection.begin(), found) + offset)); - if (found == collection.end()) collection.emplace_back(element); - }; + this->WriteInt(edge.id.AsInt()); - vertices.emplace_back(path.vertices()[0]); - for (uint i = 0; i < path.size(); i++) { - const auto &e = path.edges()[i]; - const auto &v = path.vertices()[i + 1]; - add_element(edges, e, e.to_is(v) ? 1 : -1, 1); - add_element(vertices, v, 1, 0); + this->WriteString(edge.type); + + const auto &props = edge.properties; + this->WriteTypeSize(props.size(), MarkerMap); + for (const auto &prop : props) { + this->WriteString(prop.first); + WriteDecodedValue(prop.second); } + } - // Write data. + void WritePath(const DecodedPath &path) { this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + 3); this->WriteRAW(utils::UnderlyingCast(Signature::Path)); - this->WriteTypeSize(vertices.size(), MarkerList); - for (auto &v : vertices) WriteVertex(v); - this->WriteTypeSize(edges.size(), MarkerList); - for (auto &e : edges) WriteEdge(e, true); - this->WriteTypeSize(indices.size(), MarkerList); - for (auto &i : indices) this->WriteInt(i); + this->WriteTypeSize(path.vertices.size(), MarkerList); + for (auto &v : path.vertices) WriteVertex(v); + this->WriteTypeSize(path.edges.size(), MarkerList); + for (auto &e : path.edges) WriteEdge(e); + this->WriteTypeSize(path.indices.size(), MarkerList); + for (auto &i : path.indices) this->WriteInt(i); } - void WriteTypedValue(const query::TypedValue &value) { + void WriteDecodedValue(const DecodedValue &value) { switch (value.type()) { - case query::TypedValue::Type::Null: + case DecodedValue::Type::Null: this->WriteNull(); break; - case query::TypedValue::Type::Bool: - this->WriteBool(value.Value()); + case DecodedValue::Type::Bool: + this->WriteBool(value.ValueBool()); break; - case query::TypedValue::Type::Int: - this->WriteInt(value.Value()); + case DecodedValue::Type::Int: + this->WriteInt(value.ValueInt()); break; - case query::TypedValue::Type::Double: - this->WriteDouble(value.Value()); + case DecodedValue::Type::Double: + this->WriteDouble(value.ValueDouble()); break; - case query::TypedValue::Type::String: - this->WriteString(value.Value()); + case DecodedValue::Type::String: + this->WriteString(value.ValueString()); break; - case query::TypedValue::Type::List: - WriteList(value.Value>()); + case DecodedValue::Type::List: + WriteList(value.ValueList()); break; - case query::TypedValue::Type::Map: - WriteMap(value.Value>()); + case DecodedValue::Type::Map: + WriteMap(value.ValueMap()); break; - case query::TypedValue::Type::Vertex: - WriteVertex(value.Value()); + case DecodedValue::Type::Vertex: + WriteVertex(value.ValueVertex()); break; - case query::TypedValue::Type::Edge: - WriteEdge(value.Value()); + case DecodedValue::Type::Edge: + WriteEdge(value.ValueEdge()); break; - case query::TypedValue::Type::Path: + case DecodedValue::Type::UnboundedEdge: + WriteEdge(value.ValueUnboundedEdge()); + break; + case DecodedValue::Type::Path: WritePath(value.ValuePath()); break; } } - - private: - void WriteUInt(const uint64_t &value) { - this->WriteInt(*reinterpret_cast(&value)); - } }; + } // namespace communication::bolt diff --git a/src/communication/bolt/v1/encoder/client_encoder.hpp b/src/communication/bolt/v1/encoder/client_encoder.hpp index 55f43d6b7..79fca8791 100644 --- a/src/communication/bolt/v1/encoder/client_encoder.hpp +++ b/src/communication/bolt/v1/encoder/client_encoder.hpp @@ -39,7 +39,7 @@ class ClientEncoder : private BaseEncoder { * when flushing, false otherwise */ bool MessageInit(const std::string client_name, - const std::map &auth_token) { + const std::map &auth_token) { WriteRAW(utils::UnderlyingCast(Marker::TinyStruct2)); WriteRAW(utils::UnderlyingCast(Signature::Init)); WriteString(client_name); @@ -61,8 +61,8 @@ class ClientEncoder : private BaseEncoder { * @returns true if the data was successfully sent to the client * when flushing, false otherwise */ - bool MessageRun(const std::string statement, - const std::map ¶meters, + bool MessageRun(const std::string &statement, + const std::map ¶meters, bool flush = true) { WriteRAW(utils::UnderlyingCast(Marker::TinyStruct2)); WriteRAW(utils::UnderlyingCast(Signature::Run)); diff --git a/src/communication/bolt/v1/encoder/encoder.hpp b/src/communication/bolt/v1/encoder/encoder.hpp index a8d222d1e..72f57d2e4 100644 --- a/src/communication/bolt/v1/encoder/encoder.hpp +++ b/src/communication/bolt/v1/encoder/encoder.hpp @@ -36,7 +36,7 @@ class Encoder : private BaseEncoder { * * @param values the fields list object that should be sent */ - void MessageRecord(const std::vector &values) { + void MessageRecord(const std::vector &values) { WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1)); WriteRAW(utils::UnderlyingCast(Signature::Record)); WriteList(values); @@ -56,7 +56,7 @@ class Encoder : private BaseEncoder { * @returns true if the data was successfully sent to the client * when flushing, false otherwise */ - bool MessageSuccess(const std::map &metadata, + bool MessageSuccess(const std::map &metadata, bool flush = true) { WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1)); WriteRAW(utils::UnderlyingCast(Signature::Success)); @@ -79,7 +79,7 @@ class Encoder : private BaseEncoder { * false otherwise */ bool MessageSuccess() { - std::map metadata; + std::map metadata; return MessageSuccess(metadata); } @@ -95,8 +95,7 @@ class Encoder : private BaseEncoder { * @returns true if the data was successfully sent to the client, * false otherwise */ - bool MessageFailure( - const std::map &metadata) { + bool MessageFailure(const std::map &metadata) { WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1)); WriteRAW(utils::UnderlyingCast(Signature::Failure)); WriteMap(metadata); @@ -115,8 +114,7 @@ class Encoder : private BaseEncoder { * @returns true if the data was successfully sent to the client, * false otherwise */ - bool MessageIgnored( - const std::map &metadata) { + bool MessageIgnored(const std::map &metadata) { WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1)); WriteRAW(utils::UnderlyingCast(Signature::Ignored)); WriteMap(metadata); diff --git a/src/communication/bolt/v1/encoder/primitive_encoder.hpp b/src/communication/bolt/v1/encoder/primitive_encoder.hpp index 329c139b0..acd42c3ed 100644 --- a/src/communication/bolt/v1/encoder/primitive_encoder.hpp +++ b/src/communication/bolt/v1/encoder/primitive_encoder.hpp @@ -3,14 +3,14 @@ #include #include "communication/bolt/v1/codes.hpp" -#include "storage/property_value.hpp" #include "utils/bswap.hpp" +#include "utils/cast.hpp" namespace communication::bolt { /** * Bolt PrimitiveEncoder. Has public interfaces for writing Bolt encoded data. - * Supported types are: Null, Bool, Int, Double, String and PropertyValue. + * Supported types are: Null, Bool, Int, Double and String. * * Bolt encoding is used both for streaming data to network clients and for * database durability. @@ -93,45 +93,8 @@ class PrimitiveEncoder { WriteRAW(value.c_str(), value.size()); } - void WritePropertyValue(const PropertyValue &value) { - auto write_list = [this](const std::vector &value) { - WriteTypeSize(value.size(), MarkerList); - for (auto &x : value) WritePropertyValue(x); - }; - - auto write_map = [this](const std::map &value) { - WriteTypeSize(value.size(), MarkerMap); - for (auto &x : value) { - WriteString(x.first); - WritePropertyValue(x.second); - } - }; - switch (value.type()) { - case PropertyValue::Type::Null: - WriteNull(); - break; - case PropertyValue::Type::Bool: - WriteBool(value.Value()); - break; - case PropertyValue::Type::Int: - WriteInt(value.Value()); - break; - case PropertyValue::Type::Double: - WriteDouble(value.Value()); - break; - case PropertyValue::Type::String: - WriteString(value.Value()); - break; - case PropertyValue::Type::List: - write_list(value.Value>()); - break; - case PropertyValue::Type::Map: - write_map(value.Value>()); - break; - } - } - protected: Buffer &buffer_; }; + } // namespace communication::bolt diff --git a/src/communication/bolt/v1/encoder/result_stream.hpp b/src/communication/bolt/v1/encoder/result_stream.hpp index f393c707b..e86b9be3e 100644 --- a/src/communication/bolt/v1/encoder/result_stream.hpp +++ b/src/communication/bolt/v1/encoder/result_stream.hpp @@ -2,7 +2,7 @@ #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" #include "communication/bolt/v1/encoder/encoder.hpp" -#include "query/typed_value.hpp" +#include "communication/conversion.hpp" namespace communication::bolt { @@ -25,10 +25,10 @@ class ResultStream { * @param fields the header fields that should be sent. */ void Header(const std::vector &fields) { - std::vector vec; - std::map data; - for (auto &i : fields) vec.push_back(query::TypedValue(i)); - data.insert(std::make_pair(std::string("fields"), query::TypedValue(vec))); + std::vector vec; + std::map data; + for (auto &i : fields) vec.push_back(DecodedValue(i)); + data.insert(std::make_pair(std::string("fields"), DecodedValue(vec))); // this message shouldn't send directly to the client because if an error // happened the client will receive two messages (success and failure) // instead of only one @@ -47,10 +47,20 @@ class ResultStream { * * @param values the values that should be sent */ - void Result(std::vector &values) { + void Result(std::vector &values) { encoder_.MessageRecord(values); } + // TODO: Move this to another class + void Result(std::vector &values) { + std::vector decoded_values; + decoded_values.reserve(values.size()); + for (const auto &v : values) { + decoded_values.push_back(communication::ToDecodedValue(v)); + } + return Result(decoded_values); + } + /** * Writes a summary. Typically a summary is something like: * { @@ -63,7 +73,7 @@ class ResultStream { * * @param summary the summary map object that should be sent */ - void Summary(const std::map &summary) { + void Summary(const std::map &summary) { // at this point message should not flush the socket so // here is false because chunk has to be called instead of flush encoder_.MessageSuccess(summary, false); diff --git a/src/communication/bolt/v1/states/error.hpp b/src/communication/bolt/v1/states/error.hpp index e52e44f06..e620991cc 100644 --- a/src/communication/bolt/v1/states/error.hpp +++ b/src/communication/bolt/v1/states/error.hpp @@ -6,7 +6,7 @@ #include "communication/bolt/v1/codes.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" #include "communication/bolt/v1/state.hpp" -#include "query/typed_value.hpp" +#include "utils/cast.hpp" namespace communication::bolt { diff --git a/src/communication/bolt/v1/states/executing.hpp b/src/communication/bolt/v1/states/executing.hpp index ac10af79d..b687b923a 100644 --- a/src/communication/bolt/v1/states/executing.hpp +++ b/src/communication/bolt/v1/states/executing.hpp @@ -9,6 +9,7 @@ #include "communication/bolt/v1/codes.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" #include "communication/bolt/v1/state.hpp" +#include "communication/conversion.hpp" #include "database/graph_db.hpp" #include "distributed/pull_rpc_clients.hpp" #include "query/exceptions.hpp" @@ -19,8 +20,8 @@ namespace communication::bolt { template State HandleRun(TSession &session, State state, Marker marker) { - const std::map kEmptyFields = { - {"fields", std::vector{}}}; + const std::map kEmptyFields = { + {"fields", std::vector{}}}; if (marker != Marker::TinyStruct2) { DLOG(WARNING) << fmt::format( @@ -131,9 +132,9 @@ State HandleRun(TSession &session, State state, Marker marker) { } } - auto ¶ms_map = params.ValueMap(); - std::map params_tv(params_map.begin(), - params_map.end()); + std::map params_tv; + for (const auto &kv : params.ValueMap()) + params_tv.emplace(kv.first, communication::ToTypedValue(kv.second)); session .interpreter_(query.ValueString(), *session.db_accessor_, params_tv, in_explicit_transaction) diff --git a/src/communication/bolt/v1/states/init.hpp b/src/communication/bolt/v1/states/init.hpp index 4713d7020..a7e016cf0 100644 --- a/src/communication/bolt/v1/states/init.hpp +++ b/src/communication/bolt/v1/states/init.hpp @@ -5,7 +5,6 @@ #include "communication/bolt/v1/codes.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" -#include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/state.hpp" #include "utils/likely.hpp" diff --git a/src/communication/conversion.cpp b/src/communication/conversion.cpp new file mode 100644 index 000000000..26211c2ba --- /dev/null +++ b/src/communication/conversion.cpp @@ -0,0 +1,192 @@ +#include "communication/conversion.hpp" + +#include +#include +#include + +#include "database/graph_db_accessor.hpp" + +using communication::bolt::DecodedValue; + +namespace communication { + +query::TypedValue ToTypedValue(const DecodedValue &value) { + switch (value.type()) { + case DecodedValue::Type::Null: + return query::TypedValue::Null; + case DecodedValue::Type::Bool: + return query::TypedValue(value.ValueBool()); + case DecodedValue::Type::Int: + return query::TypedValue(value.ValueInt()); + case DecodedValue::Type::Double: + return query::TypedValue(value.ValueDouble()); + case DecodedValue::Type::String: + return query::TypedValue(value.ValueString()); + case DecodedValue::Type::List: { + std::vector list; + list.reserve(value.ValueList().size()); + for (const auto &v : value.ValueList()) list.push_back(ToTypedValue(v)); + return query::TypedValue(list); + } + case DecodedValue::Type::Map: { + std::map map; + for (const auto &kv : value.ValueMap()) + map.emplace(kv.first, ToTypedValue(kv.second)); + return query::TypedValue(map); + } + case DecodedValue::Type::Vertex: + case DecodedValue::Type::Edge: + case DecodedValue::Type::UnboundedEdge: + case DecodedValue::Type::Path: + throw communication::bolt::DecodedValueException( + "Unsupported conversion from DecodedValue to TypedValue"); + } +} + +DecodedValue ToDecodedValue(const query::TypedValue &value) { + switch (value.type()) { + case query::TypedValue::Type::Null: + return DecodedValue(); + case query::TypedValue::Type::Bool: + return DecodedValue(value.ValueBool()); + case query::TypedValue::Type::Int: + return DecodedValue(value.ValueInt()); + case query::TypedValue::Type::Double: + return DecodedValue(value.ValueDouble()); + case query::TypedValue::Type::String: + return DecodedValue(value.ValueString()); + case query::TypedValue::Type::List: { + std::vector values; + values.reserve(value.ValueList().size()); + for (const auto &v : value.ValueList()) { + values.push_back(ToDecodedValue(v)); + } + return DecodedValue(values); + } + case query::TypedValue::Type::Map: { + std::map map; + for (const auto &kv : value.ValueMap()) { + map.emplace(kv.first, ToDecodedValue(kv.second)); + } + return DecodedValue(map); + } + case query::TypedValue::Type::Vertex: + return DecodedValue(ToDecodedVertex(value.ValueVertex())); + case query::TypedValue::Type::Edge: + return DecodedValue(ToDecodedEdge(value.ValueEdge())); + case query::TypedValue::Type::Path: + return DecodedValue(ToDecodedPath(value.ValuePath())); + } +} + +communication::bolt::DecodedVertex ToDecodedVertex( + const VertexAccessor &vertex) { + auto id = communication::bolt::Id::FromUint(vertex.gid()); + std::vector labels; + labels.reserve(vertex.labels().size()); + for (const auto &label : vertex.labels()) { + labels.push_back(vertex.db_accessor().LabelName(label)); + } + std::map properties; + for (const auto &prop : vertex.Properties()) { + properties[vertex.db_accessor().PropertyName(prop.first)] = + ToDecodedValue(prop.second); + } + return communication::bolt::DecodedVertex{id, labels, properties}; +} + +communication::bolt::DecodedEdge ToDecodedEdge(const EdgeAccessor &edge) { + auto id = communication::bolt::Id::FromUint(edge.gid()); + auto from = communication::bolt::Id::FromUint(edge.from().gid()); + auto to = communication::bolt::Id::FromUint(edge.to().gid()); + auto type = edge.db_accessor().EdgeTypeName(edge.EdgeType()); + std::map properties; + for (const auto &prop : edge.Properties()) { + properties[edge.db_accessor().PropertyName(prop.first)] = + ToDecodedValue(prop.second); + } + return communication::bolt::DecodedEdge{id, from, to, type, properties}; +} + +communication::bolt::DecodedPath ToDecodedPath(const query::Path &path) { + std::vector vertices; + vertices.reserve(path.vertices().size()); + for (const auto &v : path.vertices()) { + vertices.push_back(ToDecodedVertex(v)); + } + std::vector edges; + edges.reserve(path.edges().size()); + for (const auto &e : path.edges()) { + edges.push_back(ToDecodedEdge(e)); + } + return communication::bolt::DecodedPath(vertices, edges); +} + +PropertyValue ToPropertyValue(const DecodedValue &value) { + switch (value.type()) { + case DecodedValue::Type::Null: + return PropertyValue::Null; + case DecodedValue::Type::Bool: + return PropertyValue(value.ValueBool()); + case DecodedValue::Type::Int: + return PropertyValue(value.ValueInt()); + case DecodedValue::Type::Double: + return PropertyValue(value.ValueDouble()); + case DecodedValue::Type::String: + return PropertyValue(value.ValueString()); + case DecodedValue::Type::List: { + std::vector vec; + vec.reserve(value.ValueList().size()); + for (const auto &value : value.ValueList()) + vec.emplace_back(ToPropertyValue(value)); + return PropertyValue(std::move(vec)); + } + case DecodedValue::Type::Map: { + std::map map; + for (const auto &kv : value.ValueMap()) + map.emplace(kv.first, ToPropertyValue(kv.second)); + return PropertyValue(std::move(map)); + } + case DecodedValue::Type::Vertex: + case DecodedValue::Type::Edge: + case DecodedValue::Type::UnboundedEdge: + case DecodedValue::Type::Path: + throw communication::bolt::DecodedValueException( + "Unsupported conversion from DecodedValue to PropertyValue"); + } +} + +DecodedValue ToDecodedValue(const PropertyValue &value) { + switch (value.type()) { + case PropertyValue::Type::Null: + return DecodedValue(); + case PropertyValue::Type::Bool: + return DecodedValue(value.Value()); + case PropertyValue::Type::Int: + return DecodedValue(value.Value()); + break; + case PropertyValue::Type::Double: + return DecodedValue(value.Value()); + case PropertyValue::Type::String: + return DecodedValue(value.Value()); + case PropertyValue::Type::List: { + const auto &values = value.Value>(); + std::vector vec; + vec.reserve(values.size()); + for (const auto &v : values) { + vec.push_back(ToDecodedValue(v)); + } + return DecodedValue(vec); + } + case PropertyValue::Type::Map: { + const auto &map = value.Value>(); + std::map dv_map; + for (const auto &kv : map) { + dv_map.emplace(kv.first, ToDecodedValue(kv.second)); + } + return DecodedValue(dv_map); + } + } +} + +} // namespace communication diff --git a/src/communication/conversion.hpp b/src/communication/conversion.hpp new file mode 100644 index 000000000..2c16facb1 --- /dev/null +++ b/src/communication/conversion.hpp @@ -0,0 +1,26 @@ +/// @file Conversion functions between DecodedValue and other memgraph types. +#pragma once + +#include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "query/typed_value.hpp" +#include "storage/property_value.hpp" + +namespace communication { + +communication::bolt::DecodedVertex ToDecodedVertex( + const VertexAccessor &vertex); + +communication::bolt::DecodedEdge ToDecodedEdge(const EdgeAccessor &edge); + +communication::bolt::DecodedPath ToDecodedPath(const query::Path &path); + +communication::bolt::DecodedValue ToDecodedValue( + const query::TypedValue &value); + +query::TypedValue ToTypedValue(const communication::bolt::DecodedValue &value); + +communication::bolt::DecodedValue ToDecodedValue(const PropertyValue &value); + +PropertyValue ToPropertyValue(const communication::bolt::DecodedValue &value); + +} // namespace communication diff --git a/src/communication/result_stream_faker.hpp b/src/communication/result_stream_faker.hpp index d8194f9ec..7a0a919b3 100644 --- a/src/communication/result_stream_faker.hpp +++ b/src/communication/result_stream_faker.hpp @@ -3,7 +3,11 @@ #include #include "glog/logging.h" -#include "query/typed_value.hpp" + +#include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "utils/algorithm.hpp" + +// TODO: Why is this here?! It's only used in tests and query/repl.cpp /** * A mocker for the data output record stream. @@ -11,6 +15,7 @@ * sent to it in an acceptable order, and tracks * the content of those messages. */ +template class ResultStreamFaker { public: ResultStreamFaker() = default; @@ -26,13 +31,14 @@ class ResultStreamFaker { current_state_ = State::WritingResults; } - void Result(const std::vector &values) { + void Result(const std::vector &values) { DCHECK(current_state_ == State::WritingResults) << "Can't accept results before header nor after summary"; results_.push_back(values); } - void Summary(const std::map &summary) { + void Summary( + const std::map &summary) { DCHECK(current_state_ != State::Done) << "Can only send a summary once"; summary_ = summary; current_state_ = State::Done; @@ -52,7 +58,7 @@ class ResultStreamFaker { friend std::ostream &operator<<(std::ostream &os, const ResultStreamFaker &results) { - auto typed_value_to_string = [](const query::TypedValue &value) { + auto decoded_value_to_string = [](const auto &value) { std::stringstream ss; ss << value; return ss.str(); @@ -71,7 +77,7 @@ class ResultStreamFaker { for (int col_ind = 0; col_ind < static_cast(column_widths.size()); ++col_ind) { std::string string_val = - typed_value_to_string(results_data[row_ind][col_ind]); + decoded_value_to_string(results_data[row_ind][col_ind]); column_widths[col_ind] = std::max(column_widths[col_ind], (int)string_val.size()); result_strings[row_ind][col_ind] = string_val; @@ -129,6 +135,6 @@ class ResultStreamFaker { // the data that the record stream can accept std::vector header_; - std::vector> results_; - std::map summary_; + std::vector> results_; + std::map summary_; }; diff --git a/src/database/state_delta.cpp b/src/database/state_delta.cpp index 2d448f493..04972e688 100644 --- a/src/database/state_delta.cpp +++ b/src/database/state_delta.cpp @@ -1,8 +1,10 @@ +#include "database/state_delta.hpp" + #include #include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "communication/conversion.hpp" #include "database/graph_db_accessor.hpp" -#include "database/state_delta.hpp" namespace database { @@ -161,7 +163,7 @@ StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label, void StateDelta::Encode( HashedFileWriter &writer, - communication::bolt::PrimitiveEncoder &encoder) const { + communication::bolt::BaseEncoder &encoder) const { encoder.WriteInt(static_cast(type)); encoder.WriteInt(static_cast(transaction_id)); @@ -206,13 +208,13 @@ void StateDelta::Encode( encoder.WriteInt(vertex_id); encoder.WriteInt(property.Id()); encoder.WriteString(property_name); - encoder.WritePropertyValue(value); + encoder.WriteDecodedValue(communication::ToDecodedValue(value)); break; case Type::SET_PROPERTY_EDGE: encoder.WriteInt(edge_id); encoder.WriteInt(property.Id()); encoder.WriteString(property_name); - encoder.WritePropertyValue(value); + encoder.WriteDecodedValue(communication::ToDecodedValue(value)); break; case Type::ADD_LABEL: case Type::REMOVE_LABEL: @@ -302,14 +304,14 @@ std::experimental::optional StateDelta::Decode( DECODE_MEMBER_CAST(property, ValueInt, storage::Property) DECODE_MEMBER(property_name, ValueString) if (!decoder.ReadValue(&dv)) return nullopt; - r_val.value = static_cast(dv); + r_val.value = communication::ToPropertyValue(dv); break; case Type::SET_PROPERTY_EDGE: DECODE_MEMBER(edge_id, ValueInt) DECODE_MEMBER_CAST(property, ValueInt, storage::Property) DECODE_MEMBER(property_name, ValueString) if (!decoder.ReadValue(&dv)) return nullopt; - r_val.value = static_cast(dv); + r_val.value = communication::ToPropertyValue(dv); break; case Type::ADD_LABEL: case Type::REMOVE_LABEL: diff --git a/src/database/state_delta.lcp b/src/database/state_delta.lcp index 9d3daee26..70a85879e 100644 --- a/src/database/state_delta.lcp +++ b/src/database/state_delta.lcp @@ -2,7 +2,7 @@ #pragma once #include "communication/bolt/v1/decoder/decoder.hpp" -#include "communication/bolt/v1/encoder/primitive_encoder.hpp" +#include "communication/bolt/v1/encoder/base_encoder.hpp" #include "database/state_delta.capnp.h" #include "durability/hashed_file_reader.hpp" #include "durability/hashed_file_writer.hpp" @@ -125,7 +125,7 @@ omitted in the comment.") * with delta to the writer */ void Encode( HashedFileWriter &writer, - communication::bolt::PrimitiveEncoder &encoder) const; + communication::bolt::BaseEncoder &encoder) const; static StateDelta TxBegin(tx::TransactionId tx_id); static StateDelta TxCommit(tx::TransactionId tx_id); diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 4006e2993..ddab0b15e 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -4,6 +4,7 @@ #include #include +#include "communication/conversion.hpp" #include "database/graph_db_accessor.hpp" #include "database/indexes/label_property_index.hpp" #include "durability/hashed_file_reader.hpp" @@ -132,8 +133,9 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, vertex_accessor.add_label(dba.Label(label)); } for (const auto &property_pair : vertex->properties) { - vertex_accessor.PropsSet(dba.Property(property_pair.first), - query::TypedValue(property_pair.second)); + vertex_accessor.PropsSet( + dba.Property(property_pair.first), + communication::ToTypedValue(property_pair.second)); } auto vertex_record = vertex_accessor.GetNew(); for (const auto &edge : vertex->in) { @@ -187,7 +189,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, // We have to take full edge endpoints from vertices since the endpoints // found here don't containt worker_id, and this can't be changed since this // edges must be bolt-compliant - auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id]; + auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id.AsUint()]; storage::VertexAddress from; storage::VertexAddress to; @@ -199,11 +201,11 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, vertex_transform_to_local_if_possible(to); auto edge_accessor = dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type), - edge.id, cypher_id); + edge.id.AsUint(), cypher_id); for (const auto &property_pair : edge.properties) edge_accessor.PropsSet(dba.Property(property_pair.first), - query::TypedValue(property_pair.second)); + communication::ToTypedValue(property_pair.second)); } // Vertex and edge counts are included in the hash. Re-read them to update the diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index 0b523a501..8258ec8d8 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -46,7 +46,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, // Write the transaction snapshot into the snapshot. It's used when // recovering from the combination of snapshot and write-ahead-log. { - std::vector tx_snapshot; + std::vector tx_snapshot; for (int64_t tx : dba.transaction().snapshot()) tx_snapshot.emplace_back(tx); encoder.WriteList(tx_snapshot); @@ -54,7 +54,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, // Write label+property indexes as list ["label", "property", ...] { - std::vector index_vec; + std::vector index_vec; for (const auto &key : dba.GetIndicesKeys()) { index_vec.emplace_back(dba.LabelName(key.label_)); index_vec.emplace_back(dba.PropertyName(key.property_)); @@ -67,7 +67,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, vertex_num++; } for (const auto &edge : dba.Edges(false)) { - encoder.WriteEdge(edge); + encoder.WriteEdge(communication::ToDecodedEdge(edge)); encoder.WriteInt(edge.cypher_id()); edge_num++; } diff --git a/src/durability/snapshot_decoder.hpp b/src/durability/snapshot_decoder.hpp index e6a9d45a5..97671a78f 100644 --- a/src/durability/snapshot_decoder.hpp +++ b/src/durability/snapshot_decoder.hpp @@ -24,7 +24,7 @@ class SnapshotDecoder : public communication::bolt::Decoder { return std::experimental::nullopt; } auto &read_vertex = dv.ValueVertex(); - vertex.gid = static_cast(read_vertex.id); + vertex.gid = read_vertex.id.AsUint(); vertex.labels = read_vertex.labels; vertex.properties = read_vertex.properties; diff --git a/src/durability/snapshot_encoder.hpp b/src/durability/snapshot_encoder.hpp index cbcf05a23..8edd284d3 100644 --- a/src/durability/snapshot_encoder.hpp +++ b/src/durability/snapshot_encoder.hpp @@ -1,6 +1,8 @@ #pragma once #include "communication/bolt/v1/encoder/base_encoder.hpp" +#include "communication/conversion.hpp" +#include "database/graph_db_accessor.hpp" #include "utils/cast.hpp" namespace durability { @@ -11,7 +13,8 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder { explicit SnapshotEncoder(Buffer &buffer) : communication::bolt::BaseEncoder(buffer) {} void WriteSnapshotVertex(const VertexAccessor &vertex) { - communication::bolt::BaseEncoder::WriteVertex(vertex); + communication::bolt::BaseEncoder::WriteVertex( + communication::ToDecodedVertex(vertex)); // Write cypher_id this->WriteInt(vertex.cypher_id()); diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index bd1af8f0c..edd5bfcc6 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -1,6 +1,5 @@ #include "wal.hpp" -#include "communication/bolt/v1/decoder/decoded_value.hpp" #include "durability/paths.hpp" #include "utils/file.hpp" #include "utils/flag_validation.hpp" diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp index e16de3694..35d2b2efc 100644 --- a/src/durability/wal.hpp +++ b/src/durability/wal.hpp @@ -8,8 +8,7 @@ #include #include -#include "communication/bolt/v1/decoder/decoder.hpp" -#include "communication/bolt/v1/encoder/primitive_encoder.hpp" +#include "communication/bolt/v1/encoder/base_encoder.hpp" #include "data_structures/ring_buffer.hpp" #include "database/state_delta.hpp" #include "storage/gid.hpp" @@ -68,7 +67,7 @@ class WriteAheadLog { int worker_id_; const std::experimental::filesystem::path wal_dir_; HashedFileWriter writer_; - communication::bolt::PrimitiveEncoder encoder_{writer_}; + communication::bolt::BaseEncoder encoder_{writer_}; // The file to which the WAL flushes data. The path is fixed, the file gets // moved when the WAL gets rotated. diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 5c5df863b..74a4be1fc 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -98,7 +98,7 @@ Interpreter::Results Interpreter::operator()( ctx.symbol_table_ = plan->symbol_table(); - std::map summary; + std::map summary; summary["parsing_time"] = frontend_time.count(); summary["planning_time"] = planning_time.count(); summary["cost_estimate"] = plan->cost(); diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index fae3b95bb..db90fe1af 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -2,6 +2,8 @@ #include +#include "communication/bolt/v1/encoder/base_encoder.hpp" +#include "communication/conversion.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" @@ -66,7 +68,8 @@ class Interpreter { Results(Context ctx, std::shared_ptr plan, std::unique_ptr cursor, std::vector output_symbols, std::vector header, - std::map summary, PlanCacheT &plan_cache) + std::map summary, + PlanCacheT &plan_cache) : ctx_(std::move(ctx)), plan_(plan), cursor_(std::move(cursor)), @@ -141,7 +144,7 @@ class Interpreter { bool header_written_{false}; std::vector header_; - std::map summary_; + std::map summary_; utils::Timer execution_timer_; // Gets invalidated after if an index has been built. diff --git a/src/query/repl.cpp b/src/query/repl.cpp index b052cef7d..96cf47a87 100644 --- a/src/query/repl.cpp +++ b/src/query/repl.cpp @@ -64,7 +64,7 @@ void query::Repl(database::GraphDb &db) { // regular cypher queries try { database::GraphDbAccessor dba(db); - ResultStreamFaker results; + ResultStreamFaker results; interpeter(command, dba, {}, false).PullAll(results); std::cout << results; dba.Commit(); diff --git a/src/storage/property_value_store.cpp b/src/storage/property_value_store.cpp index 0bbe9be38..4f6d5d70e 100644 --- a/src/storage/property_value_store.cpp +++ b/src/storage/property_value_store.cpp @@ -3,6 +3,8 @@ #include "gflags/gflags.h" #include "glog/logging.h" +#include "communication/bolt/v1/decoder/decoder.hpp" +#include "communication/conversion.hpp" #include "storage/pod_buffer.hpp" #include "storage/property_value_store.hpp" @@ -211,7 +213,7 @@ PropertyValueStore::iterator PropertyValueStore::end() const { std::string PropertyValueStore::SerializeProp(const PropertyValue &prop) const { storage::PODBuffer pod_buffer; BaseEncoder encoder{pod_buffer}; - encoder.WriteTypedValue(prop); + encoder.WriteDecodedValue(communication::ToDecodedValue(prop)); return std::string(reinterpret_cast(pod_buffer.buffer.data()), pod_buffer.buffer.size()); } @@ -219,14 +221,14 @@ std::string PropertyValueStore::SerializeProp(const PropertyValue &prop) const { PropertyValue PropertyValueStore::DeserializeProp( const std::string &serialized_prop) const { storage::PODBuffer pod_buffer{serialized_prop}; - Decoder decoder{pod_buffer}; + communication::bolt::Decoder decoder{pod_buffer}; DecodedValue dv; if (!decoder.ReadValue(&dv)) { DLOG(WARNING) << "Unable to read property value"; return PropertyValue::Null; } - return dv.operator PropertyValue(); + return communication::ToPropertyValue(dv); } storage::KVStore PropertyValueStore::ConstructDiskStorage() const { diff --git a/tests/benchmark/expansion.cpp b/tests/benchmark/expansion.cpp index e927e41e9..369b75437 100644 --- a/tests/benchmark/expansion.cpp +++ b/tests/benchmark/expansion.cpp @@ -47,7 +47,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) { auto query = "MATCH (s:Starting) return s"; database::GraphDbAccessor dba(*db_); while (state.KeepRunning()) { - ResultStreamFaker results; + ResultStreamFaker results; interpreter()(query, dba, {}, false).PullAll(results); } } @@ -61,7 +61,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) { auto query = "MATCH (s:Starting) WITH s MATCH (s)--(d) RETURN count(d)"; database::GraphDbAccessor dba(*db_); while (state.KeepRunning()) { - ResultStreamFaker results; + ResultStreamFaker results; interpreter()(query, dba, {}, false).PullAll(results); } } diff --git a/tests/macro_benchmark/clients/common.hpp b/tests/macro_benchmark/clients/common.hpp index f1583a6d5..b90195acd 100644 --- a/tests/macro_benchmark/clients/common.hpp +++ b/tests/macro_benchmark/clients/common.hpp @@ -8,7 +8,9 @@ #include "communication/bolt/client.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "utils/algorithm.hpp" #include "utils/exceptions.hpp" +#include "utils/thread/sync.hpp" #include "utils/timer.hpp" using communication::ClientContext; diff --git a/tests/macro_benchmark/clients/long_running_common.hpp b/tests/macro_benchmark/clients/long_running_common.hpp index 8ab7311ee..2dad46661 100644 --- a/tests/macro_benchmark/clients/long_running_common.hpp +++ b/tests/macro_benchmark/clients/long_running_common.hpp @@ -1,5 +1,17 @@ #pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include "json/json.hpp" #include "stats/metrics.hpp" diff --git a/tests/macro_benchmark/clients/pokec_client.cpp b/tests/macro_benchmark/clients/pokec_client.cpp index cf19edbea..f0ad5dea5 100644 --- a/tests/macro_benchmark/clients/pokec_client.cpp +++ b/tests/macro_benchmark/clients/pokec_client.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include diff --git a/tests/macro_benchmark/clients/query_client.cpp b/tests/macro_benchmark/clients/query_client.cpp index 224465265..903da4a5b 100644 --- a/tests/macro_benchmark/clients/query_client.cpp +++ b/tests/macro_benchmark/clients/query_client.cpp @@ -1,4 +1,5 @@ #include +#include #include #include diff --git a/tests/manual/distributed_common.hpp b/tests/manual/distributed_common.hpp index a33d5acd5..0ac0d1729 100644 --- a/tests/manual/distributed_common.hpp +++ b/tests/manual/distributed_common.hpp @@ -3,6 +3,7 @@ #include #include +#include "communication/conversion.hpp" #include "communication/result_stream_faker.hpp" #include "database/graph_db_accessor.hpp" #include "query/interpreter.hpp" @@ -62,7 +63,7 @@ class Cluster { auto Execute(const std::string &query, std::map params = {}) { database::GraphDbAccessor dba(*master_); - ResultStreamFaker result; + ResultStreamFaker result; interpreter_->operator()(query, dba, params, false).PullAll(result); dba.Commit(); return result.GetResults(); diff --git a/tests/manual/single_query.cpp b/tests/manual/single_query.cpp index d56564235..7ef65fa9c 100644 --- a/tests/manual/single_query.cpp +++ b/tests/manual/single_query.cpp @@ -13,7 +13,7 @@ int main(int argc, char *argv[]) { } database::SingleNode db; database::GraphDbAccessor dba(db); - ResultStreamFaker results; + ResultStreamFaker results; query::Interpreter{db}(argv[1], dba, {}, false).PullAll(results); std::cout << results; return 0; diff --git a/tests/manual/snapshot_generation/snapshot_writer.hpp b/tests/manual/snapshot_generation/snapshot_writer.hpp index ec9f8d16e..df8881ecd 100644 --- a/tests/manual/snapshot_generation/snapshot_writer.hpp +++ b/tests/manual/snapshot_generation/snapshot_writer.hpp @@ -4,6 +4,8 @@ #include #include "communication/bolt/v1/encoder/base_encoder.hpp" +#include "communication/conversion.hpp" +#include "durability/hashed_file_writer.hpp" #include "durability/paths.hpp" #include "durability/version.hpp" #include "query/typed_value.hpp" @@ -25,12 +27,12 @@ class SnapshotWriter { : worker_id_(worker_id), buffer_(path) { encoder_.WriteRAW(durability::kMagicNumber.data(), durability::kMagicNumber.size()); - encoder_.WriteTypedValue(durability::kVersion); + encoder_.WriteDecodedValue(durability::kVersion); encoder_.WriteInt(worker_id_); encoder_.WriteInt(vertex_generator_local_count); encoder_.WriteInt(edge_generator_local_count); encoder_.WriteInt(0); - encoder_.WriteList(std::vector{}); + encoder_.WriteList(std::vector{}); } // reference to `buffer_` gets broken when moving, so let's just forbid moving @@ -39,8 +41,8 @@ class SnapshotWriter { template void WriteList(const std::vector &list) { - encoder_.WriteList( - std::vector(list.begin(), list.end())); + encoder_.WriteList(std::vector( + list.begin(), list.end())); } storage::VertexAddress DefaultVertexAddress(gid::Gid gid) { @@ -67,7 +69,11 @@ class SnapshotWriter { encoder_.WriteInt(node.gid); WriteList(node.labels); - encoder_.WriteMap(node.props); + std::map props; + for (const auto &prop : node.props) { + props[prop.first] = communication::ToDecodedValue(prop.second); + } + encoder_.WriteMap(props); // cypher_id encoder_.WriteInt(utils::MemcpyCast(node.gid)); @@ -94,7 +100,11 @@ class SnapshotWriter { encoder_.WriteInt(edge.from); encoder_.WriteInt(edge.to); encoder_.WriteString(edge.type); - encoder_.WriteMap(edge.props); + std::map props; + for (const auto &prop : edge.props) { + props[prop.first] = communication::ToDecodedValue(prop.second); + } + encoder_.WriteMap(props); // cypher_id encoder_.WriteInt(utils::MemcpyCast(edge.gid)); diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp index d70b97e71..9b1b562b2 100644 --- a/tests/stress/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -1,3 +1,8 @@ +#include +#include +#include +#include + #include #include #include diff --git a/tests/unit/bolt_decoder.cpp b/tests/unit/bolt_decoder.cpp index 7adaa10d4..ea3b3d756 100644 --- a/tests/unit/bolt_decoder.cpp +++ b/tests/unit/bolt_decoder.cpp @@ -342,7 +342,7 @@ TEST(BoltDecoder, Vertex) { buffer.Write(test_int, 1); ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), true); auto &vertex = dv.ValueVertex(); - ASSERT_EQ(vertex.id, 1); + ASSERT_EQ(vertex.id.AsUint(), 1); ASSERT_EQ(vertex.labels[0], std::string("a")); ASSERT_EQ(vertex.properties[std::string("a")].ValueInt(), 1); } @@ -429,9 +429,9 @@ TEST(BoltDecoder, Edge) { buffer.Write(test_int1, 1); ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), true); auto &edge = de.ValueEdge(); - ASSERT_EQ(edge.id, 1); - ASSERT_EQ(edge.from, 2); - ASSERT_EQ(edge.to, 3); + ASSERT_EQ(edge.id.AsUint(), 1); + ASSERT_EQ(edge.from.AsUint(), 2); + ASSERT_EQ(edge.to.AsUint(), 3); ASSERT_EQ(edge.type, std::string("a")); ASSERT_EQ(edge.properties[std::string("a")].ValueInt(), 1); } diff --git a/tests/unit/bolt_encoder.cpp b/tests/unit/bolt_encoder.cpp index b5cd6d909..598ec6a04 100644 --- a/tests/unit/bolt_encoder.cpp +++ b/tests/unit/bolt_encoder.cpp @@ -2,11 +2,11 @@ #include "bolt_testdata.hpp" #include "communication/bolt/v1/encoder/encoder.hpp" +#include "communication/conversion.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" -#include "query/typed_value.hpp" -using query::TypedValue; +using communication::bolt::DecodedValue; /** * TODO (mferencevic): document @@ -65,10 +65,10 @@ std::vector &output = output_stream.output; TEST(BoltEncoder, NullAndBool) { output.clear(); - std::vector vals; - vals.push_back(TypedValue::Null); - vals.push_back(TypedValue(true)); - vals.push_back(TypedValue(false)); + std::vector vals; + vals.push_back(DecodedValue()); + vals.push_back(DecodedValue(true)); + vals.push_back(DecodedValue(false)); bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, 3); CheckOutput(output, (const uint8_t *)"\xC0\xC3\xC2", 3); @@ -77,8 +77,8 @@ TEST(BoltEncoder, NullAndBool) { TEST(BoltEncoder, Int) { int N = 28; output.clear(); - std::vector vals; - for (int i = 0; i < N; ++i) vals.push_back(TypedValue(int_decoded[i])); + std::vector vals; + for (int i = 0; i < N; ++i) vals.push_back(DecodedValue(int_decoded[i])); bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, N); for (int i = 0; i < N; ++i) @@ -89,8 +89,8 @@ TEST(BoltEncoder, Int) { TEST(BoltEncoder, Double) { int N = 4; output.clear(); - std::vector vals; - for (int i = 0; i < N; ++i) vals.push_back(TypedValue(double_decoded[i])); + std::vector vals; + for (int i = 0; i < N; ++i) vals.push_back(DecodedValue(double_decoded[i])); bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, N); for (int i = 0; i < N; ++i) CheckOutput(output, double_encoded[i], 9, false); @@ -99,9 +99,9 @@ TEST(BoltEncoder, Double) { TEST(BoltEncoder, String) { output.clear(); - std::vector vals; + std::vector vals; for (uint64_t i = 0; i < sizes_num; ++i) - vals.push_back(TypedValue(std::string((const char *)data, sizes[i]))); + vals.push_back(DecodedValue(std::string((const char *)data, sizes[i]))); bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, vals.size()); for (uint64_t i = 0; i < sizes_num; ++i) { @@ -113,12 +113,12 @@ TEST(BoltEncoder, String) { TEST(BoltEncoder, List) { output.clear(); - std::vector vals; + std::vector vals; for (uint64_t i = 0; i < sizes_num; ++i) { - std::vector val; + std::vector val; for (uint64_t j = 0; j < sizes[i]; ++j) - val.push_back(TypedValue(std::string((const char *)&data[j], 1))); - vals.push_back(TypedValue(val)); + val.push_back(DecodedValue(std::string((const char *)&data[j], 1))); + vals.push_back(DecodedValue(val)); } bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, vals.size()); @@ -134,16 +134,16 @@ TEST(BoltEncoder, List) { TEST(BoltEncoder, Map) { output.clear(); - std::vector vals; + std::vector vals; uint8_t buff[10]; for (int i = 0; i < sizes_num; ++i) { - std::map val; + std::map val; for (int j = 0; j < sizes[i]; ++j) { sprintf((char *)buff, "%05X", j); std::string tmp((char *)buff, 5); - val.insert(std::make_pair(tmp, TypedValue(tmp))); + val.insert(std::make_pair(tmp, DecodedValue(tmp))); } - vals.push_back(TypedValue(val)); + vals.push_back(DecodedValue(val)); } bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, vals.size()); @@ -188,10 +188,10 @@ TEST(BoltEncoder, VertexAndEdge) { ea.PropsSet(p4, pv4); // check everything - std::vector vals; - vals.push_back(TypedValue(va1)); - vals.push_back(TypedValue(va2)); - vals.push_back(TypedValue(ea)); + std::vector vals; + vals.push_back(communication::ToDecodedValue(va1)); + vals.push_back(communication::ToDecodedValue(va2)); + vals.push_back(communication::ToDecodedValue(ea)); bolt_encoder.MessageRecord(vals); // The vertexedge_encoded testdata has hardcoded zeros for IDs, @@ -214,18 +214,18 @@ TEST(BoltEncoder, BoltV1ExampleMessages) { output.clear(); // record message - std::vector rvals; - for (int i = 1; i < 4; ++i) rvals.push_back(TypedValue(i)); + std::vector rvals; + for (int i = 1; i < 4; ++i) rvals.push_back(DecodedValue(i)); bolt_encoder.MessageRecord(rvals); CheckOutput(output, (const uint8_t *)"\xB1\x71\x93\x01\x02\x03", 6); // success message std::string sv1("name"), sv2("age"), sk("fields"); - std::vector svec; - svec.push_back(TypedValue(sv1)); - svec.push_back(TypedValue(sv2)); - TypedValue slist(svec); - std::map svals; + std::vector svec; + svec.push_back(DecodedValue(sv1)); + svec.push_back(DecodedValue(sv2)); + DecodedValue slist(svec); + std::map svals; svals.insert(std::make_pair(sk, slist)); bolt_encoder.MessageSuccess(svals); CheckOutput(output, @@ -236,8 +236,8 @@ TEST(BoltEncoder, BoltV1ExampleMessages) { std::string fv1("Neo.ClientError.Statement.SyntaxError"), fv2("Invalid syntax."); std::string fk1("code"), fk2("message"); - TypedValue ftv1(fv1), ftv2(fv2); - std::map fvals; + DecodedValue ftv1(fv1), ftv2(fv2); + std::map fvals; fvals.insert(std::make_pair(fk1, ftv1)); fvals.insert(std::make_pair(fk2, ftv2)); bolt_encoder.MessageFailure(fvals); diff --git a/tests/unit/bolt_result_stream.cpp b/tests/unit/bolt_result_stream.cpp index 9d3305a5e..d0976b49b 100644 --- a/tests/unit/bolt_result_stream.cpp +++ b/tests/unit/bolt_result_stream.cpp @@ -3,9 +3,8 @@ #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" #include "communication/bolt/v1/encoder/encoder.hpp" #include "communication/bolt/v1/encoder/result_stream.hpp" -#include "query/typed_value.hpp" -using query::TypedValue; +using communication::bolt::DecodedValue; using BufferT = communication::bolt::ChunkedEncoderBuffer; using EncoderT = communication::bolt::Encoder; @@ -36,15 +35,15 @@ TEST(Bolt, ResultStream) { PrintOutput(output); CheckOutput(output, header_output, 45); - std::vector result{TypedValue(5), - TypedValue(std::string("hello"))}; + std::vector result{DecodedValue(5), + DecodedValue(std::string("hello"))}; result_stream.Result(result); buffer.Flush(); PrintOutput(output); CheckOutput(output, result_output, 14); - std::map summary; - summary.insert(std::make_pair(std::string("changed"), TypedValue(10))); + std::map summary; + summary.insert(std::make_pair(std::string("changed"), DecodedValue(10))); result_stream.Summary(summary); buffer.Flush(); PrintOutput(output); diff --git a/tests/unit/database_transaction_timeout.cpp b/tests/unit/database_transaction_timeout.cpp index 7f03bfb0a..90c7cbf8e 100644 --- a/tests/unit/database_transaction_timeout.cpp +++ b/tests/unit/database_transaction_timeout.cpp @@ -12,7 +12,7 @@ TEST(TransactionTimeout, TransactionTimeout) { database::SingleNode db; query::Interpreter interpreter{db}; auto interpret = [&](auto &dba, const std::string &query) { - ResultStreamFaker stream; + ResultStreamFaker stream; interpreter(query, dba, {}, false).PullAll(stream); }; diff --git a/tests/unit/distributed_interpretation.cpp b/tests/unit/distributed_interpretation.cpp index 8c7f02e7d..33d97c082 100644 --- a/tests/unit/distributed_interpretation.cpp +++ b/tests/unit/distributed_interpretation.cpp @@ -4,6 +4,7 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "communication/result_stream_faker.hpp" #include "database/graph_db.hpp" #include "distributed/plan_consumer.hpp" #include "distributed/plan_dispatcher.hpp" @@ -38,7 +39,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest { auto RunWithDba(const std::string &query, GraphDbAccessor &dba) { std::map params = {}; - ResultStreamFaker result; + ResultStreamFaker result; interpreter_.value()(query, dba, params, false).PullAll(result); return result.GetResults(); } diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index f767a4dd8..faea6cf9f 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -482,18 +482,18 @@ TEST_F(Durability, SnapshotEncoding) { decoder.ReadValue(&dv); ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Edge); auto &edge = dv.ValueEdge(); - decoded_edges.emplace(edge.id, edge); + decoded_edges.emplace(edge.id.AsUint(), edge); // Read cypher_id. decoder.ReadValue(&dv); ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Int); } EXPECT_EQ(decoded_edges.size(), 2); - EXPECT_EQ(decoded_edges[gid0].from, gid0); - EXPECT_EQ(decoded_edges[gid0].to, gid1); + EXPECT_EQ(decoded_edges[gid0].from.AsUint(), gid0); + EXPECT_EQ(decoded_edges[gid0].to.AsUint(), gid1); EXPECT_EQ(decoded_edges[gid0].type, "et0"); EXPECT_EQ(decoded_edges[gid0].properties.size(), 1); - EXPECT_EQ(decoded_edges[gid1].from, gid2); - EXPECT_EQ(decoded_edges[gid1].to, gid1); + EXPECT_EQ(decoded_edges[gid1].from.AsUint(), gid2); + EXPECT_EQ(decoded_edges[gid1].to.AsUint(), gid1); EXPECT_EQ(decoded_edges[gid1].type, "et1"); EXPECT_EQ(decoded_edges[gid1].properties.size(), 0); diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index 9db2d3af2..9224fe146 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -17,11 +17,10 @@ class InterpreterTest : public ::testing::Test { database::SingleNode db_; query::Interpreter interpreter_{db_}; - ResultStreamFaker Interpret( - const std::string &query, - const std::map params = {}) { + auto Interpret(const std::string &query, + const std::map ¶ms = {}) { database::GraphDbAccessor dba(db_); - ResultStreamFaker result; + ResultStreamFaker result; interpreter_(query, dba, params, false).PullAll(result); return result; } @@ -198,7 +197,7 @@ TEST_F(InterpreterTest, Bfs) { } database::GraphDbAccessor dba(db_); - ResultStreamFaker stream; + ResultStreamFaker stream; interpreter_( "MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and " "e.reachable)]->(m) RETURN r", @@ -242,7 +241,7 @@ TEST_F(InterpreterTest, Bfs) { } TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) { - ResultStreamFaker stream; + ResultStreamFaker stream; database::GraphDbAccessor dba(db_); ASSERT_THROW( interpreter_("CREATE INDEX ON :X(y)", dba, {}, true).PullAll(stream), @@ -252,7 +251,7 @@ TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) { // Test shortest path end to end. TEST_F(InterpreterTest, ShortestPath) { { - ResultStreamFaker stream; + ResultStreamFaker stream; database::GraphDbAccessor dba(db_); interpreter_( "CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 " @@ -263,7 +262,7 @@ TEST_F(InterpreterTest, ShortestPath) { dba.Commit(); } - ResultStreamFaker stream; + ResultStreamFaker stream; database::GraphDbAccessor dba(db_); interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e", dba, {}, false) diff --git a/tests/unit/query_plan_common.hpp b/tests/unit/query_plan_common.hpp index a46bfa8e0..f0731ce68 100644 --- a/tests/unit/query_plan_common.hpp +++ b/tests/unit/query_plan_common.hpp @@ -4,7 +4,6 @@ #include #include -#include "communication/result_stream_faker.hpp" #include "query/common.hpp" #include "query/context.hpp" #include "query/frontend/semantic/symbol_table.hpp" @@ -18,30 +17,15 @@ using namespace query::plan; using Bound = ScanAllByLabelPropertyRange::Bound; -/** - * Helper function that collects all the results from the given - * Produce into a ResultStreamFaker and returns the results from it. - * - * @param produce - * @param symbol_table - * @param db_accessor - * @return - */ +/** Helper function that collects all the results from the given Produce. */ std::vector> CollectProduce( Produce *produce, SymbolTable &symbol_table, database::GraphDbAccessor &db_accessor) { - ResultStreamFaker stream; Frame frame(symbol_table.max_position()); // top level node in the operator tree is a produce (return) // so stream out results - // generate header - std::vector header; - for (auto named_expression : produce->named_expressions()) - header.push_back(named_expression->name_); - stream.Header(header); - // collect the symbols from the return clause std::vector symbols; for (auto named_expression : produce->named_expressions()) @@ -51,15 +35,14 @@ std::vector> CollectProduce( context.symbol_table_ = symbol_table; // stream out results auto cursor = produce->MakeCursor(db_accessor); + std::vector> results; while (cursor->Pull(frame, context)) { std::vector values; for (auto &symbol : symbols) values.emplace_back(frame[symbol]); - stream.Result(values); + results.emplace_back(values); } - stream.Summary({{std::string("type"), TypedValue("r")}}); - - return stream.GetResults(); + return results; } int PullAll(std::shared_ptr logical_op, diff --git a/tests/unit/query_plan_edge_cases.cpp b/tests/unit/query_plan_edge_cases.cpp index e8f12b2d6..6621f3132 100644 --- a/tests/unit/query_plan_edge_cases.cpp +++ b/tests/unit/query_plan_edge_cases.cpp @@ -39,7 +39,7 @@ class QueryExecution : public testing::Test { /** Executes the query and returns the results. * Does NOT commit the transaction */ auto Execute(const std::string &query) { - ResultStreamFaker results; + ResultStreamFaker results; query::Interpreter{*db_}(query, *dba_, {}, false).PullAll(results); return results.GetResults(); } diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp index ef8bf72e4..220edf704 100644 --- a/tools/src/mg_import_csv/main.cpp +++ b/tools/src/mg_import_csv/main.cpp @@ -17,6 +17,7 @@ #include "durability/snapshot_encoder.hpp" #include "durability/version.hpp" #include "storage/address_types.hpp" +#include "utils/cast.hpp" #include "utils/string.hpp" #include "utils/timer.hpp" @@ -250,7 +251,8 @@ void WriteNodeRow( } } CHECK(id) << "Node ID must be specified"; - partial_vertices[*id] = {*id, static_cast(*id), labels, properties, {}}; + partial_vertices[*id] = { + *id, utils::MemcpyCast(*id), labels, properties, {}}; } auto PassNodes(std::unordered_map @@ -307,7 +309,10 @@ void WriteRelationshipsRow( CHECK(end_id) << "END_ID must be set"; CHECK(relationship_type) << "Relationship TYPE must be set"; - edges[relationship_id] = {(int64_t)relationship_id, *start_id, *end_id, + auto bolt_id = communication::bolt::Id::FromUint(relationship_id); + auto bolt_start_id = communication::bolt::Id::FromUint(*start_id); + auto bolt_end_id = communication::bolt::Id::FromUint(*end_id); + edges[relationship_id] = {bolt_id, bolt_start_id, bolt_end_id, *relationship_type, properties}; } @@ -353,7 +358,7 @@ void Convert(const std::vector &nodes, // 7) Summary with node count, relationship count and hash digest. encoder.WriteRAW(durability::kMagicNumber.data(), durability::kMagicNumber.size()); - encoder.WriteTypedValue(durability::kVersion); + encoder.WriteDecodedValue(durability::kVersion); encoder.WriteInt(0); // Worker Id - for this use case it's okay to set to 0 // since we are using a single-node version of @@ -379,11 +384,12 @@ void Convert(const std::vector &nodes, } for (auto edge : edges) { auto encoded = edge.second; - auto edge_address = storage::EdgeAddress(encoded.id, 0); - vertices[encoded.from].out.push_back( - {edge_address, storage::VertexAddress(encoded.to, 0), encoded.type}); - vertices[encoded.to].in.push_back( - {edge_address, storage::VertexAddress(encoded.from, 0), + auto edge_address = storage::EdgeAddress(encoded.id.AsUint(), 0); + vertices[encoded.from.AsUint()].out.push_back( + {edge_address, storage::VertexAddress(encoded.to.AsUint(), 0), + encoded.type}); + vertices[encoded.to.AsUint()].in.push_back( + {edge_address, storage::VertexAddress(encoded.from.AsUint(), 0), encoded.type}); } for (auto vertex_pair : vertices) { @@ -396,10 +402,12 @@ void Convert(const std::vector &nodes, encoder.WriteInt(vertex.gid); auto &labels = vertex.labels; - std::vector transformed; - std::transform( - labels.begin(), labels.end(), std::back_inserter(transformed), - [](const std::string &str) -> query::TypedValue { return str; }); + std::vector transformed; + std::transform(labels.begin(), labels.end(), + std::back_inserter(transformed), + [](const std::string &str) { + return communication::bolt::DecodedValue(str); + }); encoder.WriteList(transformed); encoder.WriteMap(vertex.properties); @@ -426,14 +434,14 @@ void Convert(const std::vector &nodes, utils::UnderlyingCast(communication::bolt::Marker::TinyStruct) + 5); encoder.WriteRAW( utils::UnderlyingCast(communication::bolt::Signature::Relationship)); - encoder.WriteInt(edge.id); - encoder.WriteInt(edge.from); - encoder.WriteInt(edge.to); + encoder.WriteInt(edge.id.AsInt()); + encoder.WriteInt(edge.from.AsInt()); + encoder.WriteInt(edge.to.AsInt()); encoder.WriteString(edge.type); encoder.WriteMap(edge.properties); // cypher_id - encoder.WriteInt(edge.id); + encoder.WriteInt(edge.id.AsInt()); } buffer.WriteValue(node_count);