Extract TypedValue/DecodedValue conversion to higher component

Summary:
This is the first step in cutting the crazy dependencies of
communication module to the whole database. Includes have been
reorganized and conversion between DecodedValue and other Memgraph types
(TypedValue and PropertyValue) has been extracted to a higher level
component called `communication/conversion`. Encoder, like Decoder, now
relies only on DecodedValue. Hopefully the conversion operations will
not significantly slow down streaming Bolt data.

Additionally, Bolt ID is now wrapped in a class. Our storage model uses
*unsigned* int64, while Bolt expects *signed* int64. The implicit
conversions may lead to encode/decode errors, so the wrapper should
enforce some type safety to prevent such errors.

Reviewers: mferencevic, buda, msantl, mtomic

Reviewed By: mferencevic, mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1453
This commit is contained in:
Teon Banek 2018-07-02 15:34:33 +02:00
parent 9ded2ff6d9
commit d7a9c5bab8
48 changed files with 589 additions and 391 deletions

View File

@ -8,12 +8,13 @@ add_subdirectory(telemetry)
# all memgraph src files
set(memgraph_src_files
communication/bolt/v1/decoder/decoded_value.cpp
communication/buffer.cpp
communication/client.cpp
communication/context.cpp
communication/conversion.cpp
communication/helpers.cpp
communication/init.cpp
communication/bolt/v1/decoder/decoded_value.cpp
communication/rpc/client.cpp
communication/rpc/protocol.cpp
communication/rpc/server.cpp
@ -25,20 +26,20 @@ set(memgraph_src_files
database/state_delta.cpp
distributed/bfs_rpc_clients.cpp
distributed/bfs_subcursor.cpp
distributed/cache.cpp
distributed/cluster_discovery_master.cpp
distributed/cluster_discovery_worker.cpp
distributed/coordination.cpp
distributed/coordination_master.cpp
distributed/coordination_worker.cpp
distributed/data_manager.cpp
distributed/data_rpc_clients.cpp
distributed/data_rpc_server.cpp
distributed/durability_rpc_clients.cpp
distributed/durability_rpc_server.cpp
distributed/index_rpc_server.cpp
distributed/plan_consumer.cpp
distributed/plan_dispatcher.cpp
distributed/cache.cpp
distributed/data_manager.cpp
distributed/data_rpc_clients.cpp
distributed/data_rpc_server.cpp
distributed/produce_rpc_server.cpp
distributed/pull_rpc_clients.cpp
distributed/serialization.cpp
@ -49,7 +50,6 @@ set(memgraph_src_files
durability/snapshooter.cpp
durability/wal.cpp
query/common.cpp
query/repl.cpp
query/frontend/ast/ast.cpp
query/frontend/ast/cypher_main_visitor.cpp
query/frontend/semantic/symbol_generator.cpp
@ -61,6 +61,7 @@ set(memgraph_src_files
query/plan/preprocess.cpp
query/plan/rule_based_planner.cpp
query/plan/variable_start_planner.cpp
query/repl.cpp
query/typed_value.cpp
stats/metrics.cpp
stats/stats.cpp

View File

@ -6,8 +6,9 @@
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
#include "communication/bolt/v1/encoder/client_encoder.hpp"
#include "query/typed_value.hpp"
#include "communication/client.hpp"
#include "communication/context.hpp"
#include "io/network/endpoint.hpp"
#include "utils/exceptions.hpp"
namespace communication::bolt {
@ -98,9 +99,7 @@ class Client final {
DLOG(INFO) << "Sending run message with statement: '" << query
<< "'; parameters: " << parameters;
std::map<std::string, query::TypedValue> params_tv(parameters.begin(),
parameters.end());
encoder_.MessageRun(query, params_tv, false);
encoder_.MessageRun(query, parameters, false);
encoder_.MessagePullAll();
DLOG(INFO) << "Reading run message response";

View File

@ -1,7 +1,6 @@
#pragma once
#include <cstdint>
#include "utils/cast.hpp"
namespace communication::bolt {

View File

@ -1,7 +1,9 @@
#include "glog/logging.h"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include <glog/logging.h>
#include "utils/algorithm.hpp"
namespace communication::bolt {
#define DEF_GETTER_BY_VAL(type, value_type, field) \
@ -165,67 +167,6 @@ DecodedValue::~DecodedValue() {
LOG(FATAL) << "Unsupported DecodedValue::Type";
}
DecodedValue::operator query::TypedValue() const {
switch (type_) {
case Type::Null:
return query::TypedValue::Null;
case Type::Bool:
return query::TypedValue(bool_v);
case Type::Int:
return query::TypedValue(int_v);
case Type::Double:
return query::TypedValue(double_v);
case Type::String:
return query::TypedValue(string_v);
case Type::List:
return query::TypedValue(
std::vector<query::TypedValue>(list_v.begin(), list_v.end()));
case Type::Map:
return query::TypedValue(
std::map<std::string, query::TypedValue>(map_v.begin(), map_v.end()));
case Type::Vertex:
case Type::Edge:
case Type::UnboundedEdge:
case Type::Path:
throw DecodedValueException(
"Unsupported conversion from DecodedValue to TypedValue");
}
}
DecodedValue::operator PropertyValue() const {
switch (type_) {
case Type::Null:
return PropertyValue::Null;
case Type::Bool:
return PropertyValue(bool_v);
case Type::Int:
return PropertyValue(int_v);
case Type::Double:
return PropertyValue(double_v);
case Type::String:
return PropertyValue(string_v);
case Type::List: {
std::vector<PropertyValue> vec;
vec.reserve(list_v.size());
for (const auto &value : list_v)
vec.emplace_back(static_cast<PropertyValue>(value));
return PropertyValue(std::move(vec));
}
case Type::Map: {
std::map<std::string, PropertyValue> map;
for (const auto &kv : map_v)
map.emplace(kv.first, static_cast<PropertyValue>(kv.second));
return PropertyValue(std::move(map));
}
case Type::Vertex:
case Type::Edge:
case Type::UnboundedEdge:
case Type::Path:
throw DecodedValueException(
"Unsupported conversion from DecodedValue to PropertyValue");
}
}
std::ostream &operator<<(std::ostream &os, const DecodedVertex &vertex) {
os << "V(";
utils::PrintIterable(os, vertex.labels, ":",

View File

@ -1,12 +1,11 @@
#pragma once
#include <algorithm>
#include <map>
#include <string>
#include <vector>
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
#include "utils/algorithm.hpp"
#include "utils/cast.hpp"
#include "utils/exceptions.hpp"
namespace communication::bolt {
@ -14,12 +13,38 @@ namespace communication::bolt {
/** Forward declaration of DecodedValue class. */
class DecodedValue;
/** Wraps int64_t to prevent dangerous implicit conversions. */
class Id {
public:
Id() = default;
/** Construct Id from uint64_t */
static Id FromUint(uint64_t id) { return Id(utils::MemcpyCast<int64_t>(id)); }
/** Construct Id from int64_t */
static Id FromInt(int64_t id) { return Id(id); }
int64_t AsInt() const { return id_; }
uint64_t AsUint() const { return utils::MemcpyCast<uint64_t>(id_); }
private:
explicit Id(int64_t id) : id_(id) {}
int64_t id_;
};
inline bool operator==(const Id &id1, const Id &id2) {
return id1.AsInt() == id2.AsInt();
}
inline bool operator!=(const Id &id1, const Id &id2) { return !(id1 == id2); }
/**
* Structure used when reading a Vertex with the decoder.
* The decoder writes data into this structure.
*/
struct DecodedVertex {
int64_t id;
Id id;
std::vector<std::string> labels;
std::map<std::string, DecodedValue> properties;
};
@ -29,9 +54,9 @@ struct DecodedVertex {
* The decoder writes data into this structure.
*/
struct DecodedEdge {
int64_t id;
int64_t from;
int64_t to;
Id id;
Id from;
Id to;
std::string type;
std::map<std::string, DecodedValue> properties;
};
@ -41,7 +66,7 @@ struct DecodedEdge {
* The decoder writes data into this structure.
*/
struct DecodedUnboundedEdge {
int64_t id;
Id id;
std::string type;
std::map<std::string, DecodedValue> properties;
};
@ -51,25 +76,56 @@ struct DecodedUnboundedEdge {
* The decoder writes data into this structure.
*/
struct DecodedPath {
DecodedPath() {}
DecodedPath(const std::vector<DecodedVertex> &vertices,
const std::vector<DecodedEdge> &edges) {
// Helper function. Looks for the given element in the collection. If found,
// puts its index into `indices`. Otherwise emplaces the given element
// into the collection and puts that index into `indices`. A multiplier is
// added to switch between positive and negative indices (that define edge
// direction).
auto add_element = [this](auto &collection, const auto &element,
int multiplier, int offset) {
auto found =
std::find_if(collection.begin(), collection.end(),
[&](const auto &e) { return e.id == element.id; });
indices.emplace_back(multiplier *
(std::distance(collection.begin(), found) + offset));
if (found == collection.end()) collection.push_back(element);
};
this->vertices.reserve(vertices.size());
this->edges.reserve(edges.size());
this->vertices.emplace_back(vertices[0]);
for (uint i = 0; i < edges.size(); i++) {
const auto &e = edges[i];
const auto &v = vertices[i + 1];
DecodedUnboundedEdge unbounded_edge{e.id, e.type, e.properties};
add_element(this->edges, unbounded_edge, e.to == v.id ? 1 : -1, 1);
add_element(this->vertices, v, 1, 0);
}
}
/** Unique vertices in the path. */
std::vector<DecodedVertex> vertices;
/** Unique edges in the path. */
std::vector<DecodedUnboundedEdge> edges;
/**
* Indices that map path positions to vertices/edges.
* Positive indices for left-to-right directionality and negative for
* right-to-left.
*/
std::vector<int64_t> indices;
};
/**
* DecodedValue provides an encapsulation arround TypedValue, DecodedVertex
* and DecodedEdge. This is necessary because TypedValue stores vertices and
* edges as our internal accessors. Because of that the Bolt decoder can't
* decode vertices and edges directly into a TypedValue so a DecodedValue is
* used instead.
*/
/** DecodedValue represents supported values in the Bolt protocol. */
class DecodedValue {
public:
/** Default constructor, makes Null */
DecodedValue() : type_(Type::Null) {}
/** Types that can be stored in a DecodedValue. */
// TODO: Path isn't supported yet!
enum class Type : unsigned {
Null,
Bool,
@ -161,10 +217,6 @@ class DecodedValue {
#undef TYPE_CHECKER
operator query::TypedValue() const;
// PropertyValue operator must be explicit to prevent ambiguity.
explicit operator PropertyValue() const;
friend std::ostream &operator<<(std::ostream &os, const DecodedValue &value);
private:

View File

@ -335,7 +335,7 @@ class Decoder {
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
return false;
}
vertex.id = dv.ValueInt();
vertex.id = Id::FromInt(dv.ValueInt());
// read labels
if (!ReadValue(&dv, DecodedValue::Type::List)) {
@ -382,19 +382,19 @@ class Decoder {
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
return false;
}
edge.id = dv.ValueInt();
edge.id = Id::FromInt(dv.ValueInt());
// read from
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
return false;
}
edge.from = dv.ValueInt();
edge.from = Id::FromInt(dv.ValueInt());
// read to
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
return false;
}
edge.to = dv.ValueInt();
edge.to = Id::FromInt(dv.ValueInt());
// read type
if (!ReadValue(&dv, DecodedValue::Type::String)) {
@ -421,7 +421,7 @@ class Decoder {
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
return false;
}
edge.id = dv.ValueInt();
edge.id = Id::FromInt(dv.ValueInt());
// read type
if (!ReadValue(&dv, DecodedValue::Type::String)) {

View File

@ -1,15 +1,14 @@
#pragma once
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/typed_value.hpp"
namespace communication::bolt {
/**
* Bolt BaseEncoder. Subclass of PrimitiveEncoder. Extends it with the
* capability to encode TypedValues (as well as lists and maps of TypedValues),
* Edges, Vertices and Paths.
* capability to encode DecodedValues (as well as lists and maps of
* DecodedValues), Edges, Vertices and Paths.
*
* @tparam Buffer the output buffer that should be used
*/
@ -18,150 +17,130 @@ class BaseEncoder : public PrimitiveEncoder<Buffer> {
public:
explicit BaseEncoder(Buffer &buffer) : PrimitiveEncoder<Buffer>(buffer) {}
void WriteList(const std::vector<query::TypedValue> &value) {
void WriteList(const std::vector<DecodedValue> &value) {
this->WriteTypeSize(value.size(), MarkerList);
for (auto &x : value) WriteTypedValue(x);
for (auto &x : value) WriteDecodedValue(x);
}
/**
* Writes a map value.
*
* @tparam TMap - an iterable of (std::string, TypedValue) pairs.
* @tparam TMap - an iterable of (std::string, DecodedValue) pairs.
*/
template <typename TMap>
void WriteMap(const TMap &value) {
this->WriteTypeSize(value.size(), MarkerMap);
for (auto &x : value) {
this->WriteString(x.first);
WriteTypedValue(x.second);
WriteDecodedValue(x.second);
}
}
void WriteVertex(const VertexAccessor &vertex) {
void WriteVertex(const DecodedVertex &vertex) {
this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + 3);
this->WriteRAW(utils::UnderlyingCast(Signature::Node));
WriteUInt(vertex.gid());
this->WriteInt(vertex.id.AsInt());
// write labels
const auto &labels = vertex.labels();
const auto &labels = vertex.labels;
this->WriteTypeSize(labels.size(), MarkerList);
for (const auto &label : labels)
this->WriteString(vertex.db_accessor().LabelName(label));
for (const auto &label : labels) this->WriteString(label);
// write properties
const auto &props = vertex.Properties();
const auto &props = vertex.properties;
this->WriteTypeSize(props.size(), MarkerMap);
for (const auto &prop : props) {
this->WriteString(vertex.db_accessor().PropertyName(prop.first));
WriteTypedValue(prop.second);
this->WriteString(prop.first);
WriteDecodedValue(prop.second);
}
}
void WriteEdge(const EdgeAccessor &edge, bool unbound = false) {
void WriteEdge(const DecodedEdge &edge, bool unbound = false) {
this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) +
(unbound ? 3 : 5));
this->WriteRAW(utils::UnderlyingCast(
unbound ? Signature::UnboundRelationship : Signature::Relationship));
WriteUInt(edge.gid());
this->WriteInt(edge.id.AsInt());
if (!unbound) {
WriteUInt(edge.from().gid());
WriteUInt(edge.to().gid());
this->WriteInt(edge.from.AsInt());
this->WriteInt(edge.to.AsInt());
}
// write type
this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
this->WriteString(edge.type);
// write properties
const auto &props = edge.Properties();
const auto &props = edge.properties;
this->WriteTypeSize(props.size(), MarkerMap);
for (const auto &prop : props) {
this->WriteString(edge.db_accessor().PropertyName(prop.first));
WriteTypedValue(prop.second);
this->WriteString(prop.first);
WriteDecodedValue(prop.second);
}
}
void WritePath(const query::Path &path) {
// Prepare the data structures to be written.
//
// Unique vertices in the path.
std::vector<VertexAccessor> vertices;
// Unique edges in the path.
std::vector<EdgeAccessor> edges;
// Indices that map path positions to vertices/edges elements. Positive
// indices for left-to-right directionality and negative for right-to-left.
std::vector<int> indices;
void WriteEdge(const DecodedUnboundedEdge &edge) {
this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + 3);
this->WriteRAW(utils::UnderlyingCast(Signature::UnboundRelationship));
// Helper function. Looks for the given element in the collection. If found
// it puts it's index into `indices`. Otherwise emplaces the given element
// into the collection and puts that index into `indices`. A multiplier is
// added to switch between positive and negative indices (that define edge
// direction).
auto add_element = [&indices](auto &collection, const auto &element,
int multiplier, int offset) {
auto found = std::find(collection.begin(), collection.end(), element);
indices.emplace_back(multiplier *
(std::distance(collection.begin(), found) + offset));
if (found == collection.end()) collection.emplace_back(element);
};
this->WriteInt(edge.id.AsInt());
vertices.emplace_back(path.vertices()[0]);
for (uint i = 0; i < path.size(); i++) {
const auto &e = path.edges()[i];
const auto &v = path.vertices()[i + 1];
add_element(edges, e, e.to_is(v) ? 1 : -1, 1);
add_element(vertices, v, 1, 0);
this->WriteString(edge.type);
const auto &props = edge.properties;
this->WriteTypeSize(props.size(), MarkerMap);
for (const auto &prop : props) {
this->WriteString(prop.first);
WriteDecodedValue(prop.second);
}
}
// Write data.
void WritePath(const DecodedPath &path) {
this->WriteRAW(utils::UnderlyingCast(Marker::TinyStruct) + 3);
this->WriteRAW(utils::UnderlyingCast(Signature::Path));
this->WriteTypeSize(vertices.size(), MarkerList);
for (auto &v : vertices) WriteVertex(v);
this->WriteTypeSize(edges.size(), MarkerList);
for (auto &e : edges) WriteEdge(e, true);
this->WriteTypeSize(indices.size(), MarkerList);
for (auto &i : indices) this->WriteInt(i);
this->WriteTypeSize(path.vertices.size(), MarkerList);
for (auto &v : path.vertices) WriteVertex(v);
this->WriteTypeSize(path.edges.size(), MarkerList);
for (auto &e : path.edges) WriteEdge(e);
this->WriteTypeSize(path.indices.size(), MarkerList);
for (auto &i : path.indices) this->WriteInt(i);
}
void WriteTypedValue(const query::TypedValue &value) {
void WriteDecodedValue(const DecodedValue &value) {
switch (value.type()) {
case query::TypedValue::Type::Null:
case DecodedValue::Type::Null:
this->WriteNull();
break;
case query::TypedValue::Type::Bool:
this->WriteBool(value.Value<bool>());
case DecodedValue::Type::Bool:
this->WriteBool(value.ValueBool());
break;
case query::TypedValue::Type::Int:
this->WriteInt(value.Value<int64_t>());
case DecodedValue::Type::Int:
this->WriteInt(value.ValueInt());
break;
case query::TypedValue::Type::Double:
this->WriteDouble(value.Value<double>());
case DecodedValue::Type::Double:
this->WriteDouble(value.ValueDouble());
break;
case query::TypedValue::Type::String:
this->WriteString(value.Value<std::string>());
case DecodedValue::Type::String:
this->WriteString(value.ValueString());
break;
case query::TypedValue::Type::List:
WriteList(value.Value<std::vector<query::TypedValue>>());
case DecodedValue::Type::List:
WriteList(value.ValueList());
break;
case query::TypedValue::Type::Map:
WriteMap(value.Value<std::map<std::string, query::TypedValue>>());
case DecodedValue::Type::Map:
WriteMap(value.ValueMap());
break;
case query::TypedValue::Type::Vertex:
WriteVertex(value.Value<VertexAccessor>());
case DecodedValue::Type::Vertex:
WriteVertex(value.ValueVertex());
break;
case query::TypedValue::Type::Edge:
WriteEdge(value.Value<EdgeAccessor>());
case DecodedValue::Type::Edge:
WriteEdge(value.ValueEdge());
break;
case query::TypedValue::Type::Path:
case DecodedValue::Type::UnboundedEdge:
WriteEdge(value.ValueUnboundedEdge());
break;
case DecodedValue::Type::Path:
WritePath(value.ValuePath());
break;
}
}
private:
void WriteUInt(const uint64_t &value) {
this->WriteInt(*reinterpret_cast<const int64_t *>(&value));
}
};
} // namespace communication::bolt

View File

@ -39,7 +39,7 @@ class ClientEncoder : private BaseEncoder<Buffer> {
* when flushing, false otherwise
*/
bool MessageInit(const std::string client_name,
const std::map<std::string, query::TypedValue> &auth_token) {
const std::map<std::string, DecodedValue> &auth_token) {
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct2));
WriteRAW(utils::UnderlyingCast(Signature::Init));
WriteString(client_name);
@ -61,8 +61,8 @@ class ClientEncoder : private BaseEncoder<Buffer> {
* @returns true if the data was successfully sent to the client
* when flushing, false otherwise
*/
bool MessageRun(const std::string statement,
const std::map<std::string, query::TypedValue> &parameters,
bool MessageRun(const std::string &statement,
const std::map<std::string, DecodedValue> &parameters,
bool flush = true) {
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct2));
WriteRAW(utils::UnderlyingCast(Signature::Run));

View File

@ -36,7 +36,7 @@ class Encoder : private BaseEncoder<Buffer> {
*
* @param values the fields list object that should be sent
*/
void MessageRecord(const std::vector<query::TypedValue> &values) {
void MessageRecord(const std::vector<DecodedValue> &values) {
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
WriteRAW(utils::UnderlyingCast(Signature::Record));
WriteList(values);
@ -56,7 +56,7 @@ class Encoder : private BaseEncoder<Buffer> {
* @returns true if the data was successfully sent to the client
* when flushing, false otherwise
*/
bool MessageSuccess(const std::map<std::string, query::TypedValue> &metadata,
bool MessageSuccess(const std::map<std::string, DecodedValue> &metadata,
bool flush = true) {
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
WriteRAW(utils::UnderlyingCast(Signature::Success));
@ -79,7 +79,7 @@ class Encoder : private BaseEncoder<Buffer> {
* false otherwise
*/
bool MessageSuccess() {
std::map<std::string, query::TypedValue> metadata;
std::map<std::string, DecodedValue> metadata;
return MessageSuccess(metadata);
}
@ -95,8 +95,7 @@ class Encoder : private BaseEncoder<Buffer> {
* @returns true if the data was successfully sent to the client,
* false otherwise
*/
bool MessageFailure(
const std::map<std::string, query::TypedValue> &metadata) {
bool MessageFailure(const std::map<std::string, DecodedValue> &metadata) {
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
WriteRAW(utils::UnderlyingCast(Signature::Failure));
WriteMap(metadata);
@ -115,8 +114,7 @@ class Encoder : private BaseEncoder<Buffer> {
* @returns true if the data was successfully sent to the client,
* false otherwise
*/
bool MessageIgnored(
const std::map<std::string, query::TypedValue> &metadata) {
bool MessageIgnored(const std::map<std::string, DecodedValue> &metadata) {
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
WriteRAW(utils::UnderlyingCast(Signature::Ignored));
WriteMap(metadata);

View File

@ -3,14 +3,14 @@
#include <string>
#include "communication/bolt/v1/codes.hpp"
#include "storage/property_value.hpp"
#include "utils/bswap.hpp"
#include "utils/cast.hpp"
namespace communication::bolt {
/**
* Bolt PrimitiveEncoder. Has public interfaces for writing Bolt encoded data.
* Supported types are: Null, Bool, Int, Double, String and PropertyValue.
* Supported types are: Null, Bool, Int, Double and String.
*
* Bolt encoding is used both for streaming data to network clients and for
* database durability.
@ -93,45 +93,8 @@ class PrimitiveEncoder {
WriteRAW(value.c_str(), value.size());
}
void WritePropertyValue(const PropertyValue &value) {
auto write_list = [this](const std::vector<PropertyValue> &value) {
WriteTypeSize(value.size(), MarkerList);
for (auto &x : value) WritePropertyValue(x);
};
auto write_map = [this](const std::map<std::string, PropertyValue> &value) {
WriteTypeSize(value.size(), MarkerMap);
for (auto &x : value) {
WriteString(x.first);
WritePropertyValue(x.second);
}
};
switch (value.type()) {
case PropertyValue::Type::Null:
WriteNull();
break;
case PropertyValue::Type::Bool:
WriteBool(value.Value<bool>());
break;
case PropertyValue::Type::Int:
WriteInt(value.Value<int64_t>());
break;
case PropertyValue::Type::Double:
WriteDouble(value.Value<double>());
break;
case PropertyValue::Type::String:
WriteString(value.Value<std::string>());
break;
case PropertyValue::Type::List:
write_list(value.Value<std::vector<PropertyValue>>());
break;
case PropertyValue::Type::Map:
write_map(value.Value<std::map<std::string, PropertyValue>>());
break;
}
}
protected:
Buffer &buffer_;
};
} // namespace communication::bolt

View File

@ -2,7 +2,7 @@
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp"
#include "query/typed_value.hpp"
#include "communication/conversion.hpp"
namespace communication::bolt {
@ -25,10 +25,10 @@ class ResultStream {
* @param fields the header fields that should be sent.
*/
void Header(const std::vector<std::string> &fields) {
std::vector<query::TypedValue> vec;
std::map<std::string, query::TypedValue> data;
for (auto &i : fields) vec.push_back(query::TypedValue(i));
data.insert(std::make_pair(std::string("fields"), query::TypedValue(vec)));
std::vector<DecodedValue> vec;
std::map<std::string, DecodedValue> data;
for (auto &i : fields) vec.push_back(DecodedValue(i));
data.insert(std::make_pair(std::string("fields"), DecodedValue(vec)));
// this message shouldn't send directly to the client because if an error
// happened the client will receive two messages (success and failure)
// instead of only one
@ -47,10 +47,20 @@ class ResultStream {
*
* @param values the values that should be sent
*/
void Result(std::vector<query::TypedValue> &values) {
void Result(std::vector<DecodedValue> &values) {
encoder_.MessageRecord(values);
}
// TODO: Move this to another class
void Result(std::vector<query::TypedValue> &values) {
std::vector<DecodedValue> decoded_values;
decoded_values.reserve(values.size());
for (const auto &v : values) {
decoded_values.push_back(communication::ToDecodedValue(v));
}
return Result(decoded_values);
}
/**
* Writes a summary. Typically a summary is something like:
* {
@ -63,7 +73,7 @@ class ResultStream {
*
* @param summary the summary map object that should be sent
*/
void Summary(const std::map<std::string, query::TypedValue> &summary) {
void Summary(const std::map<std::string, DecodedValue> &summary) {
// at this point message should not flush the socket so
// here is false because chunk has to be called instead of flush
encoder_.MessageSuccess(summary, false);

View File

@ -6,7 +6,7 @@
#include "communication/bolt/v1/codes.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "communication/bolt/v1/state.hpp"
#include "query/typed_value.hpp"
#include "utils/cast.hpp"
namespace communication::bolt {

View File

@ -9,6 +9,7 @@
#include "communication/bolt/v1/codes.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "communication/bolt/v1/state.hpp"
#include "communication/conversion.hpp"
#include "database/graph_db.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "query/exceptions.hpp"
@ -19,8 +20,8 @@ namespace communication::bolt {
template <typename TSession>
State HandleRun(TSession &session, State state, Marker marker) {
const std::map<std::string, query::TypedValue> kEmptyFields = {
{"fields", std::vector<query::TypedValue>{}}};
const std::map<std::string, DecodedValue> kEmptyFields = {
{"fields", std::vector<DecodedValue>{}}};
if (marker != Marker::TinyStruct2) {
DLOG(WARNING) << fmt::format(
@ -131,9 +132,9 @@ State HandleRun(TSession &session, State state, Marker marker) {
}
}
auto &params_map = params.ValueMap();
std::map<std::string, query::TypedValue> params_tv(params_map.begin(),
params_map.end());
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : params.ValueMap())
params_tv.emplace(kv.first, communication::ToTypedValue(kv.second));
session
.interpreter_(query.ValueString(), *session.db_accessor_, params_tv,
in_explicit_transaction)

View File

@ -5,7 +5,6 @@
#include "communication/bolt/v1/codes.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "communication/bolt/v1/state.hpp"
#include "utils/likely.hpp"

View File

@ -0,0 +1,192 @@
#include "communication/conversion.hpp"
#include <map>
#include <string>
#include <vector>
#include "database/graph_db_accessor.hpp"
using communication::bolt::DecodedValue;
namespace communication {
query::TypedValue ToTypedValue(const DecodedValue &value) {
switch (value.type()) {
case DecodedValue::Type::Null:
return query::TypedValue::Null;
case DecodedValue::Type::Bool:
return query::TypedValue(value.ValueBool());
case DecodedValue::Type::Int:
return query::TypedValue(value.ValueInt());
case DecodedValue::Type::Double:
return query::TypedValue(value.ValueDouble());
case DecodedValue::Type::String:
return query::TypedValue(value.ValueString());
case DecodedValue::Type::List: {
std::vector<query::TypedValue> list;
list.reserve(value.ValueList().size());
for (const auto &v : value.ValueList()) list.push_back(ToTypedValue(v));
return query::TypedValue(list);
}
case DecodedValue::Type::Map: {
std::map<std::string, query::TypedValue> map;
for (const auto &kv : value.ValueMap())
map.emplace(kv.first, ToTypedValue(kv.second));
return query::TypedValue(map);
}
case DecodedValue::Type::Vertex:
case DecodedValue::Type::Edge:
case DecodedValue::Type::UnboundedEdge:
case DecodedValue::Type::Path:
throw communication::bolt::DecodedValueException(
"Unsupported conversion from DecodedValue to TypedValue");
}
}
DecodedValue ToDecodedValue(const query::TypedValue &value) {
switch (value.type()) {
case query::TypedValue::Type::Null:
return DecodedValue();
case query::TypedValue::Type::Bool:
return DecodedValue(value.ValueBool());
case query::TypedValue::Type::Int:
return DecodedValue(value.ValueInt());
case query::TypedValue::Type::Double:
return DecodedValue(value.ValueDouble());
case query::TypedValue::Type::String:
return DecodedValue(value.ValueString());
case query::TypedValue::Type::List: {
std::vector<DecodedValue> values;
values.reserve(value.ValueList().size());
for (const auto &v : value.ValueList()) {
values.push_back(ToDecodedValue(v));
}
return DecodedValue(values);
}
case query::TypedValue::Type::Map: {
std::map<std::string, DecodedValue> map;
for (const auto &kv : value.ValueMap()) {
map.emplace(kv.first, ToDecodedValue(kv.second));
}
return DecodedValue(map);
}
case query::TypedValue::Type::Vertex:
return DecodedValue(ToDecodedVertex(value.ValueVertex()));
case query::TypedValue::Type::Edge:
return DecodedValue(ToDecodedEdge(value.ValueEdge()));
case query::TypedValue::Type::Path:
return DecodedValue(ToDecodedPath(value.ValuePath()));
}
}
communication::bolt::DecodedVertex ToDecodedVertex(
const VertexAccessor &vertex) {
auto id = communication::bolt::Id::FromUint(vertex.gid());
std::vector<std::string> labels;
labels.reserve(vertex.labels().size());
for (const auto &label : vertex.labels()) {
labels.push_back(vertex.db_accessor().LabelName(label));
}
std::map<std::string, DecodedValue> properties;
for (const auto &prop : vertex.Properties()) {
properties[vertex.db_accessor().PropertyName(prop.first)] =
ToDecodedValue(prop.second);
}
return communication::bolt::DecodedVertex{id, labels, properties};
}
communication::bolt::DecodedEdge ToDecodedEdge(const EdgeAccessor &edge) {
auto id = communication::bolt::Id::FromUint(edge.gid());
auto from = communication::bolt::Id::FromUint(edge.from().gid());
auto to = communication::bolt::Id::FromUint(edge.to().gid());
auto type = edge.db_accessor().EdgeTypeName(edge.EdgeType());
std::map<std::string, DecodedValue> properties;
for (const auto &prop : edge.Properties()) {
properties[edge.db_accessor().PropertyName(prop.first)] =
ToDecodedValue(prop.second);
}
return communication::bolt::DecodedEdge{id, from, to, type, properties};
}
communication::bolt::DecodedPath ToDecodedPath(const query::Path &path) {
std::vector<communication::bolt::DecodedVertex> vertices;
vertices.reserve(path.vertices().size());
for (const auto &v : path.vertices()) {
vertices.push_back(ToDecodedVertex(v));
}
std::vector<communication::bolt::DecodedEdge> edges;
edges.reserve(path.edges().size());
for (const auto &e : path.edges()) {
edges.push_back(ToDecodedEdge(e));
}
return communication::bolt::DecodedPath(vertices, edges);
}
PropertyValue ToPropertyValue(const DecodedValue &value) {
switch (value.type()) {
case DecodedValue::Type::Null:
return PropertyValue::Null;
case DecodedValue::Type::Bool:
return PropertyValue(value.ValueBool());
case DecodedValue::Type::Int:
return PropertyValue(value.ValueInt());
case DecodedValue::Type::Double:
return PropertyValue(value.ValueDouble());
case DecodedValue::Type::String:
return PropertyValue(value.ValueString());
case DecodedValue::Type::List: {
std::vector<PropertyValue> vec;
vec.reserve(value.ValueList().size());
for (const auto &value : value.ValueList())
vec.emplace_back(ToPropertyValue(value));
return PropertyValue(std::move(vec));
}
case DecodedValue::Type::Map: {
std::map<std::string, PropertyValue> map;
for (const auto &kv : value.ValueMap())
map.emplace(kv.first, ToPropertyValue(kv.second));
return PropertyValue(std::move(map));
}
case DecodedValue::Type::Vertex:
case DecodedValue::Type::Edge:
case DecodedValue::Type::UnboundedEdge:
case DecodedValue::Type::Path:
throw communication::bolt::DecodedValueException(
"Unsupported conversion from DecodedValue to PropertyValue");
}
}
DecodedValue ToDecodedValue(const PropertyValue &value) {
switch (value.type()) {
case PropertyValue::Type::Null:
return DecodedValue();
case PropertyValue::Type::Bool:
return DecodedValue(value.Value<bool>());
case PropertyValue::Type::Int:
return DecodedValue(value.Value<int64_t>());
break;
case PropertyValue::Type::Double:
return DecodedValue(value.Value<double>());
case PropertyValue::Type::String:
return DecodedValue(value.Value<std::string>());
case PropertyValue::Type::List: {
const auto &values = value.Value<std::vector<PropertyValue>>();
std::vector<DecodedValue> vec;
vec.reserve(values.size());
for (const auto &v : values) {
vec.push_back(ToDecodedValue(v));
}
return DecodedValue(vec);
}
case PropertyValue::Type::Map: {
const auto &map = value.Value<std::map<std::string, PropertyValue>>();
std::map<std::string, DecodedValue> dv_map;
for (const auto &kv : map) {
dv_map.emplace(kv.first, ToDecodedValue(kv.second));
}
return DecodedValue(dv_map);
}
}
}
} // namespace communication

View File

@ -0,0 +1,26 @@
/// @file Conversion functions between DecodedValue and other memgraph types.
#pragma once
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
namespace communication {
communication::bolt::DecodedVertex ToDecodedVertex(
const VertexAccessor &vertex);
communication::bolt::DecodedEdge ToDecodedEdge(const EdgeAccessor &edge);
communication::bolt::DecodedPath ToDecodedPath(const query::Path &path);
communication::bolt::DecodedValue ToDecodedValue(
const query::TypedValue &value);
query::TypedValue ToTypedValue(const communication::bolt::DecodedValue &value);
communication::bolt::DecodedValue ToDecodedValue(const PropertyValue &value);
PropertyValue ToPropertyValue(const communication::bolt::DecodedValue &value);
} // namespace communication

View File

@ -3,7 +3,11 @@
#include <map>
#include "glog/logging.h"
#include "query/typed_value.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "utils/algorithm.hpp"
// TODO: Why is this here?! It's only used in tests and query/repl.cpp
/**
* A mocker for the data output record stream.
@ -11,6 +15,7 @@
* sent to it in an acceptable order, and tracks
* the content of those messages.
*/
template <class TResultValue = communication::bolt::DecodedValue>
class ResultStreamFaker {
public:
ResultStreamFaker() = default;
@ -26,13 +31,14 @@ class ResultStreamFaker {
current_state_ = State::WritingResults;
}
void Result(const std::vector<query::TypedValue> &values) {
void Result(const std::vector<TResultValue> &values) {
DCHECK(current_state_ == State::WritingResults)
<< "Can't accept results before header nor after summary";
results_.push_back(values);
}
void Summary(const std::map<std::string, query::TypedValue> &summary) {
void Summary(
const std::map<std::string, communication::bolt::DecodedValue> &summary) {
DCHECK(current_state_ != State::Done) << "Can only send a summary once";
summary_ = summary;
current_state_ = State::Done;
@ -52,7 +58,7 @@ class ResultStreamFaker {
friend std::ostream &operator<<(std::ostream &os,
const ResultStreamFaker &results) {
auto typed_value_to_string = [](const query::TypedValue &value) {
auto decoded_value_to_string = [](const auto &value) {
std::stringstream ss;
ss << value;
return ss.str();
@ -71,7 +77,7 @@ class ResultStreamFaker {
for (int col_ind = 0; col_ind < static_cast<int>(column_widths.size());
++col_ind) {
std::string string_val =
typed_value_to_string(results_data[row_ind][col_ind]);
decoded_value_to_string(results_data[row_ind][col_ind]);
column_widths[col_ind] =
std::max(column_widths[col_ind], (int)string_val.size());
result_strings[row_ind][col_ind] = string_val;
@ -129,6 +135,6 @@ class ResultStreamFaker {
// the data that the record stream can accept
std::vector<std::string> header_;
std::vector<std::vector<query::TypedValue>> results_;
std::map<std::string, query::TypedValue> summary_;
std::vector<std::vector<TResultValue>> results_;
std::map<std::string, communication::bolt::DecodedValue> summary_;
};

View File

@ -1,8 +1,10 @@
#include "database/state_delta.hpp"
#include <string>
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "communication/conversion.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
namespace database {
@ -161,7 +163,7 @@ StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label,
void StateDelta::Encode(
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const {
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const {
encoder.WriteInt(static_cast<int64_t>(type));
encoder.WriteInt(static_cast<int64_t>(transaction_id));
@ -206,13 +208,13 @@ void StateDelta::Encode(
encoder.WriteInt(vertex_id);
encoder.WriteInt(property.Id());
encoder.WriteString(property_name);
encoder.WritePropertyValue(value);
encoder.WriteDecodedValue(communication::ToDecodedValue(value));
break;
case Type::SET_PROPERTY_EDGE:
encoder.WriteInt(edge_id);
encoder.WriteInt(property.Id());
encoder.WriteString(property_name);
encoder.WritePropertyValue(value);
encoder.WriteDecodedValue(communication::ToDecodedValue(value));
break;
case Type::ADD_LABEL:
case Type::REMOVE_LABEL:
@ -302,14 +304,14 @@ std::experimental::optional<StateDelta> StateDelta::Decode(
DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
DECODE_MEMBER(property_name, ValueString)
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.value = static_cast<PropertyValue>(dv);
r_val.value = communication::ToPropertyValue(dv);
break;
case Type::SET_PROPERTY_EDGE:
DECODE_MEMBER(edge_id, ValueInt)
DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
DECODE_MEMBER(property_name, ValueString)
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.value = static_cast<PropertyValue>(dv);
r_val.value = communication::ToPropertyValue(dv);
break;
case Type::ADD_LABEL:
case Type::REMOVE_LABEL:

View File

@ -2,7 +2,7 @@
#pragma once
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/state_delta.capnp.h"
#include "durability/hashed_file_reader.hpp"
#include "durability/hashed_file_writer.hpp"
@ -125,7 +125,7 @@ omitted in the comment.")
* with delta to the writer */
void Encode(
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const;
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const;
static StateDelta TxBegin(tx::TransactionId tx_id);
static StateDelta TxCommit(tx::TransactionId tx_id);

View File

@ -4,6 +4,7 @@
#include <limits>
#include <unordered_map>
#include "communication/conversion.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/indexes/label_property_index.hpp"
#include "durability/hashed_file_reader.hpp"
@ -132,8 +133,9 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
vertex_accessor.add_label(dba.Label(label));
}
for (const auto &property_pair : vertex->properties) {
vertex_accessor.PropsSet(dba.Property(property_pair.first),
query::TypedValue(property_pair.second));
vertex_accessor.PropsSet(
dba.Property(property_pair.first),
communication::ToTypedValue(property_pair.second));
}
auto vertex_record = vertex_accessor.GetNew();
for (const auto &edge : vertex->in) {
@ -187,7 +189,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
// We have to take full edge endpoints from vertices since the endpoints
// found here don't containt worker_id, and this can't be changed since this
// edges must be bolt-compliant
auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id];
auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id.AsUint()];
storage::VertexAddress from;
storage::VertexAddress to;
@ -199,11 +201,11 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
vertex_transform_to_local_if_possible(to);
auto edge_accessor = dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type),
edge.id, cypher_id);
edge.id.AsUint(), cypher_id);
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(dba.Property(property_pair.first),
query::TypedValue(property_pair.second));
communication::ToTypedValue(property_pair.second));
}
// Vertex and edge counts are included in the hash. Re-read them to update the

View File

@ -46,7 +46,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
// Write the transaction snapshot into the snapshot. It's used when
// recovering from the combination of snapshot and write-ahead-log.
{
std::vector<query::TypedValue> tx_snapshot;
std::vector<communication::bolt::DecodedValue> tx_snapshot;
for (int64_t tx : dba.transaction().snapshot())
tx_snapshot.emplace_back(tx);
encoder.WriteList(tx_snapshot);
@ -54,7 +54,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
// Write label+property indexes as list ["label", "property", ...]
{
std::vector<query::TypedValue> index_vec;
std::vector<communication::bolt::DecodedValue> index_vec;
for (const auto &key : dba.GetIndicesKeys()) {
index_vec.emplace_back(dba.LabelName(key.label_));
index_vec.emplace_back(dba.PropertyName(key.property_));
@ -67,7 +67,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
vertex_num++;
}
for (const auto &edge : dba.Edges(false)) {
encoder.WriteEdge(edge);
encoder.WriteEdge(communication::ToDecodedEdge(edge));
encoder.WriteInt(edge.cypher_id());
edge_num++;
}

View File

@ -24,7 +24,7 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> {
return std::experimental::nullopt;
}
auto &read_vertex = dv.ValueVertex();
vertex.gid = static_cast<uint64_t>(read_vertex.id);
vertex.gid = read_vertex.id.AsUint();
vertex.labels = read_vertex.labels;
vertex.properties = read_vertex.properties;

View File

@ -1,6 +1,8 @@
#pragma once
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "communication/conversion.hpp"
#include "database/graph_db_accessor.hpp"
#include "utils/cast.hpp"
namespace durability {
@ -11,7 +13,8 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> {
explicit SnapshotEncoder(Buffer &buffer)
: communication::bolt::BaseEncoder<Buffer>(buffer) {}
void WriteSnapshotVertex(const VertexAccessor &vertex) {
communication::bolt::BaseEncoder<Buffer>::WriteVertex(vertex);
communication::bolt::BaseEncoder<Buffer>::WriteVertex(
communication::ToDecodedVertex(vertex));
// Write cypher_id
this->WriteInt(vertex.cypher_id());

View File

@ -1,6 +1,5 @@
#include "wal.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "durability/paths.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"

View File

@ -8,8 +8,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "data_structures/ring_buffer.hpp"
#include "database/state_delta.hpp"
#include "storage/gid.hpp"
@ -68,7 +67,7 @@ class WriteAheadLog {
int worker_id_;
const std::experimental::filesystem::path wal_dir_;
HashedFileWriter writer_;
communication::bolt::PrimitiveEncoder<HashedFileWriter> encoder_{writer_};
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{writer_};
// The file to which the WAL flushes data. The path is fixed, the file gets
// moved when the WAL gets rotated.

View File

@ -98,7 +98,7 @@ Interpreter::Results Interpreter::operator()(
ctx.symbol_table_ = plan->symbol_table();
std::map<std::string, TypedValue> summary;
std::map<std::string, communication::bolt::DecodedValue> summary;
summary["parsing_time"] = frontend_time.count();
summary["planning_time"] = planning_time.count();
summary["cost_estimate"] = plan->cost();

View File

@ -2,6 +2,8 @@
#include <gflags/gflags.h>
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "communication/conversion.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
@ -66,7 +68,8 @@ class Interpreter {
Results(Context ctx, std::shared_ptr<CachedPlan> plan,
std::unique_ptr<query::plan::Cursor> cursor,
std::vector<Symbol> output_symbols, std::vector<std::string> header,
std::map<std::string, TypedValue> summary, PlanCacheT &plan_cache)
std::map<std::string, communication::bolt::DecodedValue> summary,
PlanCacheT &plan_cache)
: ctx_(std::move(ctx)),
plan_(plan),
cursor_(std::move(cursor)),
@ -141,7 +144,7 @@ class Interpreter {
bool header_written_{false};
std::vector<std::string> header_;
std::map<std::string, TypedValue> summary_;
std::map<std::string, communication::bolt::DecodedValue> summary_;
utils::Timer execution_timer_;
// Gets invalidated after if an index has been built.

View File

@ -64,7 +64,7 @@ void query::Repl(database::GraphDb &db) {
// regular cypher queries
try {
database::GraphDbAccessor dba(db);
ResultStreamFaker results;
ResultStreamFaker<query::TypedValue> results;
interpeter(command, dba, {}, false).PullAll(results);
std::cout << results;
dba.Commit();

View File

@ -3,6 +3,8 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/conversion.hpp"
#include "storage/pod_buffer.hpp"
#include "storage/property_value_store.hpp"
@ -211,7 +213,7 @@ PropertyValueStore::iterator PropertyValueStore::end() const {
std::string PropertyValueStore::SerializeProp(const PropertyValue &prop) const {
storage::PODBuffer pod_buffer;
BaseEncoder<storage::PODBuffer> encoder{pod_buffer};
encoder.WriteTypedValue(prop);
encoder.WriteDecodedValue(communication::ToDecodedValue(prop));
return std::string(reinterpret_cast<char *>(pod_buffer.buffer.data()),
pod_buffer.buffer.size());
}
@ -219,14 +221,14 @@ std::string PropertyValueStore::SerializeProp(const PropertyValue &prop) const {
PropertyValue PropertyValueStore::DeserializeProp(
const std::string &serialized_prop) const {
storage::PODBuffer pod_buffer{serialized_prop};
Decoder<storage::PODBuffer> decoder{pod_buffer};
communication::bolt::Decoder<storage::PODBuffer> decoder{pod_buffer};
DecodedValue dv;
if (!decoder.ReadValue(&dv)) {
DLOG(WARNING) << "Unable to read property value";
return PropertyValue::Null;
}
return dv.operator PropertyValue();
return communication::ToPropertyValue(dv);
}
storage::KVStore PropertyValueStore::ConstructDiskStorage() const {

View File

@ -47,7 +47,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
auto query = "MATCH (s:Starting) return s";
database::GraphDbAccessor dba(*db_);
while (state.KeepRunning()) {
ResultStreamFaker results;
ResultStreamFaker<query::TypedValue> results;
interpreter()(query, dba, {}, false).PullAll(results);
}
}
@ -61,7 +61,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
auto query = "MATCH (s:Starting) WITH s MATCH (s)--(d) RETURN count(d)";
database::GraphDbAccessor dba(*db_);
while (state.KeepRunning()) {
ResultStreamFaker results;
ResultStreamFaker<query::TypedValue> results;
interpreter()(query, dba, {}, false).PullAll(results);
}
}

View File

@ -8,7 +8,9 @@
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
using communication::ClientContext;

View File

@ -1,5 +1,17 @@
#pragma once
#include <atomic>
#include <chrono>
#include <experimental/optional>
#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "json/json.hpp"
#include "stats/metrics.hpp"

View File

@ -7,6 +7,7 @@
#include <random>
#include <sstream>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <gflags/gflags.h>

View File

@ -1,4 +1,5 @@
#include <fstream>
#include <thread>
#include <vector>
#include <gflags/gflags.h>

View File

@ -3,6 +3,7 @@
#include <chrono>
#include <vector>
#include "communication/conversion.hpp"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/interpreter.hpp"
@ -62,7 +63,7 @@ class Cluster {
auto Execute(const std::string &query,
std::map<std::string, query::TypedValue> params = {}) {
database::GraphDbAccessor dba(*master_);
ResultStreamFaker result;
ResultStreamFaker<query::TypedValue> result;
interpreter_->operator()(query, dba, params, false).PullAll(result);
dba.Commit();
return result.GetResults();

View File

@ -13,7 +13,7 @@ int main(int argc, char *argv[]) {
}
database::SingleNode db;
database::GraphDbAccessor dba(db);
ResultStreamFaker results;
ResultStreamFaker<query::TypedValue> results;
query::Interpreter{db}(argv[1], dba, {}, false).PullAll(results);
std::cout << results;
return 0;

View File

@ -4,6 +4,8 @@
#include <vector>
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "communication/conversion.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/version.hpp"
#include "query/typed_value.hpp"
@ -25,12 +27,12 @@ class SnapshotWriter {
: worker_id_(worker_id), buffer_(path) {
encoder_.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder_.WriteTypedValue(durability::kVersion);
encoder_.WriteDecodedValue(durability::kVersion);
encoder_.WriteInt(worker_id_);
encoder_.WriteInt(vertex_generator_local_count);
encoder_.WriteInt(edge_generator_local_count);
encoder_.WriteInt(0);
encoder_.WriteList(std::vector<query::TypedValue>{});
encoder_.WriteList(std::vector<communication::bolt::DecodedValue>{});
}
// reference to `buffer_` gets broken when moving, so let's just forbid moving
@ -39,8 +41,8 @@ class SnapshotWriter {
template <typename TValue>
void WriteList(const std::vector<TValue> &list) {
encoder_.WriteList(
std::vector<query::TypedValue>(list.begin(), list.end()));
encoder_.WriteList(std::vector<communication::bolt::DecodedValue>(
list.begin(), list.end()));
}
storage::VertexAddress DefaultVertexAddress(gid::Gid gid) {
@ -67,7 +69,11 @@ class SnapshotWriter {
encoder_.WriteInt(node.gid);
WriteList(node.labels);
encoder_.WriteMap(node.props);
std::map<std::string, communication::bolt::DecodedValue> props;
for (const auto &prop : node.props) {
props[prop.first] = communication::ToDecodedValue(prop.second);
}
encoder_.WriteMap(props);
// cypher_id
encoder_.WriteInt(utils::MemcpyCast<int64_t>(node.gid));
@ -94,7 +100,11 @@ class SnapshotWriter {
encoder_.WriteInt(edge.from);
encoder_.WriteInt(edge.to);
encoder_.WriteString(edge.type);
encoder_.WriteMap(edge.props);
std::map<std::string, communication::bolt::DecodedValue> props;
for (const auto &prop : edge.props) {
props[prop.first] = communication::ToDecodedValue(prop.second);
}
encoder_.WriteMap(props);
// cypher_id
encoder_.WriteInt(utils::MemcpyCast<int64_t>(edge.gid));

View File

@ -1,3 +1,8 @@
#include <fstream>
#include <random>
#include <set>
#include <thread>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

View File

@ -342,7 +342,7 @@ TEST(BoltDecoder, Vertex) {
buffer.Write(test_int, 1);
ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), true);
auto &vertex = dv.ValueVertex();
ASSERT_EQ(vertex.id, 1);
ASSERT_EQ(vertex.id.AsUint(), 1);
ASSERT_EQ(vertex.labels[0], std::string("a"));
ASSERT_EQ(vertex.properties[std::string("a")].ValueInt(), 1);
}
@ -429,9 +429,9 @@ TEST(BoltDecoder, Edge) {
buffer.Write(test_int1, 1);
ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), true);
auto &edge = de.ValueEdge();
ASSERT_EQ(edge.id, 1);
ASSERT_EQ(edge.from, 2);
ASSERT_EQ(edge.to, 3);
ASSERT_EQ(edge.id.AsUint(), 1);
ASSERT_EQ(edge.from.AsUint(), 2);
ASSERT_EQ(edge.to.AsUint(), 3);
ASSERT_EQ(edge.type, std::string("a"));
ASSERT_EQ(edge.properties[std::string("a")].ValueInt(), 1);
}

View File

@ -2,11 +2,11 @@
#include "bolt_testdata.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp"
#include "communication/conversion.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/typed_value.hpp"
using query::TypedValue;
using communication::bolt::DecodedValue;
/**
* TODO (mferencevic): document
@ -65,10 +65,10 @@ std::vector<uint8_t> &output = output_stream.output;
TEST(BoltEncoder, NullAndBool) {
output.clear();
std::vector<TypedValue> vals;
vals.push_back(TypedValue::Null);
vals.push_back(TypedValue(true));
vals.push_back(TypedValue(false));
std::vector<DecodedValue> vals;
vals.push_back(DecodedValue());
vals.push_back(DecodedValue(true));
vals.push_back(DecodedValue(false));
bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, 3);
CheckOutput(output, (const uint8_t *)"\xC0\xC3\xC2", 3);
@ -77,8 +77,8 @@ TEST(BoltEncoder, NullAndBool) {
TEST(BoltEncoder, Int) {
int N = 28;
output.clear();
std::vector<TypedValue> vals;
for (int i = 0; i < N; ++i) vals.push_back(TypedValue(int_decoded[i]));
std::vector<DecodedValue> vals;
for (int i = 0; i < N; ++i) vals.push_back(DecodedValue(int_decoded[i]));
bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, N);
for (int i = 0; i < N; ++i)
@ -89,8 +89,8 @@ TEST(BoltEncoder, Int) {
TEST(BoltEncoder, Double) {
int N = 4;
output.clear();
std::vector<TypedValue> vals;
for (int i = 0; i < N; ++i) vals.push_back(TypedValue(double_decoded[i]));
std::vector<DecodedValue> vals;
for (int i = 0; i < N; ++i) vals.push_back(DecodedValue(double_decoded[i]));
bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, N);
for (int i = 0; i < N; ++i) CheckOutput(output, double_encoded[i], 9, false);
@ -99,9 +99,9 @@ TEST(BoltEncoder, Double) {
TEST(BoltEncoder, String) {
output.clear();
std::vector<TypedValue> vals;
std::vector<DecodedValue> vals;
for (uint64_t i = 0; i < sizes_num; ++i)
vals.push_back(TypedValue(std::string((const char *)data, sizes[i])));
vals.push_back(DecodedValue(std::string((const char *)data, sizes[i])));
bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, vals.size());
for (uint64_t i = 0; i < sizes_num; ++i) {
@ -113,12 +113,12 @@ TEST(BoltEncoder, String) {
TEST(BoltEncoder, List) {
output.clear();
std::vector<TypedValue> vals;
std::vector<DecodedValue> vals;
for (uint64_t i = 0; i < sizes_num; ++i) {
std::vector<TypedValue> val;
std::vector<DecodedValue> val;
for (uint64_t j = 0; j < sizes[i]; ++j)
val.push_back(TypedValue(std::string((const char *)&data[j], 1)));
vals.push_back(TypedValue(val));
val.push_back(DecodedValue(std::string((const char *)&data[j], 1)));
vals.push_back(DecodedValue(val));
}
bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, vals.size());
@ -134,16 +134,16 @@ TEST(BoltEncoder, List) {
TEST(BoltEncoder, Map) {
output.clear();
std::vector<TypedValue> vals;
std::vector<DecodedValue> vals;
uint8_t buff[10];
for (int i = 0; i < sizes_num; ++i) {
std::map<std::string, TypedValue> val;
std::map<std::string, DecodedValue> val;
for (int j = 0; j < sizes[i]; ++j) {
sprintf((char *)buff, "%05X", j);
std::string tmp((char *)buff, 5);
val.insert(std::make_pair(tmp, TypedValue(tmp)));
val.insert(std::make_pair(tmp, DecodedValue(tmp)));
}
vals.push_back(TypedValue(val));
vals.push_back(DecodedValue(val));
}
bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, vals.size());
@ -188,10 +188,10 @@ TEST(BoltEncoder, VertexAndEdge) {
ea.PropsSet(p4, pv4);
// check everything
std::vector<TypedValue> vals;
vals.push_back(TypedValue(va1));
vals.push_back(TypedValue(va2));
vals.push_back(TypedValue(ea));
std::vector<DecodedValue> vals;
vals.push_back(communication::ToDecodedValue(va1));
vals.push_back(communication::ToDecodedValue(va2));
vals.push_back(communication::ToDecodedValue(ea));
bolt_encoder.MessageRecord(vals);
// The vertexedge_encoded testdata has hardcoded zeros for IDs,
@ -214,18 +214,18 @@ TEST(BoltEncoder, BoltV1ExampleMessages) {
output.clear();
// record message
std::vector<TypedValue> rvals;
for (int i = 1; i < 4; ++i) rvals.push_back(TypedValue(i));
std::vector<DecodedValue> rvals;
for (int i = 1; i < 4; ++i) rvals.push_back(DecodedValue(i));
bolt_encoder.MessageRecord(rvals);
CheckOutput(output, (const uint8_t *)"\xB1\x71\x93\x01\x02\x03", 6);
// success message
std::string sv1("name"), sv2("age"), sk("fields");
std::vector<TypedValue> svec;
svec.push_back(TypedValue(sv1));
svec.push_back(TypedValue(sv2));
TypedValue slist(svec);
std::map<std::string, TypedValue> svals;
std::vector<DecodedValue> svec;
svec.push_back(DecodedValue(sv1));
svec.push_back(DecodedValue(sv2));
DecodedValue slist(svec);
std::map<std::string, DecodedValue> svals;
svals.insert(std::make_pair(sk, slist));
bolt_encoder.MessageSuccess(svals);
CheckOutput(output,
@ -236,8 +236,8 @@ TEST(BoltEncoder, BoltV1ExampleMessages) {
std::string fv1("Neo.ClientError.Statement.SyntaxError"),
fv2("Invalid syntax.");
std::string fk1("code"), fk2("message");
TypedValue ftv1(fv1), ftv2(fv2);
std::map<std::string, TypedValue> fvals;
DecodedValue ftv1(fv1), ftv2(fv2);
std::map<std::string, DecodedValue> fvals;
fvals.insert(std::make_pair(fk1, ftv1));
fvals.insert(std::make_pair(fk2, ftv2));
bolt_encoder.MessageFailure(fvals);

View File

@ -3,9 +3,8 @@
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "query/typed_value.hpp"
using query::TypedValue;
using communication::bolt::DecodedValue;
using BufferT = communication::bolt::ChunkedEncoderBuffer<TestOutputStream>;
using EncoderT = communication::bolt::Encoder<BufferT>;
@ -36,15 +35,15 @@ TEST(Bolt, ResultStream) {
PrintOutput(output);
CheckOutput(output, header_output, 45);
std::vector<TypedValue> result{TypedValue(5),
TypedValue(std::string("hello"))};
std::vector<DecodedValue> result{DecodedValue(5),
DecodedValue(std::string("hello"))};
result_stream.Result(result);
buffer.Flush();
PrintOutput(output);
CheckOutput(output, result_output, 14);
std::map<std::string, TypedValue> summary;
summary.insert(std::make_pair(std::string("changed"), TypedValue(10)));
std::map<std::string, DecodedValue> summary;
summary.insert(std::make_pair(std::string("changed"), DecodedValue(10)));
result_stream.Summary(summary);
buffer.Flush();
PrintOutput(output);

View File

@ -12,7 +12,7 @@ TEST(TransactionTimeout, TransactionTimeout) {
database::SingleNode db;
query::Interpreter interpreter{db};
auto interpret = [&](auto &dba, const std::string &query) {
ResultStreamFaker stream;
ResultStreamFaker<query::TypedValue> stream;
interpreter(query, dba, {}, false).PullAll(stream);
};

View File

@ -4,6 +4,7 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
@ -38,7 +39,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
auto RunWithDba(const std::string &query, GraphDbAccessor &dba) {
std::map<std::string, query::TypedValue> params = {};
ResultStreamFaker result;
ResultStreamFaker<query::TypedValue> result;
interpreter_.value()(query, dba, params, false).PullAll(result);
return result.GetResults();
}

View File

@ -482,18 +482,18 @@ TEST_F(Durability, SnapshotEncoding) {
decoder.ReadValue(&dv);
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Edge);
auto &edge = dv.ValueEdge();
decoded_edges.emplace(edge.id, edge);
decoded_edges.emplace(edge.id.AsUint(), edge);
// Read cypher_id.
decoder.ReadValue(&dv);
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Int);
}
EXPECT_EQ(decoded_edges.size(), 2);
EXPECT_EQ(decoded_edges[gid0].from, gid0);
EXPECT_EQ(decoded_edges[gid0].to, gid1);
EXPECT_EQ(decoded_edges[gid0].from.AsUint(), gid0);
EXPECT_EQ(decoded_edges[gid0].to.AsUint(), gid1);
EXPECT_EQ(decoded_edges[gid0].type, "et0");
EXPECT_EQ(decoded_edges[gid0].properties.size(), 1);
EXPECT_EQ(decoded_edges[gid1].from, gid2);
EXPECT_EQ(decoded_edges[gid1].to, gid1);
EXPECT_EQ(decoded_edges[gid1].from.AsUint(), gid2);
EXPECT_EQ(decoded_edges[gid1].to.AsUint(), gid1);
EXPECT_EQ(decoded_edges[gid1].type, "et1");
EXPECT_EQ(decoded_edges[gid1].properties.size(), 0);

View File

@ -17,11 +17,10 @@ class InterpreterTest : public ::testing::Test {
database::SingleNode db_;
query::Interpreter interpreter_{db_};
ResultStreamFaker Interpret(
const std::string &query,
const std::map<std::string, query::TypedValue> params = {}) {
auto Interpret(const std::string &query,
const std::map<std::string, query::TypedValue> &params = {}) {
database::GraphDbAccessor dba(db_);
ResultStreamFaker result;
ResultStreamFaker<query::TypedValue> result;
interpreter_(query, dba, params, false).PullAll(result);
return result;
}
@ -198,7 +197,7 @@ TEST_F(InterpreterTest, Bfs) {
}
database::GraphDbAccessor dba(db_);
ResultStreamFaker stream;
ResultStreamFaker<query::TypedValue> stream;
interpreter_(
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
"e.reachable)]->(m) RETURN r",
@ -242,7 +241,7 @@ TEST_F(InterpreterTest, Bfs) {
}
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
ResultStreamFaker stream;
ResultStreamFaker<query::TypedValue> stream;
database::GraphDbAccessor dba(db_);
ASSERT_THROW(
interpreter_("CREATE INDEX ON :X(y)", dba, {}, true).PullAll(stream),
@ -252,7 +251,7 @@ TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
// Test shortest path end to end.
TEST_F(InterpreterTest, ShortestPath) {
{
ResultStreamFaker stream;
ResultStreamFaker<query::TypedValue> stream;
database::GraphDbAccessor dba(db_);
interpreter_(
"CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 "
@ -263,7 +262,7 @@ TEST_F(InterpreterTest, ShortestPath) {
dba.Commit();
}
ResultStreamFaker stream;
ResultStreamFaker<query::TypedValue> stream;
database::GraphDbAccessor dba(db_);
interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e", dba,
{}, false)

View File

@ -4,7 +4,6 @@
#include <memory>
#include <vector>
#include "communication/result_stream_faker.hpp"
#include "query/common.hpp"
#include "query/context.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
@ -18,30 +17,15 @@ using namespace query::plan;
using Bound = ScanAllByLabelPropertyRange::Bound;
/**
* Helper function that collects all the results from the given
* Produce into a ResultStreamFaker and returns the results from it.
*
* @param produce
* @param symbol_table
* @param db_accessor
* @return
*/
/** Helper function that collects all the results from the given Produce. */
std::vector<std::vector<TypedValue>> CollectProduce(
Produce *produce, SymbolTable &symbol_table,
database::GraphDbAccessor &db_accessor) {
ResultStreamFaker stream;
Frame frame(symbol_table.max_position());
// top level node in the operator tree is a produce (return)
// so stream out results
// generate header
std::vector<std::string> header;
for (auto named_expression : produce->named_expressions())
header.push_back(named_expression->name_);
stream.Header(header);
// collect the symbols from the return clause
std::vector<Symbol> symbols;
for (auto named_expression : produce->named_expressions())
@ -51,15 +35,14 @@ std::vector<std::vector<TypedValue>> CollectProduce(
context.symbol_table_ = symbol_table;
// stream out results
auto cursor = produce->MakeCursor(db_accessor);
std::vector<std::vector<TypedValue>> results;
while (cursor->Pull(frame, context)) {
std::vector<TypedValue> values;
for (auto &symbol : symbols) values.emplace_back(frame[symbol]);
stream.Result(values);
results.emplace_back(values);
}
stream.Summary({{std::string("type"), TypedValue("r")}});
return stream.GetResults();
return results;
}
int PullAll(std::shared_ptr<LogicalOperator> logical_op,

View File

@ -39,7 +39,7 @@ class QueryExecution : public testing::Test {
/** Executes the query and returns the results.
* Does NOT commit the transaction */
auto Execute(const std::string &query) {
ResultStreamFaker results;
ResultStreamFaker<query::TypedValue> results;
query::Interpreter{*db_}(query, *dba_, {}, false).PullAll(results);
return results.GetResults();
}

View File

@ -17,6 +17,7 @@
#include "durability/snapshot_encoder.hpp"
#include "durability/version.hpp"
#include "storage/address_types.hpp"
#include "utils/cast.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
@ -250,7 +251,8 @@ void WriteNodeRow(
}
}
CHECK(id) << "Node ID must be specified";
partial_vertices[*id] = {*id, static_cast<int>(*id), labels, properties, {}};
partial_vertices[*id] = {
*id, utils::MemcpyCast<int64_t>(*id), labels, properties, {}};
}
auto PassNodes(std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex>
@ -307,7 +309,10 @@ void WriteRelationshipsRow(
CHECK(end_id) << "END_ID must be set";
CHECK(relationship_type) << "Relationship TYPE must be set";
edges[relationship_id] = {(int64_t)relationship_id, *start_id, *end_id,
auto bolt_id = communication::bolt::Id::FromUint(relationship_id);
auto bolt_start_id = communication::bolt::Id::FromUint(*start_id);
auto bolt_end_id = communication::bolt::Id::FromUint(*end_id);
edges[relationship_id] = {bolt_id, bolt_start_id, bolt_end_id,
*relationship_type, properties};
}
@ -353,7 +358,7 @@ void Convert(const std::vector<std::string> &nodes,
// 7) Summary with node count, relationship count and hash digest.
encoder.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder.WriteTypedValue(durability::kVersion);
encoder.WriteDecodedValue(durability::kVersion);
encoder.WriteInt(0); // Worker Id - for this use case it's okay to set to 0
// since we are using a single-node version of
@ -379,11 +384,12 @@ void Convert(const std::vector<std::string> &nodes,
}
for (auto edge : edges) {
auto encoded = edge.second;
auto edge_address = storage::EdgeAddress(encoded.id, 0);
vertices[encoded.from].out.push_back(
{edge_address, storage::VertexAddress(encoded.to, 0), encoded.type});
vertices[encoded.to].in.push_back(
{edge_address, storage::VertexAddress(encoded.from, 0),
auto edge_address = storage::EdgeAddress(encoded.id.AsUint(), 0);
vertices[encoded.from.AsUint()].out.push_back(
{edge_address, storage::VertexAddress(encoded.to.AsUint(), 0),
encoded.type});
vertices[encoded.to.AsUint()].in.push_back(
{edge_address, storage::VertexAddress(encoded.from.AsUint(), 0),
encoded.type});
}
for (auto vertex_pair : vertices) {
@ -396,10 +402,12 @@ void Convert(const std::vector<std::string> &nodes,
encoder.WriteInt(vertex.gid);
auto &labels = vertex.labels;
std::vector<query::TypedValue> transformed;
std::transform(
labels.begin(), labels.end(), std::back_inserter(transformed),
[](const std::string &str) -> query::TypedValue { return str; });
std::vector<communication::bolt::DecodedValue> transformed;
std::transform(labels.begin(), labels.end(),
std::back_inserter(transformed),
[](const std::string &str) {
return communication::bolt::DecodedValue(str);
});
encoder.WriteList(transformed);
encoder.WriteMap(vertex.properties);
@ -426,14 +434,14 @@ void Convert(const std::vector<std::string> &nodes,
utils::UnderlyingCast(communication::bolt::Marker::TinyStruct) + 5);
encoder.WriteRAW(
utils::UnderlyingCast(communication::bolt::Signature::Relationship));
encoder.WriteInt(edge.id);
encoder.WriteInt(edge.from);
encoder.WriteInt(edge.to);
encoder.WriteInt(edge.id.AsInt());
encoder.WriteInt(edge.from.AsInt());
encoder.WriteInt(edge.to.AsInt());
encoder.WriteString(edge.type);
encoder.WriteMap(edge.properties);
// cypher_id
encoder.WriteInt(edge.id);
encoder.WriteInt(edge.id.AsInt());
}
buffer.WriteValue(node_count);