Bolt: BaseEncoder split into PrimitiveEncoder and BaseEncoder
Summary: This is an attempt at solving circular dependencies happening in WAL implementation. The cycle is: BaseEncoder -> GraphDbAccessor -> GraphDb -> WAL -> BaseEncoder. The cycle will be broken by this diff because the WAL only needs primitive encoding and will be able to use `PrimitiveEncoder` only. This fix is not ideal, since the BaseEncoder -> GraphDbAccessor dependency is very unnatural. This could probably be fixes properly with a refactor of GraphDb/GraphDbAccessor that is in the post, but that takes time and this fix is not very dirty, more of an added complication. Reviewers: buda, mferencevic Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D925
This commit is contained in:
parent
add801a80a
commit
b65fcc8f90
@ -1,101 +1,25 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "communication/bolt/v1/codes.hpp"
|
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
|
||||||
#include "database/graph_db_accessor.hpp"
|
#include "database/graph_db_accessor.hpp"
|
||||||
#include "query/typed_value.hpp"
|
#include "query/typed_value.hpp"
|
||||||
#include "utils/bswap.hpp"
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
namespace communication::bolt {
|
namespace communication::bolt {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bolt BaseEncoder. Has public interfaces for writing Bolt encoded data.
|
* Bolt BaseEncoder. Subclass of PrimitiveEncoder. Extends it with the
|
||||||
* Supported types are: Null, Bool, Int, Double, String, List, Map, Vertex, Edge
|
* capability to encode TypedValues (as well as lists and maps of TypedValues),
|
||||||
*
|
* Edges, Vertices and Paths.
|
||||||
* This class has a dual purpose. The first is streaming of bolt data to network
|
|
||||||
* clients. The second is streaming to disk in the database snapshotter.
|
|
||||||
*
|
*
|
||||||
* @tparam Buffer the output buffer that should be used
|
* @tparam Buffer the output buffer that should be used
|
||||||
*/
|
*/
|
||||||
template <typename Buffer>
|
template <typename Buffer>
|
||||||
class BaseEncoder {
|
class BaseEncoder : public PrimitiveEncoder<Buffer> {
|
||||||
public:
|
public:
|
||||||
BaseEncoder(Buffer &buffer) : buffer_(buffer) {}
|
BaseEncoder(Buffer &buffer) : PrimitiveEncoder<Buffer>(buffer) {}
|
||||||
|
|
||||||
void WriteRAW(const uint8_t *data, uint64_t len) { buffer_.Write(data, len); }
|
|
||||||
|
|
||||||
void WriteRAW(const char *data, uint64_t len) {
|
|
||||||
WriteRAW((const uint8_t *)data, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteRAW(const uint8_t data) { WriteRAW(&data, 1); }
|
|
||||||
|
|
||||||
template <class T>
|
|
||||||
void WriteValue(T value) {
|
|
||||||
value = bswap(value);
|
|
||||||
WriteRAW(reinterpret_cast<const uint8_t *>(&value), sizeof(value));
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteNull() { WriteRAW(underlying_cast(Marker::Null)); }
|
|
||||||
|
|
||||||
void WriteBool(const bool &value) {
|
|
||||||
if (value)
|
|
||||||
WriteRAW(underlying_cast(Marker::True));
|
|
||||||
else
|
|
||||||
WriteRAW(underlying_cast(Marker::False));
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteInt(const int64_t &value) {
|
|
||||||
if (value >= -16L && value < 128L) {
|
|
||||||
WriteRAW(static_cast<uint8_t>(value));
|
|
||||||
} else if (value >= -128L && value < -16L) {
|
|
||||||
WriteRAW(underlying_cast(Marker::Int8));
|
|
||||||
WriteRAW(static_cast<uint8_t>(value));
|
|
||||||
} else if (value >= -32768L && value < 32768L) {
|
|
||||||
WriteRAW(underlying_cast(Marker::Int16));
|
|
||||||
WriteValue(static_cast<int16_t>(value));
|
|
||||||
} else if (value >= -2147483648L && value < 2147483648L) {
|
|
||||||
WriteRAW(underlying_cast(Marker::Int32));
|
|
||||||
WriteValue(static_cast<int32_t>(value));
|
|
||||||
} else {
|
|
||||||
WriteRAW(underlying_cast(Marker::Int64));
|
|
||||||
WriteValue(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteDouble(const double &value) {
|
|
||||||
WriteRAW(underlying_cast(Marker::Float64));
|
|
||||||
WriteValue(*reinterpret_cast<const int64_t *>(&value));
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteTypeSize(const size_t size, const uint8_t typ) {
|
|
||||||
if (size <= 15) {
|
|
||||||
uint8_t len = size;
|
|
||||||
len &= 0x0F;
|
|
||||||
WriteRAW(underlying_cast(MarkerTiny[typ]) + len);
|
|
||||||
} else if (size <= 255) {
|
|
||||||
uint8_t len = size;
|
|
||||||
WriteRAW(underlying_cast(Marker8[typ]));
|
|
||||||
WriteRAW(len);
|
|
||||||
} else if (size <= 65535) {
|
|
||||||
uint16_t len = size;
|
|
||||||
WriteRAW(underlying_cast(Marker16[typ]));
|
|
||||||
WriteValue(len);
|
|
||||||
} else {
|
|
||||||
uint32_t len = size;
|
|
||||||
WriteRAW(underlying_cast(Marker32[typ]));
|
|
||||||
WriteValue(len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteString(const std::string &value) {
|
|
||||||
WriteTypeSize(value.size(), MarkerString);
|
|
||||||
WriteRAW(value.c_str(), value.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteList(const std::vector<query::TypedValue> &value) {
|
void WriteList(const std::vector<query::TypedValue> &value) {
|
||||||
WriteTypeSize(value.size(), MarkerList);
|
this->WriteTypeSize(value.size(), MarkerList);
|
||||||
for (auto &x : value) WriteTypedValue(x);
|
for (auto &x : value) WriteTypedValue(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,36 +30,36 @@ class BaseEncoder {
|
|||||||
*/
|
*/
|
||||||
template <typename TMap>
|
template <typename TMap>
|
||||||
void WriteMap(const TMap &value) {
|
void WriteMap(const TMap &value) {
|
||||||
WriteTypeSize(value.size(), MarkerMap);
|
this->WriteTypeSize(value.size(), MarkerMap);
|
||||||
for (auto &x : value) {
|
for (auto &x : value) {
|
||||||
WriteString(x.first);
|
this->WriteString(x.first);
|
||||||
WriteTypedValue(x.second);
|
WriteTypedValue(x.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteVertex(const VertexAccessor &vertex) {
|
void WriteVertex(const VertexAccessor &vertex) {
|
||||||
WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
|
this->WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
|
||||||
WriteRAW(underlying_cast(Signature::Node));
|
this->WriteRAW(underlying_cast(Signature::Node));
|
||||||
WriteUInt(vertex.temporary_id());
|
WriteUInt(vertex.temporary_id());
|
||||||
|
|
||||||
// write labels
|
// write labels
|
||||||
const auto &labels = vertex.labels();
|
const auto &labels = vertex.labels();
|
||||||
WriteTypeSize(labels.size(), MarkerList);
|
this->WriteTypeSize(labels.size(), MarkerList);
|
||||||
for (const auto &label : labels)
|
for (const auto &label : labels)
|
||||||
WriteString(vertex.db_accessor().LabelName(label));
|
this->WriteString(vertex.db_accessor().LabelName(label));
|
||||||
|
|
||||||
// write properties
|
// write properties
|
||||||
const auto &props = vertex.Properties();
|
const auto &props = vertex.Properties();
|
||||||
WriteTypeSize(props.size(), MarkerMap);
|
this->WriteTypeSize(props.size(), MarkerMap);
|
||||||
for (const auto &prop : props) {
|
for (const auto &prop : props) {
|
||||||
WriteString(vertex.db_accessor().PropertyName(prop.first));
|
this->WriteString(vertex.db_accessor().PropertyName(prop.first));
|
||||||
WriteTypedValue(prop.second);
|
WriteTypedValue(prop.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteEdge(const EdgeAccessor &edge, bool unbound = false) {
|
void WriteEdge(const EdgeAccessor &edge, bool unbound = false) {
|
||||||
WriteRAW(underlying_cast(Marker::TinyStruct) + (unbound ? 3 : 5));
|
this->WriteRAW(underlying_cast(Marker::TinyStruct) + (unbound ? 3 : 5));
|
||||||
WriteRAW(underlying_cast(unbound ? Signature::UnboundRelationship
|
this->WriteRAW(underlying_cast(unbound ? Signature::UnboundRelationship
|
||||||
: Signature::Relationship));
|
: Signature::Relationship));
|
||||||
|
|
||||||
WriteUInt(edge.temporary_id());
|
WriteUInt(edge.temporary_id());
|
||||||
@ -145,13 +69,13 @@ class BaseEncoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// write type
|
// write type
|
||||||
WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
|
this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
|
||||||
|
|
||||||
// write properties
|
// write properties
|
||||||
const auto &props = edge.Properties();
|
const auto &props = edge.Properties();
|
||||||
WriteTypeSize(props.size(), MarkerMap);
|
this->WriteTypeSize(props.size(), MarkerMap);
|
||||||
for (const auto &prop : props) {
|
for (const auto &prop : props) {
|
||||||
WriteString(edge.db_accessor().PropertyName(prop.first));
|
this->WriteString(edge.db_accessor().PropertyName(prop.first));
|
||||||
WriteTypedValue(prop.second);
|
WriteTypedValue(prop.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,32 +113,32 @@ class BaseEncoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write data.
|
// Write data.
|
||||||
WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
|
this->WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
|
||||||
WriteRAW(underlying_cast(Signature::Path));
|
this->WriteRAW(underlying_cast(Signature::Path));
|
||||||
WriteTypeSize(vertices.size(), MarkerList);
|
this->WriteTypeSize(vertices.size(), MarkerList);
|
||||||
for (auto &v : vertices) WriteVertex(v);
|
for (auto &v : vertices) WriteVertex(v);
|
||||||
WriteTypeSize(edges.size(), MarkerList);
|
this->WriteTypeSize(edges.size(), MarkerList);
|
||||||
for (auto &e : edges) WriteEdge(e, true);
|
for (auto &e : edges) WriteEdge(e, true);
|
||||||
WriteTypeSize(indices.size(), MarkerList);
|
this->WriteTypeSize(indices.size(), MarkerList);
|
||||||
for (auto &i : indices) WriteInt(i);
|
for (auto &i : indices) this->WriteInt(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteTypedValue(const query::TypedValue &value) {
|
void WriteTypedValue(const query::TypedValue &value) {
|
||||||
switch (value.type()) {
|
switch (value.type()) {
|
||||||
case query::TypedValue::Type::Null:
|
case query::TypedValue::Type::Null:
|
||||||
WriteNull();
|
this->WriteNull();
|
||||||
break;
|
break;
|
||||||
case query::TypedValue::Type::Bool:
|
case query::TypedValue::Type::Bool:
|
||||||
WriteBool(value.Value<bool>());
|
this->WriteBool(value.Value<bool>());
|
||||||
break;
|
break;
|
||||||
case query::TypedValue::Type::Int:
|
case query::TypedValue::Type::Int:
|
||||||
WriteInt(value.Value<int64_t>());
|
this->WriteInt(value.Value<int64_t>());
|
||||||
break;
|
break;
|
||||||
case query::TypedValue::Type::Double:
|
case query::TypedValue::Type::Double:
|
||||||
WriteDouble(value.Value<double>());
|
this->WriteDouble(value.Value<double>());
|
||||||
break;
|
break;
|
||||||
case query::TypedValue::Type::String:
|
case query::TypedValue::Type::String:
|
||||||
WriteString(value.Value<std::string>());
|
this->WriteString(value.Value<std::string>());
|
||||||
break;
|
break;
|
||||||
case query::TypedValue::Type::List:
|
case query::TypedValue::Type::List:
|
||||||
WriteList(value.Value<std::vector<query::TypedValue>>());
|
WriteList(value.Value<std::vector<query::TypedValue>>());
|
||||||
@ -234,12 +158,9 @@ class BaseEncoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
|
||||||
Buffer &buffer_;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void WriteUInt(const uint64_t &value) {
|
void WriteUInt(const uint64_t &value) {
|
||||||
WriteInt(*reinterpret_cast<const int64_t *>(&value));
|
this->WriteInt(*reinterpret_cast<const int64_t *>(&value));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
137
src/communication/bolt/v1/encoder/primitive_encoder.hpp
Normal file
137
src/communication/bolt/v1/encoder/primitive_encoder.hpp
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include "communication/bolt/v1/codes.hpp"
|
||||||
|
#include "storage/property_value.hpp"
|
||||||
|
#include "utils/bswap.hpp"
|
||||||
|
|
||||||
|
namespace communication::bolt {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bolt PrimitiveEncoder. Has public interfaces for writing Bolt encoded data.
|
||||||
|
* Supported types are: Null, Bool, Int, Double, String and PropertyValue.
|
||||||
|
*
|
||||||
|
* Bolt encoding is used both for streaming data to network clients and for
|
||||||
|
* database durability.
|
||||||
|
*
|
||||||
|
* @tparam Buffer the output buffer that should be used
|
||||||
|
*/
|
||||||
|
template <typename Buffer>
|
||||||
|
class PrimitiveEncoder {
|
||||||
|
public:
|
||||||
|
PrimitiveEncoder(Buffer &buffer) : buffer_(buffer) {}
|
||||||
|
|
||||||
|
void WriteRAW(const uint8_t *data, uint64_t len) { buffer_.Write(data, len); }
|
||||||
|
|
||||||
|
void WriteRAW(const char *data, uint64_t len) {
|
||||||
|
WriteRAW((const uint8_t *)data, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteRAW(const uint8_t data) { WriteRAW(&data, 1); }
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
void WriteValue(T value) {
|
||||||
|
value = bswap(value);
|
||||||
|
WriteRAW(reinterpret_cast<const uint8_t *>(&value), sizeof(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteNull() { WriteRAW(underlying_cast(Marker::Null)); }
|
||||||
|
|
||||||
|
void WriteBool(const bool &value) {
|
||||||
|
if (value)
|
||||||
|
WriteRAW(underlying_cast(Marker::True));
|
||||||
|
else
|
||||||
|
WriteRAW(underlying_cast(Marker::False));
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteInt(const int64_t &value) {
|
||||||
|
if (value >= -16L && value < 128L) {
|
||||||
|
WriteRAW(static_cast<uint8_t>(value));
|
||||||
|
} else if (value >= -128L && value < -16L) {
|
||||||
|
WriteRAW(underlying_cast(Marker::Int8));
|
||||||
|
WriteRAW(static_cast<uint8_t>(value));
|
||||||
|
} else if (value >= -32768L && value < 32768L) {
|
||||||
|
WriteRAW(underlying_cast(Marker::Int16));
|
||||||
|
WriteValue(static_cast<int16_t>(value));
|
||||||
|
} else if (value >= -2147483648L && value < 2147483648L) {
|
||||||
|
WriteRAW(underlying_cast(Marker::Int32));
|
||||||
|
WriteValue(static_cast<int32_t>(value));
|
||||||
|
} else {
|
||||||
|
WriteRAW(underlying_cast(Marker::Int64));
|
||||||
|
WriteValue(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteDouble(const double &value) {
|
||||||
|
WriteRAW(underlying_cast(Marker::Float64));
|
||||||
|
WriteValue(*reinterpret_cast<const int64_t *>(&value));
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteTypeSize(const size_t size, const uint8_t typ) {
|
||||||
|
if (size <= 15) {
|
||||||
|
uint8_t len = size;
|
||||||
|
len &= 0x0F;
|
||||||
|
WriteRAW(underlying_cast(MarkerTiny[typ]) + len);
|
||||||
|
} else if (size <= 255) {
|
||||||
|
uint8_t len = size;
|
||||||
|
WriteRAW(underlying_cast(Marker8[typ]));
|
||||||
|
WriteRAW(len);
|
||||||
|
} else if (size <= 65535) {
|
||||||
|
uint16_t len = size;
|
||||||
|
WriteRAW(underlying_cast(Marker16[typ]));
|
||||||
|
WriteValue(len);
|
||||||
|
} else {
|
||||||
|
uint32_t len = size;
|
||||||
|
WriteRAW(underlying_cast(Marker32[typ]));
|
||||||
|
WriteValue(len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteString(const std::string &value) {
|
||||||
|
WriteTypeSize(value.size(), MarkerString);
|
||||||
|
WriteRAW(value.c_str(), value.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
void WritePropertyValue(const PropertyValue &value) {
|
||||||
|
auto write_list = [this](const std::vector<PropertyValue> &value) {
|
||||||
|
WriteTypeSize(value.size(), MarkerList);
|
||||||
|
for (auto &x : value) WritePropertyValue(x);
|
||||||
|
};
|
||||||
|
|
||||||
|
auto write_map = [this](const std::map<std::string, PropertyValue> &value) {
|
||||||
|
WriteTypeSize(value.size(), MarkerMap);
|
||||||
|
for (auto &x : value) {
|
||||||
|
WriteString(x.first);
|
||||||
|
WritePropertyValue(x.second);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
switch (value.type()) {
|
||||||
|
case PropertyValue::Type::Null:
|
||||||
|
WriteNull();
|
||||||
|
break;
|
||||||
|
case PropertyValue::Type::Bool:
|
||||||
|
WriteBool(value.Value<bool>());
|
||||||
|
break;
|
||||||
|
case PropertyValue::Type::Int:
|
||||||
|
WriteInt(value.Value<int64_t>());
|
||||||
|
break;
|
||||||
|
case PropertyValue::Type::Double:
|
||||||
|
WriteDouble(value.Value<double>());
|
||||||
|
break;
|
||||||
|
case PropertyValue::Type::String:
|
||||||
|
WriteString(value.Value<std::string>());
|
||||||
|
break;
|
||||||
|
case PropertyValue::Type::List:
|
||||||
|
write_list(value.Value<std::vector<PropertyValue>>());
|
||||||
|
break;
|
||||||
|
case PropertyValue::Type::Map:
|
||||||
|
write_map(value.Value<std::map<std::string, PropertyValue>>());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
Buffer &buffer_;
|
||||||
|
};
|
||||||
|
} // namespace communication::bolt
|
Loading…
Reference in New Issue
Block a user