From 3d2b37ed35c66a0c487c817b865b424cf1edbf1c Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Thu, 30 Mar 2017 09:59:23 +0200 Subject: [PATCH] Split bolt encoder into two. Summary: Added temporary_id to record_accessor. Added ID encoding to encoder. Reviewers: florijan Reviewed By: florijan Subscribers: pullbot, matej.gradicek Differential Revision: https://phabricator.memgraph.io/D197 --- .../bolt/v1/encoder/base_encoder.hpp | 259 ++++++++++++++++++ src/communication/bolt/v1/encoder/encoder.hpp | 218 +-------------- src/storage/record_accessor.cpp | 5 + src/storage/record_accessor.hpp | 16 ++ 4 files changed, 292 insertions(+), 206 deletions(-) create mode 100644 src/communication/bolt/v1/encoder/base_encoder.hpp diff --git a/src/communication/bolt/v1/encoder/base_encoder.hpp b/src/communication/bolt/v1/encoder/base_encoder.hpp new file mode 100644 index 000000000..eefe45962 --- /dev/null +++ b/src/communication/bolt/v1/encoder/base_encoder.hpp @@ -0,0 +1,259 @@ +#pragma once + +#include "database/graph_db_accessor.hpp" +#include "logging/default.hpp" +#include "logging/logger.hpp" +#include "query/backend/cpp/typed_value.hpp" +#include "utils/bswap.hpp" + +#include + +namespace communication::bolt { + +static constexpr uint8_t TSTRING = 0, TLIST = 1, TMAP = 2; +static constexpr uint8_t type_tiny_marker[3] = {0x80, 0x90, 0xA0}; +static constexpr uint8_t type_8_marker[3] = {0xD0, 0xD4, 0xD8}; +static constexpr uint8_t type_16_marker[3] = {0xD1, 0xD5, 0xD9}; +static constexpr uint8_t type_32_marker[3] = {0xD2, 0xD6, 0xDA}; + +/** + * Bolt BaseEncoder. + * Has public interfaces for writing Bolt encoded data. + * Supported types are: Null, Bool, Int, Double, + * String, List, Map, Vertex, Edge + * + * This class has a dual purpose. The first is streaming of bolt data to + * network clients. The second is streaming to disk in the database snapshotter. + * The two usages are changed depending on the encode_ids boolean flag. 
+ * If encode_ids is set to false (its default value) then the encoder encodes + * normal bolt network client data. If encode_ids is set to true then the + * encoder encodes data for the database snapshotter. + * In the normal mode (encode_ids == false) the encoder doesn't output object + * IDs but instead it outputs fixed zeros. In the snapshotter mode it outputs + * temporary IDs obtained from the objects. + * + * Also note that currently expansion across the graph is not allowed during + * streaming, if an update has happened in the same transaction command. + * Attempting to do so crashes the DB. That's another reason why we encode + * IDs (expansion from edges) only in the snapshotter. + * + * @tparam Buffer the output buffer that should be used + */ +template +class BaseEncoder : public Loggable { + public: + BaseEncoder(Buffer &buffer, bool encode_ids = false) + : Loggable("communication::bolt::BaseEncoder"), + buffer_(buffer), + encode_ids_(encode_ids) {} + + void WriteRAW(const uint8_t *data, uint64_t len) { buffer_.Write(data, len); } + + void WriteRAW(const char *data, uint64_t len) { + WriteRAW((const uint8_t *)data, len); + } + + void WriteRAW(const uint8_t data) { WriteRAW(&data, 1); } + + template + void WriteValue(T value) { + value = bswap(value); + WriteRAW(reinterpret_cast(&value), sizeof(value)); + } + + void WriteNull() { + // 0xC0 = null marker + WriteRAW(0xC0); + } + + void WriteBool(const bool &value) { + if (value) { + // 0xC3 = true marker + WriteRAW(0xC3); + } else { + // 0xC2 = false marker + WriteRAW(0xC2); + } + } + + void WriteInt(const int64_t &value) { + if (value >= -16L && value < 128L) { + WriteRAW(static_cast(value)); + } else if (value >= -128L && value < -16L) { + // 0xC8 = int8 marker + WriteRAW(0xC8); + WriteRAW(static_cast(value)); + } else if (value >= -32768L && value < 32768L) { + // 0xC9 = int16 marker + WriteRAW(0xC9); + WriteValue(static_cast(value)); + } else if (value >= -2147483648L && value < 2147483648L) { + // 
0xCA = int32 marker + WriteRAW(0xCA); + WriteValue(static_cast(value)); + } else { + // 0xCB = int64 marker + WriteRAW(0xCB); + WriteValue(value); + } + } + + void WriteDouble(const double &value) { + // 0xC1 = float64 marker + WriteRAW(0xC1); + WriteValue(*reinterpret_cast(&value)); + } + + void WriteTypeSize(const size_t size, const uint8_t typ) { + if (size <= 15) { + uint8_t len = size; + len &= 0x0F; + // tiny marker (+len) + WriteRAW(type_tiny_marker[typ] + len); + } else if (size <= 255) { + uint8_t len = size; + // 8 marker + WriteRAW(type_8_marker[typ]); + WriteRAW(len); + } else if (size <= 65536) { + uint16_t len = size; + // 16 marker + WriteRAW(type_16_marker[typ]); + WriteValue(len); + } else { + uint32_t len = size; + // 32 marker + WriteRAW(type_32_marker[typ]); + WriteValue(len); + } + } + + void WriteString(const std::string &value) { + WriteTypeSize(value.size(), TSTRING); + WriteRAW(value.c_str(), value.size()); + } + + void WriteList(const std::vector &value) { + WriteTypeSize(value.size(), TLIST); + for (auto &x : value) WriteTypedValue(x); + } + + void WriteMap(const std::map &value) { + WriteTypeSize(value.size(), TMAP); + for (auto &x : value) { + WriteString(x.first); + WriteTypedValue(x.second); + } + } + + void WriteVertex(const VertexAccessor &vertex) { + // 0xB3 = struct 3; 0x4E = vertex signature + WriteRAW("\xB3\x4E", 2); + + if (encode_ids_) { + // IMPORTANT: this is used only in the database snapshotter! 
+ WriteUInt(vertex.temporary_id()); + } else { + // IMPORTANT: here we write a hardcoded 0 because we don't + // use internal IDs, but need to give something to Bolt + // note that OpenCypher has no id(x) function, so the client + // should not be able to do anything with this value anyway + WriteInt(0); + } + + // write labels + const auto &labels = vertex.labels(); + WriteTypeSize(labels.size(), TLIST); + for (const auto &label : labels) + WriteString(vertex.db_accessor().label_name(label)); + + // write properties + const auto &props = vertex.Properties(); + WriteTypeSize(props.size(), TMAP); + for (const auto &prop : props) { + WriteString(vertex.db_accessor().property_name(prop.first)); + WriteTypedValue(prop.second); + } + } + + void WriteEdge(const EdgeAccessor &edge) { + // 0xB5 = struct 5; 0x52 = edge signature + WriteRAW("\xB5\x52", 2); + + if (encode_ids_) { + // IMPORTANT: this is used only in the database snapshotter! + WriteUInt(edge.temporary_id()); + WriteUInt(edge.from().temporary_id()); + WriteUInt(edge.to().temporary_id()); + } else { + // IMPORTANT: here we write a hardcoded 0 because we don't + // use internal IDs, but need to give something to Bolt + // note that OpenCypher has no id(x) function, so the client + // should not be able to do anything with this value anyway + WriteInt(0); + WriteInt(0); + WriteInt(0); + } + + // write type + WriteString(edge.db_accessor().edge_type_name(edge.edge_type())); + + // write properties + const auto &props = edge.Properties(); + WriteTypeSize(props.size(), TMAP); + for (const auto &prop : props) { + WriteString(edge.db_accessor().property_name(prop.first)); + WriteTypedValue(prop.second); + } + } + + void WritePath() { + // TODO: this isn't implemented in the backend! 
+ } + + void WriteTypedValue(const TypedValue &value) { + switch (value.type()) { + case TypedValue::Type::Null: + WriteNull(); + break; + case TypedValue::Type::Bool: + WriteBool(value.Value()); + break; + case TypedValue::Type::Int: + WriteInt(value.Value()); + break; + case TypedValue::Type::Double: + WriteDouble(value.Value()); + break; + case TypedValue::Type::String: + WriteString(value.Value()); + break; + case TypedValue::Type::List: + WriteList(value.Value>()); + break; + case TypedValue::Type::Map: + WriteMap(value.Value>()); + break; + case TypedValue::Type::Vertex: + WriteVertex(value.Value()); + break; + case TypedValue::Type::Edge: + WriteEdge(value.Value()); + break; + case TypedValue::Type::Path: + // TODO: this is not implemented yet! + WritePath(); + break; + } + } + + protected: + Buffer &buffer_; + bool encode_ids_; + + private: + void WriteUInt(const uint64_t &value) { + WriteInt(*reinterpret_cast(&value)); + } +}; +} diff --git a/src/communication/bolt/v1/encoder/encoder.hpp b/src/communication/bolt/v1/encoder/encoder.hpp index 98a7d6fc7..e2ff18623 100644 --- a/src/communication/bolt/v1/encoder/encoder.hpp +++ b/src/communication/bolt/v1/encoder/encoder.hpp @@ -1,21 +1,9 @@ #pragma once -#include "database/graph_db_accessor.hpp" -#include "logging/default.hpp" -#include "logging/logger.hpp" -#include "query/backend/cpp/typed_value.hpp" -#include "utils/bswap.hpp" - -#include +#include "communication/bolt/v1/encoder/base_encoder.hpp" namespace communication::bolt { -static constexpr uint8_t TSTRING = 0, TLIST = 1, TMAP = 2; -static constexpr uint8_t type_tiny_marker[3] = {0x80, 0x90, 0xA0}; -static constexpr uint8_t type_8_marker[3] = {0xD0, 0xD4, 0xD8}; -static constexpr uint8_t type_16_marker[3] = {0xD1, 0xD5, 0xD9}; -static constexpr uint8_t type_32_marker[3] = {0xD2, 0xD6, 0xDA}; - /** * Bolt Encoder. * Has public interfaces for writing Bolt specific response messages. 
@@ -24,10 +12,19 @@ static constexpr uint8_t type_32_marker[3] = {0xD2, 0xD6, 0xDA}; * @tparam Buffer the output buffer that should be used */ template -class Encoder : public Loggable { +class Encoder : private BaseEncoder { + private: + using Loggable::logger; + using BaseEncoder::WriteRAW; + using BaseEncoder::WriteList; + using BaseEncoder::WriteMap; + using BaseEncoder::buffer_; + public: Encoder(Buffer &buffer) - : Loggable("communication::bolt::Encoder"), buffer_(buffer) {} + : BaseEncoder(buffer) { + logger = logging::log->logger("communication::bolt::Encoder"); + } /** * Writes a Record message. This method only stores data in the Buffer. @@ -125,196 +122,5 @@ class Encoder : public Loggable { WriteRAW("\xB0\x7E", 2); buffer_.Flush(); } - - private: - Buffer &buffer_; - - void WriteRAW(const uint8_t *data, uint64_t len) { buffer_.Write(data, len); } - - void WriteRAW(const char *data, uint64_t len) { - WriteRAW((const uint8_t *)data, len); - } - - void WriteRAW(const uint8_t data) { WriteRAW(&data, 1); } - - template - void WriteValue(T value) { - value = bswap(value); - WriteRAW(reinterpret_cast(&value), sizeof(value)); - } - - void WriteNull() { - // 0xC0 = null marker - WriteRAW(0xC0); - } - - void WriteBool(const bool &value) { - if (value) { - // 0xC3 = true marker - WriteRAW(0xC3); - } else { - // 0xC2 = false marker - WriteRAW(0xC2); - } - } - - void WriteInt(const int64_t &value) { - if (value >= -16L && value < 128L) { - WriteRAW(static_cast(value)); - } else if (value >= -128L && value < -16L) { - // 0xC8 = int8 marker - WriteRAW(0xC8); - WriteRAW(static_cast(value)); - } else if (value >= -32768L && value < 32768L) { - // 0xC9 = int16 marker - WriteRAW(0xC9); - WriteValue(static_cast(value)); - } else if (value >= -2147483648L && value < 2147483648L) { - // 0xCA = int32 marker - WriteRAW(0xCA); - WriteValue(static_cast(value)); - } else { - // 0xCB = int64 marker - WriteRAW(0xCB); - WriteValue(value); - } - } - - void WriteDouble(const double 
&value) { - // 0xC1 = float64 marker - WriteRAW(0xC1); - WriteValue(*reinterpret_cast(&value)); - } - - void WriteTypeSize(const size_t size, const uint8_t typ) { - if (size <= 15) { - uint8_t len = size; - len &= 0x0F; - // tiny marker (+len) - WriteRAW(type_tiny_marker[typ] + len); - } else if (size <= 255) { - uint8_t len = size; - // 8 marker - WriteRAW(type_8_marker[typ]); - WriteRAW(len); - } else if (size <= 65536) { - uint16_t len = size; - // 16 marker - WriteRAW(type_16_marker[typ]); - WriteValue(len); - } else { - uint32_t len = size; - // 32 marker - WriteRAW(type_32_marker[typ]); - WriteValue(len); - } - } - - void WriteString(const std::string &value) { - WriteTypeSize(value.size(), TSTRING); - WriteRAW(value.c_str(), value.size()); - } - - void WriteList(const std::vector &value) { - WriteTypeSize(value.size(), TLIST); - for (auto &x : value) WriteTypedValue(x); - } - - void WriteMap(const std::map &value) { - WriteTypeSize(value.size(), TMAP); - for (auto &x : value) { - WriteString(x.first); - WriteTypedValue(x.second); - } - } - - void WriteVertex(const VertexAccessor &vertex) { - // 0xB3 = struct 3; 0x4E = vertex signature - WriteRAW("\xB3\x4E", 2); - - // IMPORTANT: here we write a hardcoded 0 because we don't - // use internal IDs, but need to give something to Bolt - // note that OpenCypher has no id(x) function, so the client - // should not be able to do anything with this value anyway - WriteInt(0); - - // write labels - const auto &labels = vertex.labels(); - WriteTypeSize(labels.size(), TLIST); - for (const auto &label : labels) - WriteString(vertex.db_accessor().label_name(label)); - - // write properties - const auto &props = vertex.Properties(); - WriteTypeSize(props.size(), TMAP); - for (const auto &prop : props) { - WriteString(vertex.db_accessor().property_name(prop.first)); - WriteTypedValue(prop.second); - } - } - - void WriteEdge(const EdgeAccessor &edge) { - // 0xB5 = struct 5; 0x52 = edge signature - WriteRAW("\xB5\x52", 2); - 
- // IMPORTANT: here we write a hardcoded 0 because we don't - // use internal IDs, but need to give something to Bolt - // note that OpenCypher has no id(x) function, so the client - // should not be able to do anything with this value anyway - WriteInt(0); - WriteInt(0); - WriteInt(0); - - // write type - WriteString(edge.db_accessor().edge_type_name(edge.edge_type())); - - // write properties - const auto &props = edge.Properties(); - WriteTypeSize(props.size(), TMAP); - for (const auto &prop : props) { - WriteString(edge.db_accessor().property_name(prop.first)); - WriteTypedValue(prop.second); - } - } - - void WritePath() { - // TODO: this isn't implemented in the backend! - } - - void WriteTypedValue(const TypedValue &value) { - switch (value.type()) { - case TypedValue::Type::Null: - WriteNull(); - break; - case TypedValue::Type::Bool: - WriteBool(value.Value()); - break; - case TypedValue::Type::Int: - WriteInt(value.Value()); - break; - case TypedValue::Type::Double: - WriteDouble(value.Value()); - break; - case TypedValue::Type::String: - WriteString(value.Value()); - break; - case TypedValue::Type::List: - WriteList(value.Value>()); - break; - case TypedValue::Type::Map: - WriteMap(value.Value>()); - break; - case TypedValue::Type::Vertex: - WriteVertex(value.Value()); - break; - case TypedValue::Type::Edge: - WriteEdge(value.Value()); - break; - case TypedValue::Type::Path: - // TODO: this is not implemeted yet! 
- WritePath(); - break; - } - } }; } diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 9c5a7735c..7c9cdcb6b 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -56,6 +56,11 @@ GraphDbAccessor &RecordAccessor::db_accessor() const { return *db_accessor_; } +template +const uint64_t RecordAccessor::temporary_id() const { + return (uint64_t) vlist_; +} + template TRecord &RecordAccessor::update() { db_accessor().update(*this); diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index 044b8a202..661e7706a 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -132,6 +132,22 @@ class RecordAccessor { */ GraphDbAccessor& db_accessor() const; + /** + * Returns a temporary ID of the record stored in this accessor. + * + * This function returns a number that represents the current memory + * location where the record is stored. That number is used only as an + * identifier for the database snapshotter. The snapshotter needs an + * ID so that, once the database is saved to disk, it can be successfully + * reconstructed. + * IMPORTANT: The ID is valid for identifying graph elements observed in + * the same transaction. It is not valid for comparing graph elements + * observed in different transactions. + * + * @return See above. + */ + const uint64_t temporary_id() const; + protected: /** * Returns the update-ready version of the record.