diff --git a/src/communication/bolt/v1/codes.hpp b/src/communication/bolt/v1/codes.hpp new file mode 100644 index 000000000..bd1fa88d3 --- /dev/null +++ b/src/communication/bolt/v1/codes.hpp @@ -0,0 +1,67 @@ +#pragma once + +#include +#include "utils/underlying_cast.hpp" + +namespace communication::bolt { + +enum class Signature : uint8_t { + Init = 0x01, + AckFailure = 0x0E, + Reset = 0x0F, + + Run = 0x10, + DiscardAll = 0x2F, + PullAll = 0x3F, + + Record = 0x71, + Success = 0x70, + Ignored = 0x7E, + Failure = 0x7F, + + Node = 0x4E, + Relationship = 0x52, + Path = 0x50, + UnboundRelationship = 0x72, +}; + +enum class Marker : uint8_t { + TinyString = 0x80, + TinyList = 0x90, + TinyMap = 0xA0, + TinyStruct = 0xB0, + + Null = 0xC0, + Float64 = 0xC1, + + False = 0xC2, + True = 0xC3, + + Int8 = 0xC8, + Int16 = 0xC9, + Int32 = 0xCA, + Int64 = 0xCB, + + String8 = 0xD0, + String16 = 0xD1, + String32 = 0xD2, + + List8 = 0xD4, + List16 = 0xD5, + List32 = 0xD6, + + Map8 = 0xD8, + Map16 = 0xD9, + Map32 = 0xDA, + + Struct8 = 0xDC, + Struct16 = 0xDD, +}; + +static constexpr uint8_t MarkerString = 0, MarkerList = 1, MarkerMap = 2; +static constexpr Marker MarkerTiny[3] = {Marker::TinyString, Marker::TinyList, Marker::TinyMap}; +static constexpr Marker Marker8[3] = {Marker::String8, Marker::List8, Marker::Map8}; +static constexpr Marker Marker16[3] = {Marker::String16, Marker::List16, Marker::Map16}; +static constexpr Marker Marker32[3] = {Marker::String32, Marker::List32, Marker::Map32}; + +} diff --git a/src/communication/bolt/v1/constants.hpp b/src/communication/bolt/v1/constants.hpp new file mode 100644 index 000000000..866de247a --- /dev/null +++ b/src/communication/bolt/v1/constants.hpp @@ -0,0 +1,13 @@ +#pragma once + +namespace communication::bolt { + +/** + * Sizes related to the chunk defined in Bolt protocol. 
+ */ +static constexpr size_t CHUNK_HEADER_SIZE = 2; +static constexpr size_t MAX_CHUNK_SIZE = 65535; +static constexpr size_t CHUNK_END_MARKER_SIZE = 2; +static constexpr size_t WHOLE_CHUNK_SIZE = + CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE + CHUNK_END_MARKER_SIZE; +} diff --git a/src/communication/bolt/v1/decoder/buffer.hpp b/src/communication/bolt/v1/decoder/buffer.hpp new file mode 100644 index 000000000..90b70c4f4 --- /dev/null +++ b/src/communication/bolt/v1/decoder/buffer.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include +#include +#include +#include + +#include "communication/bolt/v1/constants.hpp" +#include "io/network/stream_buffer.hpp" +#include "logging/loggable.hpp" +#include "utils/assert.hpp" +#include "utils/bswap.hpp" + +namespace communication::bolt { + +/** + * @brief Buffer + * + * Has methods for writing and reading raw data. + * + * Allocating, writing and written stores data in the buffer. The stored + * data can then be read using the pointer returned with the data function. + * The current implementation stores data in a single fixed length buffer. + */ +class Buffer : public Loggable { + private: + using StreamBufferT = io::network::StreamBuffer; + + public: + Buffer() : Loggable("Buffer") {} + + /** + * Allocates a new StreamBuffer from the internal buffer. + * This function returns a pointer to the first currently free memory + * location in the internal buffer. Also, it returns the size of the + * available memory. + */ + StreamBufferT Allocate() { + return StreamBufferT{&data_[size_], WHOLE_CHUNK_SIZE - size_}; + } + + /** + * This method is used to notify the buffer that the data has been written. + * To write data to this buffer you should do this: + * Call Allocate(), then write to the returned data pointer. + * IMPORTANT: Don't write more data than the returned size, you will cause + * a memory overflow. Then call Written(size) with the length of data that + * you have written into the buffer. 
+ * + * @param len the size of data that has been written into the buffer + */ + void Written(size_t len) { + size_ += len; + debug_assert(size_ <= WHOLE_CHUNK_SIZE, "Written more than storage has space!"); + } + + /** + * This method shifts the available data for len. It is used when you read + * some data from the buffer and you want to remove it from the buffer. + * + * @param len the length of data that has to be removed from the start of + * the buffer + */ + void Shift(size_t len) { + debug_assert(len <= size_, "Tried to shift more data than the buffer has!"); + memmove(data_, data_ + len, size_ - len); + size_ -= len; + } + + /** + * This method clears the buffer. + */ + void Clear() { + size_ = 0; + } + + /** + * This function returns a pointer to the internal buffer. It is used for + * reading data from the buffer. + */ + uint8_t *data() { return data_; } + + /** + * This function returns the size of available data for reading. + */ + size_t size() { return size_; } + + private: + uint8_t data_[WHOLE_CHUNK_SIZE]; + size_t size_{0}; +}; +} diff --git a/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp b/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp new file mode 100644 index 000000000..460e08291 --- /dev/null +++ b/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp @@ -0,0 +1,95 @@ +#pragma once + +#include +#include +#include +#include + +#include "communication/bolt/v1/constants.hpp" +#include "communication/bolt/v1/decoder/buffer.hpp" +#include "logging/loggable.hpp" +#include "utils/assert.hpp" + +namespace communication::bolt { + +/** + * @brief ChunkedDecoderBuffer + * + * Has methods for getting chunks and reading their data. + * + * Getting a chunk copies the chunk into the internal buffer from which + * the data can then be read. While getting a chunk the buffer checks the + * chunk for validity and then copies only data from the chunk. 
The headers + * aren't copied so that the decoder can read only the raw encoded data. + */ +class ChunkedDecoderBuffer : public Loggable { + private: + using StreamBufferT = io::network::StreamBuffer; + + public: + ChunkedDecoderBuffer(Buffer &buffer) : Loggable("ChunkedDecoderBuffer"), buffer_(buffer) {} + + /** + * Reads data from the internal buffer. + * + * @param data a pointer to where the data should be read + * @param len the length of data that should be read + * @returns true if exactly len of data was copied into data, + * false otherwise + */ + bool Read(uint8_t *data, size_t len) { + if (len > size_ - pos_) return false; + memcpy(data, &data_[pos_], len); + pos_ += len; + return true; + } + + /** + * Gets a chunk from the underlying raw data buffer. + * When getting a chunk this function validates the chunk. + * If the chunk isn't yet finished the function just returns false. + * If the chunk is finished (all data has been read) and the chunk isn't + * valid, then the function automatically deletes the invalid chunk + * from the underlying buffer and returns false. + * + * @returns true if a chunk was successfully copied into the internal + * buffer, false otherwise + */ + bool GetChunk() { + uint8_t *data = buffer_.data(); + size_t size = buffer_.size(); + if (size < 2) { + logger.trace("Size < 2"); + return false; + } + + size_t chunk_size = data[0]; + chunk_size <<= 8; + chunk_size += data[1]; + if (size < chunk_size + 4) { + logger.trace("Chunk size is {} but only have {} data bytes.", chunk_size, size); + return false; + } + + if (data[chunk_size + 2] != 0 || data[chunk_size + 3] != 0) { + logger.trace("Invalid chunk!"); + buffer_.Shift(chunk_size + 4); + // TODO: raise an exception! 
+ return false; + } + + pos_ = 0; + size_ = chunk_size; + memcpy(data_, data + 2, size - 4); + buffer_.Shift(chunk_size + 4); + + return true; + } + + private: + Buffer &buffer_; + uint8_t data_[MAX_CHUNK_SIZE]; + size_t size_{0}; + size_t pos_{0}; +}; +} diff --git a/src/communication/bolt/v1/decoder/decoder.hpp b/src/communication/bolt/v1/decoder/decoder.hpp new file mode 100644 index 000000000..62ff42555 --- /dev/null +++ b/src/communication/bolt/v1/decoder/decoder.hpp @@ -0,0 +1,474 @@ +#pragma once + +#include "communication/bolt/v1/codes.hpp" +#include "database/graph_db_accessor.hpp" +#include "logging/default.hpp" +#include "logging/logger.hpp" +#include "query/backend/cpp/typed_value.hpp" +#include "utils/bswap.hpp" +#include "utils/underlying_cast.hpp" + +#include + +namespace communication::bolt { + +/** + * Structure used when reading a Vertex with the decoder. + * The decoder writes data into this structure. + */ +struct DecodedVertex { + int64_t id; + std::vector labels; + std::map properties; +}; + +/** + * Structure used when reading an Edge with the decoder. + * The decoder writes data into this structure. + */ +struct DecodedEdge { + int64_t id; + int64_t from; + int64_t to; + std::string type; + std::map properties; +}; + +/** + * Bolt Decoder. + * Has public interfaces for reading Bolt encoded data. + * Supports reading: TypedValue (without Vertex, Edge and Path), + * Vertex, Edge + * + * @tparam Buffer the input buffer that should be used + */ +template +class Decoder : public Loggable { + public: + Decoder(Buffer &buffer) + : Loggable("communication::bolt::Decoder"), + buffer_(buffer) {} + + /** + * Reads a TypedValue from the available data in the buffer. + * This function tries to read a TypedValue from the available data. 
+ * + * @param data pointer to a TypedValue where the read data should be stored + * @returns true if data has been written to the data pointer, + * false otherwise + */ + bool ReadTypedValue(TypedValue *data) { + uint8_t value; + + logger.trace("[ReadTypedValue] Start"); + + if (!buffer_.Read(&value, 1)) { + logger.debug("[ReadTypedValue] Marker data missing!"); + return false; + } + + Marker marker = (Marker)value; + + switch (marker) { + case Marker::Null: + return ReadNull(marker, data); + + case Marker::True: + case Marker::False: + return ReadBool(marker, data); + + case Marker::Int8: + case Marker::Int16: + case Marker::Int32: + case Marker::Int64: + return ReadInt(marker, data); + + case Marker::Float64: + return ReadDouble(marker, data); + + case Marker::String8: + case Marker::String16: + case Marker::String32: + return ReadString(marker, data); + + case Marker::List8: + case Marker::List16: + case Marker::List32: + return ReadList(marker, data); + + case Marker::Map8: + case Marker::Map16: + case Marker::Map32: + return ReadMap(marker, data); + + default: + if ((value & 0xF0) == underlying_cast(Marker::TinyString)) { + return ReadString(marker, data); + } else if ((value & 0xF0) == underlying_cast(Marker::TinyList)) { + return ReadList(marker, data); + } else if ((value & 0xF0) == underlying_cast(Marker::TinyMap)) { + return ReadMap(marker, data); + } else { + return ReadInt(marker, data); + } + break; + } + } + + /** + * Reads a TypedValue from the available data in the buffer and checks + * whether the read data type matches the supplied data type. 
+ * + * @param data pointer to a TypedValue where the read data should be stored + * @param type the expected type that should be read + * @returns true if data has been written to the data pointer and the type + * matches the expected type, false otherwise + */ + bool ReadTypedValue(TypedValue *data, TypedValue::Type type) { + if (!ReadTypedValue(data)) { + logger.debug("[ReadTypedValue] ReadTypedValue call failed!"); + return false; + } + if (data->type() != type) { + logger.debug("[ReadTypedValue] Typed value has wrong type!"); + return false; + } + return true; + } + + /** + * Reads a Vertex from the available data in the buffer. + * This function tries to read a Vertex from the available data. + * + * @param data pointer to a DecodedVertex where the data should be stored + * @returns true if data has been written into the data pointer, + * false otherwise + */ + bool ReadVertex(DecodedVertex *data) { + uint8_t value[2]; + TypedValue tv; + + logger.trace("[ReadVertex] Start"); + + if (!buffer_.Read(value, 2)) { + logger.debug("[ReadVertex] Missing marker and/or signature data!"); + return false; + } + + // check header + if (value[0] != underlying_cast(Marker::TinyStruct) + 3) { + logger.debug("[ReadVertex] Received invalid marker ({})!", value[0]); + return false; + } + if (value[1] != underlying_cast(Signature::Node)) { + logger.debug("[ReadVertex] Received invalid signature ({})!", value[1]); + return false; + } + + // read ID + if (!ReadTypedValue(&tv, TypedValue::Type::Int)) { + logger.debug("[ReadVertex] Couldn't read ID!"); + return false; + } + data->id = tv.Value(); + + // read labels + if (!ReadTypedValue(&tv, TypedValue::Type::List)) { + logger.debug("[ReadVertex] Couldn't read labels!"); + return false; + } + std::vector &labels = tv.Value>(); + data->labels.resize(labels.size()); + for (size_t i = 0; i < labels.size(); ++i) { + if (labels[i].type() != TypedValue::Type::String) { + logger.debug("[ReadVertex] Label has wrong type!"); + return false; 
+ } + data->labels[i] = labels[i].Value(); + } + + // read properties + if (!ReadTypedValue(&tv, TypedValue::Type::Map)) { + logger.debug("[ReadVertex] Couldn't read properties!"); + return false; + } + data->properties = tv.Value>(); + + logger.trace("[ReadVertex] Success"); + + return true; + } + + /** + * Reads an Edge from the available data in the buffer. + * This function tries to read an Edge from the available data. + * + * @param data pointer to a DecodedEdge where the data should be stored + * @returns true if data has been written into the data pointer, + * false otherwise + */ + bool ReadEdge(DecodedEdge *data) { + uint8_t value[2]; + TypedValue tv; + + logger.trace("[ReadEdge] Start"); + + if (!buffer_.Read(value, 2)) { + logger.debug("[ReadEdge] Missing marker and/or signature data!"); + return false; + } + + // check header + if (value[0] != underlying_cast(Marker::TinyStruct) + 5) { + logger.debug("[ReadEdge] Received invalid marker ({})!", value[0]); + return false; + } + if (value[1] != underlying_cast(Signature::Relationship)) { + logger.debug("[ReadEdge] Received invalid signature ({})!", value[1]); + return false; + } + + // read ID + if (!ReadTypedValue(&tv, TypedValue::Type::Int)) { + logger.debug("[ReadEdge] couldn't read ID!"); + return false; + } + data->id = tv.Value(); + + // read from + if (!ReadTypedValue(&tv, TypedValue::Type::Int)) { + logger.debug("[ReadEdge] Couldn't read from_id!"); + return false; + } + data->from = tv.Value(); + + // read to + if (!ReadTypedValue(&tv, TypedValue::Type::Int)) { + logger.debug("[ReadEdge] Couldn't read to_id!"); + return false; + } + data->to = tv.Value(); + + // read type + if (!ReadTypedValue(&tv, TypedValue::Type::String)) { + logger.debug("[ReadEdge] Couldn't read type!"); + return false; + } + data->type = tv.Value(); + + // read properties + if (!ReadTypedValue(&tv, TypedValue::Type::Map)) { + logger.debug("[ReadEdge] Couldn't read properties!"); + return false; + } + data->properties = 
tv.Value>(); + + logger.trace("[ReadEdge] Success"); + + return true; + } + + protected: + Buffer &buffer_; + + private: + bool ReadNull(const Marker &marker, TypedValue *data) { + logger.trace("[ReadNull] Start"); + debug_assert(marker == Marker::Null, "Received invalid marker!"); + *data = TypedValue::Null; + logger.trace("[ReadNull] Success"); + return true; + } + + bool ReadBool(const Marker &marker, TypedValue *data) { + logger.trace("[ReadBool] Start"); + debug_assert(marker == Marker::False || marker == Marker::True, + "Received invalid marker!"); + if (marker == Marker::False) { + *data = TypedValue(false); + } else { + *data = TypedValue(true); + } + logger.trace("[ReadBool] Success"); + return true; + } + + bool ReadInt(const Marker &marker, TypedValue *data) { + uint8_t value = underlying_cast(marker); + bool success = true; + int64_t ret; + logger.trace("[ReadInt] Start"); + if (value >= 240 || value <= 127) { + logger.trace("[ReadInt] Found a TinyInt"); + ret = value; + if (value >= 240) ret -= 256; + } else if (marker == Marker::Int8) { + logger.trace("[ReadInt] Found an Int8"); + int8_t tmp; + if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { + logger.debug( "[ReadInt] Int8 missing data!"); + return false; + } + ret = tmp; + } else if (marker == Marker::Int16) { + logger.trace("[ReadInt] Found an Int16"); + int16_t tmp; + if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { + logger.debug( "[ReadInt] Int16 missing data!"); + return false; + } + ret = bswap(tmp); + } else if (marker == Marker::Int32) { + logger.trace("[ReadInt] Found an Int32"); + int32_t tmp; + if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { + logger.debug( "[ReadInt] Int32 missing data!"); + return false; + } + ret = bswap(tmp); + } else if (marker == Marker::Int64) { + logger.trace("[ReadInt] Found an Int64"); + if (!buffer_.Read(reinterpret_cast(&ret), sizeof(ret))) { + logger.debug( "[ReadInt] Int64 missing data!"); + return false; + } + ret = bswap(ret); + 
} else { + logger.debug("[ReadInt] Received invalid marker ({})!", underlying_cast(marker)); + return false; + } + if (success) { + *data = TypedValue(ret); + logger.trace("[ReadInt] Success"); + } + return success; + } + + bool ReadDouble(const Marker marker, TypedValue *data) { + uint64_t value; + double ret; + logger.trace("[ReadDouble] Start"); + debug_assert(marker == Marker::Float64, "Received invalid marker!"); + if (!buffer_.Read(reinterpret_cast(&value), sizeof(value))) { + logger.debug( "[ReadDouble] Missing data!"); + return false; + } + value = bswap(value); + ret = *reinterpret_cast(&value); + *data = TypedValue(ret); + logger.trace("[ReadDouble] Success"); + return true; + } + + int64_t ReadTypeSize(const Marker &marker, const uint8_t type) { + uint8_t value = underlying_cast(marker); + if ((value & 0xF0) == underlying_cast(MarkerTiny[type])) { + logger.trace("[ReadTypeSize] Found a TinyType"); + return value & 0x0F; + } else if (marker == Marker8[type]) { + logger.trace("[ReadTypeSize] Found a Type8"); + uint8_t tmp; + if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { + logger.debug( "[ReadTypeSize] Type8 missing data!"); + return -1; + } + return tmp; + } else if (marker == Marker16[type]) { + logger.trace("[ReadTypeSize] Found a Type16"); + uint16_t tmp; + if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { + logger.debug( "[ReadTypeSize] Type16 missing data!"); + return -1; + } + tmp = bswap(tmp); + return tmp; + } else if (marker == Marker32[type]) { + logger.trace("[ReadTypeSize] Found a Type32"); + uint32_t tmp; + if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { + logger.debug( "[ReadTypeSize] Type32 missing data!"); + return -1; + } + tmp = bswap(tmp); + return tmp; + } else { + logger.debug("[ReadTypeSize] Received invalid marker ({})!", underlying_cast(marker)); + return -1; + } + } + + bool ReadString(const Marker &marker, TypedValue *data) { + logger.trace("[ReadString] Start"); + auto size = ReadTypeSize(marker, 
MarkerString); + if (size == -1) { + logger.debug("[ReadString] Couldn't get size!"); + return false; + } + std::unique_ptr ret(new uint8_t[size]); + if (!buffer_.Read(ret.get(), size)) { + logger.debug("[ReadString] Missing data!"); + return false; + } + *data = TypedValue(std::string(reinterpret_cast(ret.get()), size)); + logger.trace("[ReadString] Success"); + return true; + } + + bool ReadList(const Marker &marker, TypedValue *data) { + logger.trace("[ReadList] Start"); + auto size = ReadTypeSize(marker, MarkerList); + if (size == -1) { + logger.debug("[ReadList] Couldn't get size!"); + return false; + } + std::vector ret(size); + for (int64_t i = 0; i < size; ++i) { + if (!ReadTypedValue(&ret[i])) { + logger.debug("[ReadList] Couldn't read element {}", i); + return false; + } + } + *data = TypedValue(ret); + logger.trace("[ReadList] Success"); + return true; + } + + bool ReadMap(const Marker &marker, TypedValue *data) { + logger.trace("[ReadMap] Start"); + auto size = ReadTypeSize(marker, MarkerMap); + if (size == -1) { + logger.debug("[ReadMap] Couldn't get size!"); + return false; + } + + TypedValue tv; + std::string str; + std::map ret; + for (int64_t i = 0; i < size; ++i) { + if (!ReadTypedValue(&tv)) { + logger.debug("[ReadMap] Couldn't read index {}", i); + return false; + } + if (tv.type() != TypedValue::Type::String) { + logger.debug("[ReadMap] Index {} isn't a string!", i); + return false; + } + str = tv.Value(); + + if (!ReadTypedValue(&tv)) { + logger.debug("[ReadMap] Couldn't read element {}", i); + return false; + } + ret.insert(std::make_pair(str, tv)); + } + if (ret.size() != size) { + logger.debug("[ReadMap] The client sent multiple objects with same indexes!"); + return false; + } + + *data = TypedValue(ret); + logger.trace("[ReadMap] Success"); + return true; + } +}; +} diff --git a/src/communication/bolt/v1/encoder/base_encoder.hpp b/src/communication/bolt/v1/encoder/base_encoder.hpp index eefe45962..831916835 100644 --- 
a/src/communication/bolt/v1/encoder/base_encoder.hpp +++ b/src/communication/bolt/v1/encoder/base_encoder.hpp @@ -1,5 +1,6 @@ #pragma once +#include "communication/bolt/v1/codes.hpp" #include "database/graph_db_accessor.hpp" #include "logging/default.hpp" #include "logging/logger.hpp" @@ -10,12 +11,6 @@ namespace communication::bolt { -static constexpr uint8_t TSTRING = 0, TLIST = 1, TMAP = 2; -static constexpr uint8_t type_tiny_marker[3] = {0x80, 0x90, 0xA0}; -static constexpr uint8_t type_8_marker[3] = {0xD0, 0xD4, 0xD8}; -static constexpr uint8_t type_16_marker[3] = {0xD1, 0xD5, 0xD9}; -static constexpr uint8_t type_32_marker[3] = {0xD2, 0xD6, 0xDA}; - /** * Bolt BaseEncoder. * Has public interfaces for writing Bolt encoded data. @@ -62,45 +57,36 @@ class BaseEncoder : public Loggable { } void WriteNull() { - // 0xC0 = null marker - WriteRAW(0xC0); + WriteRAW(underlying_cast(Marker::Null)); } void WriteBool(const bool &value) { - if (value) { - // 0xC3 = true marker - WriteRAW(0xC3); - } else { - // 0xC2 = false marker - WriteRAW(0xC2); - } + if (value) + WriteRAW(underlying_cast(Marker::True)); + else + WriteRAW(underlying_cast(Marker::False)); } void WriteInt(const int64_t &value) { if (value >= -16L && value < 128L) { WriteRAW(static_cast(value)); } else if (value >= -128L && value < -16L) { - // 0xC8 = int8 marker - WriteRAW(0xC8); + WriteRAW(underlying_cast(Marker::Int8)); WriteRAW(static_cast(value)); } else if (value >= -32768L && value < 32768L) { - // 0xC9 = int16 marker - WriteRAW(0xC9); + WriteRAW(underlying_cast(Marker::Int16)); WriteValue(static_cast(value)); } else if (value >= -2147483648L && value < 2147483648L) { - // 0xCA = int32 marker - WriteRAW(0xCA); + WriteRAW(underlying_cast(Marker::Int32)); WriteValue(static_cast(value)); } else { - // 0xCB = int64 marker - WriteRAW(0xCB); + WriteRAW(underlying_cast(Marker::Int64)); WriteValue(value); } } void WriteDouble(const double &value) { - // 0xC1 = float64 marker - WriteRAW(0xC1); + 
WriteRAW(underlying_cast(Marker::Float64)); WriteValue(*reinterpret_cast(&value)); } @@ -108,38 +94,34 @@ class BaseEncoder : public Loggable { if (size <= 15) { uint8_t len = size; len &= 0x0F; - // tiny marker (+len) - WriteRAW(type_tiny_marker[typ] + len); + WriteRAW(underlying_cast(MarkerTiny[typ]) + len); } else if (size <= 255) { uint8_t len = size; - // 8 marker - WriteRAW(type_8_marker[typ]); + WriteRAW(underlying_cast(Marker8[typ])); WriteRAW(len); } else if (size <= 65536) { uint16_t len = size; - // 16 marker - WriteRAW(type_16_marker[typ]); + WriteRAW(underlying_cast(Marker16[typ])); WriteValue(len); } else { uint32_t len = size; - // 32 marker - WriteRAW(type_32_marker[typ]); + WriteRAW(underlying_cast(Marker32[typ])); WriteValue(len); } } void WriteString(const std::string &value) { - WriteTypeSize(value.size(), TSTRING); + WriteTypeSize(value.size(), MarkerString); WriteRAW(value.c_str(), value.size()); } void WriteList(const std::vector &value) { - WriteTypeSize(value.size(), TLIST); + WriteTypeSize(value.size(), MarkerList); for (auto &x : value) WriteTypedValue(x); } void WriteMap(const std::map &value) { - WriteTypeSize(value.size(), TMAP); + WriteTypeSize(value.size(), MarkerMap); for (auto &x : value) { WriteString(x.first); WriteTypedValue(x.second); @@ -147,8 +129,8 @@ class BaseEncoder : public Loggable { } void WriteVertex(const VertexAccessor &vertex) { - // 0xB3 = struct 3; 0x4E = vertex signature - WriteRAW("\xB3\x4E", 2); + WriteRAW(underlying_cast(Marker::TinyStruct) + 3); + WriteRAW(underlying_cast(Signature::Node)); if (encode_ids_) { // IMPORTANT: this is used only in the database snapshotter! 
@@ -163,13 +145,13 @@ class BaseEncoder : public Loggable { // write labels const auto &labels = vertex.labels(); - WriteTypeSize(labels.size(), TLIST); + WriteTypeSize(labels.size(), MarkerList); for (const auto &label : labels) WriteString(vertex.db_accessor().label_name(label)); // write properties const auto &props = vertex.Properties(); - WriteTypeSize(props.size(), TMAP); + WriteTypeSize(props.size(), MarkerMap); for (const auto &prop : props) { WriteString(vertex.db_accessor().property_name(prop.first)); WriteTypedValue(prop.second); @@ -177,8 +159,8 @@ class BaseEncoder : public Loggable { } void WriteEdge(const EdgeAccessor &edge) { - // 0xB5 = struct 5; 0x52 = edge signature - WriteRAW("\xB5\x52", 2); + WriteRAW(underlying_cast(Marker::TinyStruct) + 5); + WriteRAW(underlying_cast(Signature::Relationship)); if (encode_ids_) { // IMPORTANT: this is used only in the database snapshotter! @@ -200,7 +182,7 @@ class BaseEncoder : public Loggable { // write properties const auto &props = edge.Properties(); - WriteTypeSize(props.size(), TMAP); + WriteTypeSize(props.size(), MarkerMap); for (const auto &prop : props) { WriteString(edge.db_accessor().property_name(prop.first)); WriteTypedValue(prop.second); diff --git a/src/communication/bolt/v1/encoder/chunked_buffer.hpp b/src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp similarity index 89% rename from src/communication/bolt/v1/encoder/chunked_buffer.hpp rename to src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp index 0f24a329e..a411da2af 100644 --- a/src/communication/bolt/v1/encoder/chunked_buffer.hpp +++ b/src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp @@ -5,6 +5,7 @@ #include #include +#include "communication/bolt/v1/constants.hpp" #include "logging/loggable.hpp" #include "utils/bswap.hpp" @@ -15,16 +16,7 @@ namespace communication::bolt { // -> test for more TCP packets! /** - * Sizes related to the chunk defined in Bolt protocol. 
- */ -static constexpr size_t CHUNK_HEADER_SIZE = 2; -static constexpr size_t MAX_CHUNK_SIZE = 65535; -static constexpr size_t CHUNK_END_MARKER_SIZE = 2; -static constexpr size_t WHOLE_CHUNK_SIZE = - CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE + CHUNK_END_MARKER_SIZE; - -/** - * @brief ChunkedBuffer + * @brief ChunkedEncoderBuffer * * Has methods for writing and flushing data into the message buffer. * @@ -44,9 +36,9 @@ static constexpr size_t WHOLE_CHUNK_SIZE = * @tparam Socket the output socket that should be used */ template -class ChunkedBuffer : public Loggable { +class ChunkedEncoderBuffer : public Loggable { public: - ChunkedBuffer(Socket &socket) : Loggable("Chunked Buffer"), socket_(socket) {} + ChunkedEncoderBuffer(Socket &socket) : Loggable("Chunked Encoder Buffer"), socket_(socket) {} /** * Writes n values into the buffer. If n is bigger than whole chunk size diff --git a/src/communication/bolt/v1/encoder/result_stream.hpp b/src/communication/bolt/v1/encoder/result_stream.hpp index f48c0faaa..32b4d411d 100644 --- a/src/communication/bolt/v1/encoder/result_stream.hpp +++ b/src/communication/bolt/v1/encoder/result_stream.hpp @@ -1,6 +1,6 @@ #pragma once -#include "communication/bolt/v1/encoder/chunked_buffer.hpp" +#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" #include "communication/bolt/v1/encoder/encoder.hpp" #include "query/backend/cpp/typed_value.hpp" diff --git a/src/communication/bolt/v1/session.hpp b/src/communication/bolt/v1/session.hpp index 903598efe..d8461ee34 100644 --- a/src/communication/bolt/v1/session.hpp +++ b/src/communication/bolt/v1/session.hpp @@ -31,7 +31,7 @@ template class Session : public Loggable { public: using Decoder = BoltDecoder; - using OutputStream = ResultStream>>; + using OutputStream = ResultStream>>; Session(Socket &&socket, Dbms &dbms, QueryEngine &query_engine) : Loggable("communication::bolt::Session"), @@ -63,7 +63,7 @@ class Session : public Loggable { * @param data pointer on bytes received from a 
client * @param len length of data received from a client */ - void Execute(const byte *data, size_t len) { + void Execute(const uint8_t *data, size_t len) { // mark the end of the message auto end = data + len; @@ -112,8 +112,8 @@ class Session : public Loggable { Socket socket_; Dbms &dbms_; QueryEngine &query_engine_; - ChunkedBuffer encoder_buffer_; - Encoder> encoder_; + ChunkedEncoderBuffer encoder_buffer_; + Encoder> encoder_; OutputStream output_stream_; Decoder decoder_; io::network::Epoll::Event event_; diff --git a/src/communication/worker.hpp b/src/communication/worker.hpp index 585a8342b..2998f174f 100644 --- a/src/communication/worker.hpp +++ b/src/communication/worker.hpp @@ -73,7 +73,7 @@ class Worker logger_.trace("[on_read] Received {}B", buf.len); try { - session.Execute(reinterpret_cast(buf.ptr), buf.len); + session.Execute(buf.data, buf.len); } catch (const std::exception &e) { logger_.error("Error occured while executing statement."); logger_.error("{}", e.what()); @@ -96,7 +96,7 @@ class Worker // TODO: Do something about it } - char buf_[65536]; + uint8_t buf_[65536]; std::thread thread_; void Start(std::atomic &alive) { diff --git a/src/io/network/stream_buffer.hpp b/src/io/network/stream_buffer.hpp new file mode 100644 index 000000000..95bdd64fa --- /dev/null +++ b/src/io/network/stream_buffer.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace io::network { + +/** + * StreamBuffer + * Used for getting a pointer and size of a preallocated block of memory. + * The network stack then uses this block of memory to read data from a + * socket. 
+ */ +struct StreamBuffer { + uint8_t *data; + size_t len; +}; +} diff --git a/src/io/network/stream_reader.hpp b/src/io/network/stream_reader.hpp index 375cef70a..96d5c4c06 100644 --- a/src/io/network/stream_reader.hpp +++ b/src/io/network/stream_reader.hpp @@ -1,15 +1,10 @@ #pragma once +#include "io/network/stream_buffer.hpp" #include "io/network/stream_listener.hpp" #include "memory/literals.hpp" namespace io::network { -using namespace memory::literals; - -struct StreamBuffer { - char* ptr; - size_t len; -}; /** * This class is used to get data from a socket that has been notified @@ -62,7 +57,7 @@ class StreamReader : public StreamListener { auto buf = this->derived().OnAlloc(stream); // read from the buffer at most buf.len bytes - buf.len = stream.socket_.Read(buf.ptr, buf.len); + buf.len = stream.socket_.Read(buf.data, buf.len); // check for read errors if (buf.len == -1) { diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index 63b6a9d43..792cbcffc 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -23,7 +23,7 @@ using endpoint_t = io::network::NetworkEndpoint; using socket_t = io::network::Socket; using session_t = communication::bolt::Session; using result_stream_t = communication::bolt::ResultStream< - communication::bolt::Encoder>>; + communication::bolt::Encoder>>; using bolt_server_t = communication::Server; diff --git a/tests/integration/hardcoded_query/using.hpp b/tests/integration/hardcoded_query/using.hpp index 610377f11..67dc43488 100644 --- a/tests/integration/hardcoded_query/using.hpp +++ b/tests/integration/hardcoded_query/using.hpp @@ -6,7 +6,7 @@ #include "communication/bolt/v1/encoder/result_stream.hpp" #include "io/network/socket.hpp" using Stream = communication::bolt::ResultStream>>; + communication::bolt::ChunkedEncoderBuffer>>; #else #include "../stream/print_record_stream.hpp" using Stream = PrintRecordStream; diff --git a/tests/unit/bolt_buffer.cpp b/tests/unit/bolt_buffer.cpp new file mode 100644 index 
000000000..c303996b1 --- /dev/null +++ b/tests/unit/bolt_buffer.cpp @@ -0,0 +1,55 @@ +#include "bolt_common.hpp" +#include "communication/bolt/v1/decoder/buffer.hpp" + +constexpr const int SIZE = 4096; +uint8_t data[SIZE]; + +using BufferT = communication::bolt::Buffer; +using StreamBufferT = io::network::StreamBuffer; + +TEST(BoltBuffer, AllocateAndWritten) { + BufferT buffer; + StreamBufferT sb = buffer.Allocate(); + + memcpy(sb.data, data, 1000); + buffer.Written(1000); + + ASSERT_EQ(buffer.size(), 1000); + + uint8_t *tmp = buffer.data(); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], tmp[i]); +} + +TEST(BoltBuffer, Shift) { + BufferT buffer; + StreamBufferT sb = buffer.Allocate(); + + memcpy(sb.data, data, 1000); + buffer.Written(1000); + + sb = buffer.Allocate(); + memcpy(sb.data, data + 1000, 1000); + buffer.Written(1000); + + ASSERT_EQ(buffer.size(), 2000); + + uint8_t *tmp = buffer.data(); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], tmp[i]); + + buffer.Shift(1000); + ASSERT_EQ(buffer.size(), 1000); + tmp = buffer.data(); + + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i + 1000], tmp[i]); +} + +int main(int argc, char **argv) { + InitializeData(data, SIZE); + logging::init_sync(); + logging::log->pipe(std::make_unique()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/bolt_chunked_decoder_buffer.cpp b/tests/unit/bolt_chunked_decoder_buffer.cpp new file mode 100644 index 000000000..8e827b6f5 --- /dev/null +++ b/tests/unit/bolt_chunked_decoder_buffer.cpp @@ -0,0 +1,146 @@ +#include "bolt_common.hpp" +#include "communication/bolt/v1/decoder/buffer.hpp" +#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp" + +constexpr const int SIZE = 131072; +uint8_t data[SIZE]; + +using BufferT = communication::bolt::Buffer; +using StreamBufferT = io::network::StreamBuffer; +using DecoderBufferT = communication::bolt::ChunkedDecoderBuffer; + +TEST(BoltBuffer, CorrectChunk) { + uint8_t 
tmp[2000]; + BufferT buffer; + DecoderBufferT decoder_buffer(buffer); + StreamBufferT sb = buffer.Allocate(); + + sb.data[0] = 0x03; sb.data[1] = 0xe8; + memcpy(sb.data + 2, data, 1000); + sb.data[1002] = 0; sb.data[1003] = 0; + buffer.Written(1004); + + ASSERT_EQ(decoder_buffer.GetChunk(), true); + + ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], tmp[i]); + + ASSERT_EQ(buffer.size(), 0); +} + +TEST(BoltBuffer, CorrectChunkTrailingData) { + uint8_t tmp[2000]; + BufferT buffer; + DecoderBufferT decoder_buffer(buffer); + StreamBufferT sb = buffer.Allocate(); + + sb.data[0] = 0x03; sb.data[1] = 0xe8; + memcpy(sb.data + 2, data, 2002); + sb.data[1002] = 0; sb.data[1003] = 0; + buffer.Written(2004); + + ASSERT_EQ(decoder_buffer.GetChunk(), true); + + ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], tmp[i]); + + uint8_t *leftover = buffer.data(); + ASSERT_EQ(buffer.size(), 1000); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i + 1002], leftover[i]); +} + +TEST(BoltBuffer, InvalidChunk) { + BufferT buffer; + DecoderBufferT decoder_buffer(buffer); + StreamBufferT sb = buffer.Allocate(); + + sb.data[0] = 0x03; sb.data[1] = 0xe8; + memcpy(sb.data + 2, data, 2002); + sb.data[1002] = 1; sb.data[1003] = 1; + buffer.Written(2004); + + ASSERT_EQ(decoder_buffer.GetChunk(), false); + + ASSERT_EQ(buffer.size(), 1000); + + uint8_t *tmp = buffer.data(); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i + 1002], tmp[i]); +} + +TEST(BoltBuffer, GraduallyPopulatedChunk) { + uint8_t tmp[2000]; + BufferT buffer; + DecoderBufferT decoder_buffer(buffer); + StreamBufferT sb = buffer.Allocate(); + + sb.data[0] = 0x03; sb.data[1] = 0xe8; + buffer.Written(2); + ASSERT_EQ(decoder_buffer.GetChunk(), false); + + for (int i = 0; i < 5; ++i) { + sb = buffer.Allocate(); + memcpy(sb.data, data + 200 * i, 200); + buffer.Written(200); + ASSERT_EQ(decoder_buffer.GetChunk(), false); + } + + 
sb = buffer.Allocate(); + sb.data[0] = 0; sb.data[1] = 0; + buffer.Written(2); + ASSERT_EQ(decoder_buffer.GetChunk(), true); + + ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], tmp[i]); + + ASSERT_EQ(buffer.size(), 0); +} + +TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) { + uint8_t tmp[2000]; + BufferT buffer; + DecoderBufferT decoder_buffer(buffer); + StreamBufferT sb = buffer.Allocate(); + + sb.data[0] = 0x03; sb.data[1] = 0xe8; + buffer.Written(2); + ASSERT_EQ(decoder_buffer.GetChunk(), false); + + for (int i = 0; i < 5; ++i) { + sb = buffer.Allocate(); + memcpy(sb.data, data + 200 * i, 200); + buffer.Written(200); + ASSERT_EQ(decoder_buffer.GetChunk(), false); + } + + sb = buffer.Allocate(); + sb.data[0] = 0; sb.data[1] = 0; + buffer.Written(2); + + sb = buffer.Allocate(); + memcpy(sb.data, data, 1000); + buffer.Written(1000); + + ASSERT_EQ(decoder_buffer.GetChunk(), true); + + ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], tmp[i]); + + uint8_t *leftover = buffer.data(); + ASSERT_EQ(buffer.size(), 1000); + for (int i = 0; i < 1000; ++i) + EXPECT_EQ(data[i], leftover[i]); +} + +int main(int argc, char **argv) { + InitializeData(data, SIZE); + logging::init_sync(); + logging::log->pipe(std::make_unique()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/bolt_chunked_buffer.cpp b/tests/unit/bolt_chunked_encoder_buffer.cpp similarity index 92% rename from tests/unit/bolt_chunked_buffer.cpp rename to tests/unit/bolt_chunked_encoder_buffer.cpp index 73ea11dcb..2837196d8 100644 --- a/tests/unit/bolt_chunked_buffer.cpp +++ b/tests/unit/bolt_chunked_encoder_buffer.cpp @@ -1,9 +1,9 @@ #include "bolt_common.hpp" -#include "communication/bolt/v1/encoder/chunked_buffer.hpp" +#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" // aliases using SocketT = TestSocket; -using BufferT = 
communication::bolt::ChunkedBuffer; +using BufferT = communication::bolt::ChunkedEncoderBuffer; // "alias" constants static constexpr auto CHS = communication::bolt::CHUNK_HEADER_SIZE; @@ -41,7 +41,7 @@ void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element) { ASSERT_EQ(*(data + CHS + size + 1), 0x00); } -TEST(BoltChunkedBuffer, OneSmallChunk) { +TEST(BoltChunkedEncoderBuffer, OneSmallChunk) { // initialize array of 100 ones (small chunk) int size = 100; uint8_t element = '1'; @@ -60,7 +60,7 @@ TEST(BoltChunkedBuffer, OneSmallChunk) { VerifyChunkOfOnes(socket.output.data(), size, element); } -TEST(BoltChunkedBuffer, TwoSmallChunks) { +TEST(BoltChunkedEncoderBuffer, TwoSmallChunks) { // initialize the small arrays int size1 = 100; uint8_t element1 = '1'; @@ -87,7 +87,7 @@ TEST(BoltChunkedBuffer, TwoSmallChunks) { VerifyChunkOfOnes(data + CHS + size1 + CEMS, size2, element2); } -TEST(BoltChunkedBuffer, OneAndAHalfOfMaxChunk) { +TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) { // initialize a big chunk int size = 100000; uint8_t element = '1'; diff --git a/tests/unit/bolt_decoder.cpp b/tests/unit/bolt_decoder.cpp new file mode 100644 index 000000000..4f7c5b89e --- /dev/null +++ b/tests/unit/bolt_decoder.cpp @@ -0,0 +1,422 @@ +#include "bolt_common.hpp" +#include "bolt_testdata.hpp" + +#include "communication/bolt/v1/decoder/decoder.hpp" +#include "query/backend/cpp/typed_value.hpp" + +constexpr const int SIZE = 131072; +uint8_t data[SIZE]; + +/** + * TestDecoderBuffer + * This class provides a dummy Buffer used for testing the Decoder. + * Its Read function is the necessary public interface for the Decoder. + * Its Write and Clear methods are used for testing. Through the Write + * method you can store data in the buffer, and through the Clear method + * you can clear the buffer. The decoder uses the Read function to get + * data from the buffer. 
+ */ +class TestDecoderBuffer { + public: + bool Read(uint8_t *data, size_t len) { + if (len > buffer_.size()) return false; + memcpy(data, buffer_.data(), len); + buffer_.erase(buffer_.begin(), buffer_.begin() + len); + return true; + } + + void Write(const uint8_t *data, size_t len) { + for (size_t i = 0; i < len; ++i) + buffer_.push_back(data[i]); + } + + void Clear() { + buffer_.clear(); + } + + private: + std::vector buffer_; +}; + +using DecoderT = communication::bolt::Decoder; + +TEST(BoltDecoder, NullAndBool) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + TypedValue tv; + + // test null + buffer.Write((const uint8_t *)"\xC0", 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::Null); + + // test true + buffer.Write((const uint8_t *)"\xC3", 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::Bool); + ASSERT_EQ(tv.Value(), true); + + // test false + buffer.Write((const uint8_t *)"\xC2", 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::Bool); + ASSERT_EQ(tv.Value(), false); +} + +TEST(BoltDecoder, Int) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + TypedValue tv; + + // test invalid marker + buffer.Clear(); + buffer.Write((uint8_t *)"\xCD", 1); // 0xCD is reserved in the protocol + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + for (int i = 0; i < 28; ++i) { + // test missing data + buffer.Clear(); + buffer.Write(int_encoded[i], int_encoded_len[i] - 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test all ok + buffer.Clear(); + buffer.Write(int_encoded[i], int_encoded_len[i]); + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::Int); + ASSERT_EQ(tv.Value(), int_decoded[i]); + } +} + +TEST(BoltDecoder, Double) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + TypedValue tv; + + for (int i = 0; i < 4; ++i) { + // test missing data + 
buffer.Clear(); + buffer.Write(double_encoded[i], 8); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test all ok + buffer.Clear(); + buffer.Write(double_encoded[i], 9); + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::Double); + ASSERT_EQ(tv.Value(), double_decoded[i]); + } +} + +TEST(BoltDecoder, String) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + TypedValue tv; + + uint8_t headers[][6] = {"\x8F", "\xD0\x0F", "\xD1\x00\x0F", "\xD2\x00\x00\x00\x0F"}; + int headers_len[] = {1, 2, 3, 5}; + + for (int i = 0; i < 4; ++i) { + // test missing data in header + buffer.Clear(); + buffer.Write(headers[i], headers_len[i] - 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test missing elements + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + buffer.Write(data, 14); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test all ok + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + buffer.Write(data, 15); + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::String); + std::string &str = tv.Value(); + for (int j = 0; j < 15; ++j) + EXPECT_EQ((uint8_t)str[j], data[j]); + } +} + +TEST(BoltDecoder, List) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + TypedValue tv; + + uint8_t headers[][6] = {"\x9F", "\xD4\x0F", "\xD5\x00\x0F", "\xD6\x00\x00\x00\x0F"}; + int headers_len[] = {1, 2, 3, 5}; + + for (int i = 0; i < 4; ++i) { + // test missing data in header + buffer.Clear(); + buffer.Write(headers[i], headers_len[i] - 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test missing elements + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + for (uint8_t j = 0; j < 14; ++j) + buffer.Write(&j, 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test all ok + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + for (uint8_t j = 0; j < 15; ++j) + buffer.Write(&j, 1); + 
ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::List); + std::vector &val = tv.Value>(); + ASSERT_EQ(val.size(), 15); + for (int j = 0; j < 15; ++j) + EXPECT_EQ(val[j].Value(), j); + } +} + +TEST(BoltDecoder, Map) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + TypedValue tv; + + uint8_t headers[][6] = {"\xAF", "\xD8\x0F", "\xD9\x00\x0F", "\xDA\x00\x00\x00\x0F"}; + int headers_len[] = {1, 2, 3, 5}; + + uint8_t index[] = "\x81\x61"; + uint8_t wrong_index = 1; + + for (int i = 0; i < 4; ++i) { + // test missing data in header + buffer.Clear(); + buffer.Write(headers[i], headers_len[i] - 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test wrong index type + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + buffer.Write(&wrong_index, 1); + buffer.Write(&wrong_index, 1); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test missing element data + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + buffer.Write(index, 2); + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test missing elements + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + for (uint8_t j = 0; j < 14; ++j) { + buffer.Write(index, 2); + buffer.Write(&j, 1); + } + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test elements with same index + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + for (uint8_t j = 0; j < 15; ++j) { + uint8_t tmp = 'a' + j; + buffer.Write(index, 2); + buffer.Write(&j, 1); + } + ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + + // test all ok + buffer.Clear(); + buffer.Write(headers[i], headers_len[i]); + for (uint8_t j = 0; j < 15; ++j) { + uint8_t tmp = 'a' + j; + buffer.Write(index, 1); + buffer.Write(&tmp, 1); + buffer.Write(&j, 1); + } + ASSERT_EQ(decoder.ReadTypedValue(&tv), true); + ASSERT_EQ(tv.type(), TypedValue::Type::Map); + std::map &val = tv.Value>(); + ASSERT_EQ(val.size(), 15); + for (int j = 0; j < 15; ++j) { + char tmp_chr = 'a' + 
j; + TypedValue tmp_tv = val[std::string(1, tmp_chr)]; + EXPECT_EQ(tmp_tv.type(), TypedValue::Type::Int); + EXPECT_EQ(tmp_tv.Value(), j); + } + } +} + +TEST(BoltDecoder, Vertex) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + communication::bolt::DecodedVertex dv; + + uint8_t header[] = "\xB3\x4E"; + uint8_t wrong_header[] = "\x00\x00"; + uint8_t test_int[] = "\x01"; + uint8_t test_str[] = "\x81\x61"; + uint8_t test_list[] = "\x91"; + uint8_t test_map[] = "\xA1"; + + // test missing signature + buffer.Clear(); + buffer.Write(wrong_header, 1); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test wrong marker + buffer.Clear(); + buffer.Write(wrong_header, 2); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test wrong signature + buffer.Clear(); + buffer.Write(header, 1); + buffer.Write(wrong_header, 1); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test ID wrong type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_str, 2); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test labels wrong outer type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int, 1); + buffer.Write(test_int, 1); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test labels wrong inner type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int, 1); + buffer.Write(test_list, 1); + buffer.Write(test_int, 1); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test properties wrong outer type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int, 1); + buffer.Write(test_list, 1); + buffer.Write(test_str, 2); + ASSERT_EQ(decoder.ReadVertex(&dv), false); + + // test all ok + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int, 1); + buffer.Write(test_list, 1); + buffer.Write(test_str, 2); + buffer.Write(test_map, 1); + buffer.Write(test_str, 2); + buffer.Write(test_int, 1); + ASSERT_EQ(decoder.ReadVertex(&dv), true); + ASSERT_EQ(dv.id, 1); + ASSERT_EQ(dv.labels[0], std::string("a")); + 
ASSERT_EQ(dv.properties[std::string("a")].Value(), 1); +} + +TEST(BoltDecoder, Edge) { + TestDecoderBuffer buffer; + DecoderT decoder(buffer); + + communication::bolt::DecodedEdge de; + + uint8_t header[] = "\xB5\x52"; + uint8_t wrong_header[] = "\x00\x00"; + uint8_t test_int1[] = "\x01"; + uint8_t test_int2[] = "\x02"; + uint8_t test_int3[] = "\x03"; + uint8_t test_str[] = "\x81\x61"; + uint8_t test_list[] = "\x91"; + uint8_t test_map[] = "\xA1"; + + // test missing signature + buffer.Clear(); + buffer.Write(wrong_header, 1); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test wrong marker + buffer.Clear(); + buffer.Write(wrong_header, 2); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test wrong signature + buffer.Clear(); + buffer.Write(header, 1); + buffer.Write(wrong_header, 1); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test ID wrong type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_str, 2); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test from_id wrong type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int1, 1); + buffer.Write(test_str, 2); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test to_id wrong type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int1, 1); + buffer.Write(test_int2, 1); + buffer.Write(test_str, 2); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test type wrong type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int1, 1); + buffer.Write(test_int2, 1); + buffer.Write(test_int3, 1); + buffer.Write(test_int1, 1); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test properties wrong outer type + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int1, 1); + buffer.Write(test_int2, 1); + buffer.Write(test_int3, 1); + buffer.Write(test_str, 2); + buffer.Write(test_int1, 1); + ASSERT_EQ(decoder.ReadEdge(&de), false); + + // test all ok + buffer.Clear(); + buffer.Write(header, 2); + buffer.Write(test_int1, 1); + 
buffer.Write(test_int2, 1); + buffer.Write(test_int3, 1); + buffer.Write(test_str, 2); + buffer.Write(test_map, 1); + buffer.Write(test_str, 2); + buffer.Write(test_int1, 1); + ASSERT_EQ(decoder.ReadEdge(&de), true); + ASSERT_EQ(de.id, 1); + ASSERT_EQ(de.from, 2); + ASSERT_EQ(de.to, 3); + ASSERT_EQ(de.type, std::string("a")); + ASSERT_EQ(de.properties[std::string("a")].Value(), 1); +} + +int main(int argc, char **argv) { + InitializeData(data, SIZE); + logging::init_sync(); + logging::log->pipe(std::make_unique()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/bolt_encoder.cpp b/tests/unit/bolt_encoder.cpp index c1669fccb..f53b9fca2 100644 --- a/tests/unit/bolt_encoder.cpp +++ b/tests/unit/bolt_encoder.cpp @@ -1,4 +1,5 @@ #include "bolt_common.hpp" +#include "bolt_testdata.hpp" #include "communication/bolt/v1/encoder/encoder.hpp" #include "database/graph_db.hpp" @@ -9,53 +10,8 @@ * TODO (mferencevic): document */ -// clang-format off -const int64_t int_input[] = { - 0, -1, -8, -16, 1, 63, 127, -128, -20, -17, -32768, -12345, -129, 128, - 12345, 32767, -2147483648L, -12345678L, -32769L, 32768L, 12345678L, - 2147483647L, -9223372036854775807L, -12345678912345L, -2147483649L, - 2147483648L, 12345678912345L, 9223372036854775807}; - -const uint8_t int_output[][10] = { - "\x00", "\xFF", "\xF8", "\xF0", "\x01", "\x3F", "\x7F", "\xC8\x80", - "\xC8\xEC", "\xC8\xEF", "\xC9\x80\x00", "\xC9\xCF\xC7", "\xC9\xFF\x7F", - "\xC9\x00\x80", "\xC9\x30\x39", "\xC9\x7F\xFF", "\xCA\x80\x00\x00\x00", - "\xCA\xFF\x43\x9E\xB2", "\xCA\xFF\xFF\x7F\xFF", "\xCA\x00\x00\x80\x00", - "\xCA\x00\xBC\x61\x4E", "\xCA\x7F\xFF\xFF\xFF", - "\xCB\x80\x00\x00\x00\x00\x00\x00\x01", - "\xCB\xFF\xFF\xF4\xC5\x8C\x31\xA4\xA7", - "\xCB\xFF\xFF\xFF\xFF\x7F\xFF\xFF\xFF", - "\xCB\x00\x00\x00\x00\x80\x00\x00\x00", - "\xCB\x00\x00\x0B\x3A\x73\xCE\x5B\x59", - "\xCB\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF"}; -// clang-format on -const uint32_t int_output_len[] = {1, 1, 1, 1, 1, 
1, 1, 2, 2, 2, 3, 3, 3, 3, - 3, 3, 5, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9, 9}; - -const double double_input[] = {5.834, 108.199, 43677.9882, 254524.5851}; -const uint8_t double_output[][10] = {"\xC1\x40\x17\x56\x04\x18\x93\x74\xBC", - "\xC1\x40\x5B\x0C\xBC\x6A\x7E\xF9\xDB", - "\xC1\x40\xE5\x53\xBF\x9F\x55\x9B\x3D", - "\xC1\x41\x0F\x11\xE4\xAE\x48\xE8\xA7"}; - -const uint8_t vertexedge_output[] = - "\xB1\x71\x93\xB3\x4E\x00\x92\x86\x6C\x61\x62\x65\x6C\x31\x86\x6C\x61\x62" - "\x65\x6C\x32\xA2\x85\x70\x72\x6F\x70\x31\x0C\x85\x70\x72\x6F\x70\x32\xC9" - "\x00\xC8\xB3\x4E\x00\x90\xA0\xB5\x52\x00\x00\x00\x88\x65\x64\x67\x65\x74" - "\x79\x70\x65\xA2\x85\x70\x72\x6F\x70\x33\x2A\x85\x70\x72\x6F\x70\x34\xC9" - "\x04\xD2"; - constexpr const int SIZE = 131072; uint8_t data[SIZE]; -const uint64_t sizes[] = {0, 1, 5, 15, 16, 120, - 255, 256, 12345, 65535, 65536, 100000}; -const uint64_t sizes_num = 12; - -constexpr const int STRING = 0, LIST = 1, MAP = 2; -const uint8_t type_tiny_magic[] = {0x80, 0x90, 0xA0}; -const uint8_t type_8_magic[] = {0xD0, 0xD4, 0xD8}; -const uint8_t type_16_magic[] = {0xD1, 0xD5, 0xD9}; -const uint8_t type_32_magic[] = {0xD2, 0xD6, 0xDA}; void CheckTypeSize(std::vector &v, int typ, uint64_t size) { if (size <= 15) { @@ -103,21 +59,21 @@ TEST(BoltEncoder, NullAndBool) { TEST(BoltEncoder, Int) { int N = 28; std::vector vals; - for (int i = 0; i < N; ++i) vals.push_back(TypedValue(int_input[i])); + for (int i = 0; i < N; ++i) vals.push_back(TypedValue(int_decoded[i])); bolt_encoder.MessageRecord(vals); CheckRecordHeader(output, N); for (int i = 0; i < N; ++i) - CheckOutput(output, int_output[i], int_output_len[i], false); + CheckOutput(output, int_encoded[i], int_encoded_len[i], false); CheckOutput(output, nullptr, 0); } TEST(BoltEncoder, Double) { int N = 4; std::vector vals; - for (int i = 0; i < N; ++i) vals.push_back(TypedValue(double_input[i])); + for (int i = 0; i < N; ++i) vals.push_back(TypedValue(double_decoded[i])); bolt_encoder.MessageRecord(vals); 
CheckRecordHeader(output, N); - for (int i = 0; i < N; ++i) CheckOutput(output, double_output[i], 9, false); + for (int i = 0; i < N; ++i) CheckOutput(output, double_encoded[i], 9, false); CheckOutput(output, nullptr, 0); } @@ -209,7 +165,7 @@ TEST(BoltEncoder, VertexAndEdge) { vals.push_back(TypedValue(va2)); vals.push_back(TypedValue(ea)); bolt_encoder.MessageRecord(vals); - CheckOutput(output, vertexedge_output, 74); + CheckOutput(output, vertexedge_encoded, 74); } TEST(BoltEncoder, BoltV1ExampleMessages) { diff --git a/tests/unit/bolt_result_stream.cpp b/tests/unit/bolt_result_stream.cpp index 002fa3c32..4947cc8b6 100644 --- a/tests/unit/bolt_result_stream.cpp +++ b/tests/unit/bolt_result_stream.cpp @@ -1,18 +1,14 @@ #include "bolt_common.hpp" -#include "communication/bolt/v1/encoder/chunked_buffer.hpp" +#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" #include "communication/bolt/v1/encoder/encoder.hpp" #include "communication/bolt/v1/encoder/result_stream.hpp" #include "query/backend/cpp/typed_value.hpp" -using BufferT = communication::bolt::ChunkedBuffer; +using BufferT = communication::bolt::ChunkedEncoderBuffer; using EncoderT = communication::bolt::Encoder; using ResultStreamT = communication::bolt::ResultStream; -/** - * TODO (mferencevic): document - */ - const uint8_t header_output[] = "\x00\x29\xB1\x70\xA1\x86\x66\x69\x65\x6C\x64\x73\x9A\x82\x61\x61\x82\x62" "\x62\x82\x63\x63\x82\x64\x64\x82\x65\x65\x82\x66\x66\x82\x67\x67\x82\x68" diff --git a/tests/unit/bolt_session.cpp b/tests/unit/bolt_session.cpp index b98c3e635..1dfea8f76 100644 --- a/tests/unit/bolt_session.cpp +++ b/tests/unit/bolt_session.cpp @@ -6,7 +6,7 @@ using ResultStreamT = communication::bolt::ResultStream>>; + communication::bolt::ChunkedEncoderBuffer>>; using SessionT = communication::bolt::Session; /** diff --git a/tests/unit/bolt_testdata.hpp b/tests/unit/bolt_testdata.hpp new file mode 100644 index 000000000..baf514b82 --- /dev/null +++ 
b/tests/unit/bolt_testdata.hpp @@ -0,0 +1,48 @@ +#pragma once + +// clang-format off +const int64_t int_decoded[] = { + 0, -1, -8, -16, 1, 63, 127, -128, -20, -17, -32768, -12345, -129, 128, + 12345, 32767, -2147483648L, -12345678L, -32769L, 32768L, 12345678L, + 2147483647L, -9223372036854775807L, -12345678912345L, -2147483649L, + 2147483648L, 12345678912345L, 9223372036854775807}; + +const uint8_t int_encoded[][10] = { + "\x00", "\xFF", "\xF8", "\xF0", "\x01", "\x3F", "\x7F", "\xC8\x80", + "\xC8\xEC", "\xC8\xEF", "\xC9\x80\x00", "\xC9\xCF\xC7", "\xC9\xFF\x7F", + "\xC9\x00\x80", "\xC9\x30\x39", "\xC9\x7F\xFF", "\xCA\x80\x00\x00\x00", + "\xCA\xFF\x43\x9E\xB2", "\xCA\xFF\xFF\x7F\xFF", "\xCA\x00\x00\x80\x00", + "\xCA\x00\xBC\x61\x4E", "\xCA\x7F\xFF\xFF\xFF", + "\xCB\x80\x00\x00\x00\x00\x00\x00\x01", + "\xCB\xFF\xFF\xF4\xC5\x8C\x31\xA4\xA7", + "\xCB\xFF\xFF\xFF\xFF\x7F\xFF\xFF\xFF", + "\xCB\x00\x00\x00\x00\x80\x00\x00\x00", + "\xCB\x00\x00\x0B\x3A\x73\xCE\x5B\x59", + "\xCB\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF"}; +// clang-format on + +const uint32_t int_encoded_len[] = {1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, + 3, 3, 5, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9, 9}; + +const double double_decoded[] = {5.834, 108.199, 43677.9882, 254524.5851}; +const uint8_t double_encoded[][10] = {"\xC1\x40\x17\x56\x04\x18\x93\x74\xBC", + "\xC1\x40\x5B\x0C\xBC\x6A\x7E\xF9\xDB", + "\xC1\x40\xE5\x53\xBF\x9F\x55\x9B\x3D", + "\xC1\x41\x0F\x11\xE4\xAE\x48\xE8\xA7"}; + +const uint8_t vertexedge_encoded[] = + "\xB1\x71\x93\xB3\x4E\x00\x92\x86\x6C\x61\x62\x65\x6C\x31\x86\x6C\x61\x62" + "\x65\x6C\x32\xA2\x85\x70\x72\x6F\x70\x31\x0C\x85\x70\x72\x6F\x70\x32\xC9" + "\x00\xC8\xB3\x4E\x00\x90\xA0\xB5\x52\x00\x00\x00\x88\x65\x64\x67\x65\x74" + "\x79\x70\x65\xA2\x85\x70\x72\x6F\x70\x33\x2A\x85\x70\x72\x6F\x70\x34\xC9" + "\x04\xD2"; + +const uint64_t sizes[] = {0, 1, 5, 15, 16, 120, + 255, 256, 12345, 65535, 65536, 100000}; +const uint64_t sizes_num = 12; + +constexpr const int STRING = 0, LIST = 1, MAP = 2; +const 
uint8_t type_tiny_magic[] = {0x80, 0x90, 0xA0}; +const uint8_t type_8_magic[] = {0xD0, 0xD4, 0xD8}; +const uint8_t type_16_magic[] = {0xD1, 0xD5, 0xD9}; +const uint8_t type_32_magic[] = {0xD2, 0xD6, 0xDA};