First version of new bolt decoder and tests.

Summary:
Extracted constants to codes.hpp.

Extracted bolt constants.

Extracted StreamBuffer and fixed data type.

Extracted bolt testdata.

Added bolt buffer and tests.

Added bolt decoder buffer and tests.

Renamed bolt testdata.

Reviewers: dgleich, buda, matej.gradicek

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D220
This commit is contained in:
Matej Ferencevic 2017-04-06 14:30:04 +02:00
parent aa6cae0b16
commit e5c814e022
22 changed files with 1483 additions and 133 deletions

View File

@ -0,0 +1,67 @@
#pragma once
#include <cstdint>
#include "utils/underlying_cast.hpp"
namespace communication::bolt {
enum class Signature : uint8_t {
Init = 0x01,
AckFailure = 0x0E,
Reset = 0x0F,
Run = 0x10,
DiscardAll = 0x2F,
PullAll = 0x3F,
Record = 0x71,
Success = 0x70,
Ignored = 0x7E,
Failure = 0x7F,
Node = 0x4E,
Relationship = 0x52,
Path = 0x50,
UnboundRelationship = 0x72,
};
enum class Marker : uint8_t {
TinyString = 0x80,
TinyList = 0x90,
TinyMap = 0xA0,
TinyStruct = 0xB0,
Null = 0xC0,
Float64 = 0xC1,
False = 0xC2,
True = 0xC3,
Int8 = 0xC8,
Int16 = 0xC9,
Int32 = 0xCA,
Int64 = 0xCB,
String8 = 0xD0,
String16 = 0xD1,
String32 = 0xD2,
List8 = 0xD4,
List16 = 0xD5,
List32 = 0xD6,
Map8 = 0xD8,
Map16 = 0xD9,
Map32 = 0xDA,
Struct8 = 0xDC,
Struct16 = 0xDD,
};
static constexpr uint8_t MarkerString = 0, MarkerList = 1, MarkerMap = 2;
static constexpr Marker MarkerTiny[3] = {Marker::TinyString, Marker::TinyList, Marker::TinyMap};
static constexpr Marker Marker8[3] = {Marker::String8, Marker::List8, Marker::Map8};
static constexpr Marker Marker16[3] = {Marker::String16, Marker::List16, Marker::Map16};
static constexpr Marker Marker32[3] = {Marker::String32, Marker::List32, Marker::Map32};
}

View File

@ -0,0 +1,13 @@
#pragma once
namespace communication::bolt {
/**
* Sizes related to the chunk defined in Bolt protocol.
*/
static constexpr size_t CHUNK_HEADER_SIZE = 2;
static constexpr size_t MAX_CHUNK_SIZE = 65535;
static constexpr size_t CHUNK_END_MARKER_SIZE = 2;
static constexpr size_t WHOLE_CHUNK_SIZE =
CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE + CHUNK_END_MARKER_SIZE;
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <algorithm>
#include <cstring>
#include <memory>
#include <vector>
#include "communication/bolt/v1/constants.hpp"
#include "io/network/stream_buffer.hpp"
#include "logging/loggable.hpp"
#include "utils/assert.hpp"
#include "utils/bswap.hpp"
namespace communication::bolt {
/**
* @brief Buffer
*
* Has methods for writing and reading raw data.
*
* Allocating, writing and written stores data in the buffer. The stored
* data can then be read using the pointer returned with the data function.
* The current implementation stores data in a single fixed length buffer.
*/
class Buffer : public Loggable {
private:
using StreamBufferT = io::network::StreamBuffer;
public:
Buffer() : Loggable("Buffer") {}
/**
* Allocates a new StreamBuffer from the internal buffer.
* This function returns a pointer to the first currently free memory
* location in the internal buffer. Also, it returns the size of the
* available memory.
*/
StreamBufferT Allocate() {
return StreamBufferT{&data_[size_], WHOLE_CHUNK_SIZE - size_};
}
/**
* This method is used to notify the buffer that the data has been written.
* To write data to this buffer you should do this:
* Call Allocate(), then write to the returned data pointer.
* IMPORTANT: Don't write more data then the returned size, you will cause
* a memory overflow. Then call Written(size) with the length of data that
* you have written into the buffer.
*
* @param len the size of data that has been written into the buffer
*/
void Written(size_t len) {
size_ += len;
debug_assert(size_ <= WHOLE_CHUNK_SIZE, "Written more than storage has space!");
}
/**
* This method shifts the available data for len. It is used when you read
* some data from the buffer and you want to remove it from the buffer.
*
* @param len the length of data that has to be removed from the start of
* the buffer
*/
void Shift(size_t len) {
debug_assert(len <= size_, "Tried to shift more data than the buffer has!");
memmove(data_, data_ + len, size_ - len);
size_ -= len;
}
/**
* This method clears the buffer.
*/
void Clear() {
size_ = 0;
}
/**
* This function returns a pointer to the internal buffer. It is used for
* reading data from the buffer.
*/
uint8_t *data() { return data_; }
/**
* This function returns the size of available data for reading.
*/
size_t size() { return size_; }
private:
uint8_t data_[WHOLE_CHUNK_SIZE];
size_t size_{0};
};
}

View File

@ -0,0 +1,95 @@
#pragma once
#include <algorithm>
#include <cstring>
#include <memory>
#include <vector>
#include "communication/bolt/v1/constants.hpp"
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "logging/loggable.hpp"
#include "utils/assert.hpp"
namespace communication::bolt {
/**
* @brief ChunkedDecoderBuffer
*
* Has methods for getting chunks and reading their data.
*
* Getting a chunk copies the chunk into the internal buffer from which
* the data can then be read. While getting a chunk the buffer checks the
* chunk for validity and then copies only data from the chunk. The headers
* aren't copied so that the decoder can read only the raw encoded data.
*/
class ChunkedDecoderBuffer : public Loggable {
private:
using StreamBufferT = io::network::StreamBuffer;
public:
ChunkedDecoderBuffer(Buffer &buffer) : Loggable("ChunkedDecoderBuffer"), buffer_(buffer) {}
/**
* Reads data from the internal buffer.
*
* @param data a pointer to where the data should be read
* @param len the length of data that should be read
* @returns true if exactly len of data was copied into data,
* false otherwise
*/
bool Read(uint8_t *data, size_t len) {
if (len > size_ - pos_) return false;
memcpy(data, &data_[pos_], len);
pos_ += len;
return true;
}
/**
* Gets a chunk from the underlying raw data buffer.
* When getting a chunk this function validates the chunk.
* If the chunk isn't yet finished the function just returns false.
* If the chunk is finished (all data has been read) and the chunk isn't
* valid, then the function automatically deletes the invalid chunk
* from the underlying buffer and returns false.
*
* @returns true if a chunk was successfully copied into the internal
* buffer, false otherwise
*/
bool GetChunk() {
uint8_t *data = buffer_.data();
size_t size = buffer_.size();
if (size < 2) {
logger.trace("Size < 2");
return false;
}
size_t chunk_size = data[0];
chunk_size <<= 8;
chunk_size += data[1];
if (size < chunk_size + 4) {
logger.trace("Chunk size is {} but only have {} data bytes.", chunk_size, size);
return false;
}
if (data[chunk_size + 2] != 0 || data[chunk_size + 3] != 0) {
logger.trace("Invalid chunk!");
buffer_.Shift(chunk_size + 4);
// TODO: raise an exception!
return false;
}
pos_ = 0;
size_ = chunk_size;
memcpy(data_, data + 2, size - 4);
buffer_.Shift(chunk_size + 4);
return true;
}
private:
Buffer &buffer_;
uint8_t data_[MAX_CHUNK_SIZE];
size_t size_{0};
size_t pos_{0};
};
}

View File

@ -0,0 +1,474 @@
#pragma once
#include "communication/bolt/v1/codes.hpp"
#include "database/graph_db_accessor.hpp"
#include "logging/default.hpp"
#include "logging/logger.hpp"
#include "query/backend/cpp/typed_value.hpp"
#include "utils/bswap.hpp"
#include "utils/underlying_cast.hpp"
#include <string>
namespace communication::bolt {
/**
* Structure used when reading a Vertex with the decoder.
* The decoder writes data into this structure.
*/
struct DecodedVertex {
int64_t id;
std::vector<std::string> labels;
std::map<std::string, TypedValue> properties;
};
/**
* Structure used when reading an Edge with the decoder.
* The decoder writes data into this structure.
*/
struct DecodedEdge {
int64_t id;
int64_t from;
int64_t to;
std::string type;
std::map<std::string, TypedValue> properties;
};
/**
* Bolt Decoder.
* Has public interfaces for reading Bolt encoded data.
* Supports reading: TypedValue (without Vertex, Edge and Path),
* Vertex, Edge
*
* @tparam Buffer the input buffer that should be used
*/
template <typename Buffer>
class Decoder : public Loggable {
public:
Decoder(Buffer &buffer)
: Loggable("communication::bolt::Decoder"),
buffer_(buffer) {}
/**
* Reads a TypedValue from the available data in the buffer.
* This function tries to read a TypedValue from the available data.
*
* @param data pointer to a TypedValue where the read data should be stored
* @returns true if data has been written to the data pointer,
* false otherwise
*/
bool ReadTypedValue(TypedValue *data) {
uint8_t value;
logger.trace("[ReadTypedValue] Start");
if (!buffer_.Read(&value, 1)) {
logger.debug("[ReadTypedValue] Marker data missing!");
return false;
}
Marker marker = (Marker)value;
switch (marker) {
case Marker::Null:
return ReadNull(marker, data);
case Marker::True:
case Marker::False:
return ReadBool(marker, data);
case Marker::Int8:
case Marker::Int16:
case Marker::Int32:
case Marker::Int64:
return ReadInt(marker, data);
case Marker::Float64:
return ReadDouble(marker, data);
case Marker::String8:
case Marker::String16:
case Marker::String32:
return ReadString(marker, data);
case Marker::List8:
case Marker::List16:
case Marker::List32:
return ReadList(marker, data);
case Marker::Map8:
case Marker::Map16:
case Marker::Map32:
return ReadMap(marker, data);
default:
if ((value & 0xF0) == underlying_cast(Marker::TinyString)) {
return ReadString(marker, data);
} else if ((value & 0xF0) == underlying_cast(Marker::TinyList)) {
return ReadList(marker, data);
} else if ((value & 0xF0) == underlying_cast(Marker::TinyMap)) {
return ReadMap(marker, data);
} else {
return ReadInt(marker, data);
}
break;
}
}
/**
* Reads a TypedValue from the available data in the buffer and checks
* whether the read data type matches the supplied data type.
*
* @param data pointer to a TypedValue where the read data should be stored
* @param type the expected type that should be read
* @returns true if data has been written to the data pointer and the type
* matches the expected type, false otherwise
*/
bool ReadTypedValue(TypedValue *data, TypedValue::Type type) {
if (!ReadTypedValue(data)) {
logger.debug("[ReadTypedValue] ReadTypedValue call failed!");
return false;
}
if (data->type() != type) {
logger.debug("[ReadTypedValue] Typed value has wrong type!");
return false;
}
return true;
}
/**
* Reads a Vertex from the available data in the buffer.
* This function tries to read a Vertex from the available data.
*
* @param data pointer to a DecodedVertex where the data should be stored
* @returns true if data has been written into the data pointer,
* false otherwise
*/
bool ReadVertex(DecodedVertex *data) {
uint8_t value[2];
TypedValue tv;
logger.trace("[ReadVertex] Start");
if (!buffer_.Read(value, 2)) {
logger.debug("[ReadVertex] Missing marker and/or signature data!");
return false;
}
// check header
if (value[0] != underlying_cast(Marker::TinyStruct) + 3) {
logger.debug("[ReadVertex] Received invalid marker ({})!", value[0]);
return false;
}
if (value[1] != underlying_cast(Signature::Node)) {
logger.debug("[ReadVertex] Received invalid signature ({})!", value[1]);
return false;
}
// read ID
if (!ReadTypedValue(&tv, TypedValue::Type::Int)) {
logger.debug("[ReadVertex] Couldn't read ID!");
return false;
}
data->id = tv.Value<int64_t>();
// read labels
if (!ReadTypedValue(&tv, TypedValue::Type::List)) {
logger.debug("[ReadVertex] Couldn't read labels!");
return false;
}
std::vector<TypedValue> &labels = tv.Value<std::vector<TypedValue>>();
data->labels.resize(labels.size());
for (size_t i = 0; i < labels.size(); ++i) {
if (labels[i].type() != TypedValue::Type::String) {
logger.debug("[ReadVertex] Label has wrong type!");
return false;
}
data->labels[i] = labels[i].Value<std::string>();
}
// read properties
if (!ReadTypedValue(&tv, TypedValue::Type::Map)) {
logger.debug("[ReadVertex] Couldn't read properties!");
return false;
}
data->properties = tv.Value<std::map<std::string, TypedValue>>();
logger.trace("[ReadVertex] Success");
return true;
}
/**
* Reads an Edge from the available data in the buffer.
* This function tries to read an Edge from the available data.
*
* @param data pointer to a DecodedEdge where the data should be stored
* @returns true if data has been written into the data pointer,
* false otherwise
*/
bool ReadEdge(DecodedEdge *data) {
uint8_t value[2];
TypedValue tv;
logger.trace("[ReadEdge] Start");
if (!buffer_.Read(value, 2)) {
logger.debug("[ReadEdge] Missing marker and/or signature data!");
return false;
}
// check header
if (value[0] != underlying_cast(Marker::TinyStruct) + 5) {
logger.debug("[ReadEdge] Received invalid marker ({})!", value[0]);
return false;
}
if (value[1] != underlying_cast(Signature::Relationship)) {
logger.debug("[ReadEdge] Received invalid signature ({})!", value[1]);
return false;
}
// read ID
if (!ReadTypedValue(&tv, TypedValue::Type::Int)) {
logger.debug("[ReadEdge] couldn't read ID!");
return false;
}
data->id = tv.Value<int64_t>();
// read from
if (!ReadTypedValue(&tv, TypedValue::Type::Int)) {
logger.debug("[ReadEdge] Couldn't read from_id!");
return false;
}
data->from = tv.Value<int64_t>();
// read to
if (!ReadTypedValue(&tv, TypedValue::Type::Int)) {
logger.debug("[ReadEdge] Couldn't read to_id!");
return false;
}
data->to = tv.Value<int64_t>();
// read type
if (!ReadTypedValue(&tv, TypedValue::Type::String)) {
logger.debug("[ReadEdge] Couldn't read type!");
return false;
}
data->type = tv.Value<std::string>();
// read properties
if (!ReadTypedValue(&tv, TypedValue::Type::Map)) {
logger.debug("[ReadEdge] Couldn't read properties!");
return false;
}
data->properties = tv.Value<std::map<std::string, TypedValue>>();
logger.trace("[ReadEdge] Success");
return true;
}
protected:
Buffer &buffer_;
private:
bool ReadNull(const Marker &marker, TypedValue *data) {
logger.trace("[ReadNull] Start");
debug_assert(marker == Marker::Null, "Received invalid marker!");
*data = TypedValue::Null;
logger.trace("[ReadNull] Success");
return true;
}
bool ReadBool(const Marker &marker, TypedValue *data) {
logger.trace("[ReadBool] Start");
debug_assert(marker == Marker::False || marker == Marker::True,
"Received invalid marker!");
if (marker == Marker::False) {
*data = TypedValue(false);
} else {
*data = TypedValue(true);
}
logger.trace("[ReadBool] Success");
return true;
}
bool ReadInt(const Marker &marker, TypedValue *data) {
uint8_t value = underlying_cast(marker);
bool success = true;
int64_t ret;
logger.trace("[ReadInt] Start");
if (value >= 240 || value <= 127) {
logger.trace("[ReadInt] Found a TinyInt");
ret = value;
if (value >= 240) ret -= 256;
} else if (marker == Marker::Int8) {
logger.trace("[ReadInt] Found an Int8");
int8_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
logger.debug( "[ReadInt] Int8 missing data!");
return false;
}
ret = tmp;
} else if (marker == Marker::Int16) {
logger.trace("[ReadInt] Found an Int16");
int16_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
logger.debug( "[ReadInt] Int16 missing data!");
return false;
}
ret = bswap(tmp);
} else if (marker == Marker::Int32) {
logger.trace("[ReadInt] Found an Int32");
int32_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
logger.debug( "[ReadInt] Int32 missing data!");
return false;
}
ret = bswap(tmp);
} else if (marker == Marker::Int64) {
logger.trace("[ReadInt] Found an Int64");
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&ret), sizeof(ret))) {
logger.debug( "[ReadInt] Int64 missing data!");
return false;
}
ret = bswap(ret);
} else {
logger.debug("[ReadInt] Received invalid marker ({})!", underlying_cast(marker));
return false;
}
if (success) {
*data = TypedValue(ret);
logger.trace("[ReadInt] Success");
}
return success;
}
bool ReadDouble(const Marker marker, TypedValue *data) {
uint64_t value;
double ret;
logger.trace("[ReadDouble] Start");
debug_assert(marker == Marker::Float64, "Received invalid marker!");
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&value), sizeof(value))) {
logger.debug( "[ReadDouble] Missing data!");
return false;
}
value = bswap(value);
ret = *reinterpret_cast<double *>(&value);
*data = TypedValue(ret);
logger.trace("[ReadDouble] Success");
return true;
}
int64_t ReadTypeSize(const Marker &marker, const uint8_t type) {
uint8_t value = underlying_cast(marker);
if ((value & 0xF0) == underlying_cast(MarkerTiny[type])) {
logger.trace("[ReadTypeSize] Found a TinyType");
return value & 0x0F;
} else if (marker == Marker8[type]) {
logger.trace("[ReadTypeSize] Found a Type8");
uint8_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
logger.debug( "[ReadTypeSize] Type8 missing data!");
return -1;
}
return tmp;
} else if (marker == Marker16[type]) {
logger.trace("[ReadTypeSize] Found a Type16");
uint16_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
logger.debug( "[ReadTypeSize] Type16 missing data!");
return -1;
}
tmp = bswap(tmp);
return tmp;
} else if (marker == Marker32[type]) {
logger.trace("[ReadTypeSize] Found a Type32");
uint32_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
logger.debug( "[ReadTypeSize] Type32 missing data!");
return -1;
}
tmp = bswap(tmp);
return tmp;
} else {
logger.debug("[ReadTypeSize] Received invalid marker ({})!", underlying_cast(marker));
return -1;
}
}
bool ReadString(const Marker &marker, TypedValue *data) {
logger.trace("[ReadString] Start");
auto size = ReadTypeSize(marker, MarkerString);
if (size == -1) {
logger.debug("[ReadString] Couldn't get size!");
return false;
}
std::unique_ptr<uint8_t[]> ret(new uint8_t[size]);
if (!buffer_.Read(ret.get(), size)) {
logger.debug("[ReadString] Missing data!");
return false;
}
*data = TypedValue(std::string(reinterpret_cast<char *>(ret.get()), size));
logger.trace("[ReadString] Success");
return true;
}
bool ReadList(const Marker &marker, TypedValue *data) {
logger.trace("[ReadList] Start");
auto size = ReadTypeSize(marker, MarkerList);
if (size == -1) {
logger.debug("[ReadList] Couldn't get size!");
return false;
}
std::vector<TypedValue> ret(size);
for (int64_t i = 0; i < size; ++i) {
if (!ReadTypedValue(&ret[i])) {
logger.debug("[ReadList] Couldn't read element {}", i);
return false;
}
}
*data = TypedValue(ret);
logger.trace("[ReadList] Success");
return true;
}
bool ReadMap(const Marker &marker, TypedValue *data) {
logger.trace("[ReadMap] Start");
auto size = ReadTypeSize(marker, MarkerMap);
if (size == -1) {
logger.debug("[ReadMap] Couldn't get size!");
return false;
}
TypedValue tv;
std::string str;
std::map<std::string, TypedValue> ret;
for (int64_t i = 0; i < size; ++i) {
if (!ReadTypedValue(&tv)) {
logger.debug("[ReadMap] Couldn't read index {}", i);
return false;
}
if (tv.type() != TypedValue::Type::String) {
logger.debug("[ReadMap] Index {} isn't a string!", i);
return false;
}
str = tv.Value<std::string>();
if (!ReadTypedValue(&tv)) {
logger.debug("[ReadMap] Couldn't read element {}", i);
return false;
}
ret.insert(std::make_pair(str, tv));
}
if (ret.size() != size) {
logger.debug("[ReadMap] The client sent multiple objects with same indexes!");
return false;
}
*data = TypedValue(ret);
logger.trace("[ReadMap] Success");
return true;
}
};
}

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include "communication/bolt/v1/codes.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "logging/default.hpp" #include "logging/default.hpp"
#include "logging/logger.hpp" #include "logging/logger.hpp"
@ -10,12 +11,6 @@
namespace communication::bolt { namespace communication::bolt {
static constexpr uint8_t TSTRING = 0, TLIST = 1, TMAP = 2;
static constexpr uint8_t type_tiny_marker[3] = {0x80, 0x90, 0xA0};
static constexpr uint8_t type_8_marker[3] = {0xD0, 0xD4, 0xD8};
static constexpr uint8_t type_16_marker[3] = {0xD1, 0xD5, 0xD9};
static constexpr uint8_t type_32_marker[3] = {0xD2, 0xD6, 0xDA};
/** /**
* Bolt BaseEncoder. * Bolt BaseEncoder.
* Has public interfaces for writing Bolt encoded data. * Has public interfaces for writing Bolt encoded data.
@ -62,45 +57,36 @@ class BaseEncoder : public Loggable {
} }
void WriteNull() { void WriteNull() {
// 0xC0 = null marker WriteRAW(underlying_cast(Marker::Null));
WriteRAW(0xC0);
} }
void WriteBool(const bool &value) { void WriteBool(const bool &value) {
if (value) { if (value)
// 0xC3 = true marker WriteRAW(underlying_cast(Marker::True));
WriteRAW(0xC3); else
} else { WriteRAW(underlying_cast(Marker::False));
// 0xC2 = false marker
WriteRAW(0xC2);
}
} }
void WriteInt(const int64_t &value) { void WriteInt(const int64_t &value) {
if (value >= -16L && value < 128L) { if (value >= -16L && value < 128L) {
WriteRAW(static_cast<uint8_t>(value)); WriteRAW(static_cast<uint8_t>(value));
} else if (value >= -128L && value < -16L) { } else if (value >= -128L && value < -16L) {
// 0xC8 = int8 marker WriteRAW(underlying_cast(Marker::Int8));
WriteRAW(0xC8);
WriteRAW(static_cast<uint8_t>(value)); WriteRAW(static_cast<uint8_t>(value));
} else if (value >= -32768L && value < 32768L) { } else if (value >= -32768L && value < 32768L) {
// 0xC9 = int16 marker WriteRAW(underlying_cast(Marker::Int16));
WriteRAW(0xC9);
WriteValue(static_cast<int16_t>(value)); WriteValue(static_cast<int16_t>(value));
} else if (value >= -2147483648L && value < 2147483648L) { } else if (value >= -2147483648L && value < 2147483648L) {
// 0xCA = int32 marker WriteRAW(underlying_cast(Marker::Int32));
WriteRAW(0xCA);
WriteValue(static_cast<int32_t>(value)); WriteValue(static_cast<int32_t>(value));
} else { } else {
// 0xCB = int64 marker WriteRAW(underlying_cast(Marker::Int64));
WriteRAW(0xCB);
WriteValue(value); WriteValue(value);
} }
} }
void WriteDouble(const double &value) { void WriteDouble(const double &value) {
// 0xC1 = float64 marker WriteRAW(underlying_cast(Marker::Float64));
WriteRAW(0xC1);
WriteValue(*reinterpret_cast<const int64_t *>(&value)); WriteValue(*reinterpret_cast<const int64_t *>(&value));
} }
@ -108,38 +94,34 @@ class BaseEncoder : public Loggable {
if (size <= 15) { if (size <= 15) {
uint8_t len = size; uint8_t len = size;
len &= 0x0F; len &= 0x0F;
// tiny marker (+len) WriteRAW(underlying_cast(MarkerTiny[typ]) + len);
WriteRAW(type_tiny_marker[typ] + len);
} else if (size <= 255) { } else if (size <= 255) {
uint8_t len = size; uint8_t len = size;
// 8 marker WriteRAW(underlying_cast(Marker8[typ]));
WriteRAW(type_8_marker[typ]);
WriteRAW(len); WriteRAW(len);
} else if (size <= 65536) { } else if (size <= 65536) {
uint16_t len = size; uint16_t len = size;
// 16 marker WriteRAW(underlying_cast(Marker16[typ]));
WriteRAW(type_16_marker[typ]);
WriteValue(len); WriteValue(len);
} else { } else {
uint32_t len = size; uint32_t len = size;
// 32 marker WriteRAW(underlying_cast(Marker32[typ]));
WriteRAW(type_32_marker[typ]);
WriteValue(len); WriteValue(len);
} }
} }
void WriteString(const std::string &value) { void WriteString(const std::string &value) {
WriteTypeSize(value.size(), TSTRING); WriteTypeSize(value.size(), MarkerString);
WriteRAW(value.c_str(), value.size()); WriteRAW(value.c_str(), value.size());
} }
void WriteList(const std::vector<TypedValue> &value) { void WriteList(const std::vector<TypedValue> &value) {
WriteTypeSize(value.size(), TLIST); WriteTypeSize(value.size(), MarkerList);
for (auto &x : value) WriteTypedValue(x); for (auto &x : value) WriteTypedValue(x);
} }
void WriteMap(const std::map<std::string, TypedValue> &value) { void WriteMap(const std::map<std::string, TypedValue> &value) {
WriteTypeSize(value.size(), TMAP); WriteTypeSize(value.size(), MarkerMap);
for (auto &x : value) { for (auto &x : value) {
WriteString(x.first); WriteString(x.first);
WriteTypedValue(x.second); WriteTypedValue(x.second);
@ -147,8 +129,8 @@ class BaseEncoder : public Loggable {
} }
void WriteVertex(const VertexAccessor &vertex) { void WriteVertex(const VertexAccessor &vertex) {
// 0xB3 = struct 3; 0x4E = vertex signature WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
WriteRAW("\xB3\x4E", 2); WriteRAW(underlying_cast(Signature::Node));
if (encode_ids_) { if (encode_ids_) {
// IMPORTANT: this is used only in the database snapshotter! // IMPORTANT: this is used only in the database snapshotter!
@ -163,13 +145,13 @@ class BaseEncoder : public Loggable {
// write labels // write labels
const auto &labels = vertex.labels(); const auto &labels = vertex.labels();
WriteTypeSize(labels.size(), TLIST); WriteTypeSize(labels.size(), MarkerList);
for (const auto &label : labels) for (const auto &label : labels)
WriteString(vertex.db_accessor().label_name(label)); WriteString(vertex.db_accessor().label_name(label));
// write properties // write properties
const auto &props = vertex.Properties(); const auto &props = vertex.Properties();
WriteTypeSize(props.size(), TMAP); WriteTypeSize(props.size(), MarkerMap);
for (const auto &prop : props) { for (const auto &prop : props) {
WriteString(vertex.db_accessor().property_name(prop.first)); WriteString(vertex.db_accessor().property_name(prop.first));
WriteTypedValue(prop.second); WriteTypedValue(prop.second);
@ -177,8 +159,8 @@ class BaseEncoder : public Loggable {
} }
void WriteEdge(const EdgeAccessor &edge) { void WriteEdge(const EdgeAccessor &edge) {
// 0xB5 = struct 5; 0x52 = edge signature WriteRAW(underlying_cast(Marker::TinyStruct) + 5);
WriteRAW("\xB5\x52", 2); WriteRAW(underlying_cast(Signature::Relationship));
if (encode_ids_) { if (encode_ids_) {
// IMPORTANT: this is used only in the database snapshotter! // IMPORTANT: this is used only in the database snapshotter!
@ -200,7 +182,7 @@ class BaseEncoder : public Loggable {
// write properties // write properties
const auto &props = edge.Properties(); const auto &props = edge.Properties();
WriteTypeSize(props.size(), TMAP); WriteTypeSize(props.size(), MarkerMap);
for (const auto &prop : props) { for (const auto &prop : props) {
WriteString(edge.db_accessor().property_name(prop.first)); WriteString(edge.db_accessor().property_name(prop.first));
WriteTypedValue(prop.second); WriteTypedValue(prop.second);

View File

@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include "communication/bolt/v1/constants.hpp"
#include "logging/loggable.hpp" #include "logging/loggable.hpp"
#include "utils/bswap.hpp" #include "utils/bswap.hpp"
@ -15,16 +16,7 @@ namespace communication::bolt {
// -> test for more TCP packets! // -> test for more TCP packets!
/** /**
* Sizes related to the chunk defined in Bolt protocol. * @brief ChunkedEncoderBuffer
*/
static constexpr size_t CHUNK_HEADER_SIZE = 2;
static constexpr size_t MAX_CHUNK_SIZE = 65535;
static constexpr size_t CHUNK_END_MARKER_SIZE = 2;
static constexpr size_t WHOLE_CHUNK_SIZE =
CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE + CHUNK_END_MARKER_SIZE;
/**
* @brief ChunkedBuffer
* *
* Has methods for writing and flushing data into the message buffer. * Has methods for writing and flushing data into the message buffer.
* *
@ -44,9 +36,9 @@ static constexpr size_t WHOLE_CHUNK_SIZE =
* @tparam Socket the output socket that should be used * @tparam Socket the output socket that should be used
*/ */
template <class Socket> template <class Socket>
class ChunkedBuffer : public Loggable { class ChunkedEncoderBuffer : public Loggable {
public: public:
ChunkedBuffer(Socket &socket) : Loggable("Chunked Buffer"), socket_(socket) {} ChunkedEncoderBuffer(Socket &socket) : Loggable("Chunked Encoder Buffer"), socket_(socket) {}
/** /**
* Writes n values into the buffer. If n is bigger than whole chunk size * Writes n values into the buffer. If n is bigger than whole chunk size

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include "communication/bolt/v1/encoder/chunked_buffer.hpp" #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp" #include "communication/bolt/v1/encoder/encoder.hpp"
#include "query/backend/cpp/typed_value.hpp" #include "query/backend/cpp/typed_value.hpp"

View File

@ -31,7 +31,7 @@ template <typename Socket>
class Session : public Loggable { class Session : public Loggable {
public: public:
using Decoder = BoltDecoder; using Decoder = BoltDecoder;
using OutputStream = ResultStream<Encoder<ChunkedBuffer<Socket>>>; using OutputStream = ResultStream<Encoder<ChunkedEncoderBuffer<Socket>>>;
Session(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine) Session(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine)
: Loggable("communication::bolt::Session"), : Loggable("communication::bolt::Session"),
@ -63,7 +63,7 @@ class Session : public Loggable {
* @param data pointer on bytes received from a client * @param data pointer on bytes received from a client
* @param len length of data received from a client * @param len length of data received from a client
*/ */
void Execute(const byte *data, size_t len) { void Execute(const uint8_t *data, size_t len) {
// mark the end of the message // mark the end of the message
auto end = data + len; auto end = data + len;
@ -112,8 +112,8 @@ class Session : public Loggable {
Socket socket_; Socket socket_;
Dbms &dbms_; Dbms &dbms_;
QueryEngine<OutputStream> &query_engine_; QueryEngine<OutputStream> &query_engine_;
ChunkedBuffer<Socket> encoder_buffer_; ChunkedEncoderBuffer<Socket> encoder_buffer_;
Encoder<ChunkedBuffer<Socket>> encoder_; Encoder<ChunkedEncoderBuffer<Socket>> encoder_;
OutputStream output_stream_; OutputStream output_stream_;
Decoder decoder_; Decoder decoder_;
io::network::Epoll::Event event_; io::network::Epoll::Event event_;

View File

@ -73,7 +73,7 @@ class Worker
logger_.trace("[on_read] Received {}B", buf.len); logger_.trace("[on_read] Received {}B", buf.len);
try { try {
session.Execute(reinterpret_cast<const byte *>(buf.ptr), buf.len); session.Execute(buf.data, buf.len);
} catch (const std::exception &e) { } catch (const std::exception &e) {
logger_.error("Error occured while executing statement."); logger_.error("Error occured while executing statement.");
logger_.error("{}", e.what()); logger_.error("{}", e.what());
@ -96,7 +96,7 @@ class Worker
// TODO: Do something about it // TODO: Do something about it
} }
char buf_[65536]; uint8_t buf_[65536];
std::thread thread_; std::thread thread_;
void Start(std::atomic<bool> &alive) { void Start(std::atomic<bool> &alive) {

View File

@ -0,0 +1,17 @@
#pragma once
#include <cstdint>
namespace io::network {
/**
* StreamBuffer
* Used for getting a pointer and size of a preallocated block of memory.
* The network stack than uses this block of memory to read data from a
* socket.
*/
struct StreamBuffer {
uint8_t *data;
size_t len;
};
}

View File

@ -1,15 +1,10 @@
#pragma once #pragma once
#include "io/network/stream_buffer.hpp"
#include "io/network/stream_listener.hpp" #include "io/network/stream_listener.hpp"
#include "memory/literals.hpp" #include "memory/literals.hpp"
namespace io::network { namespace io::network {
using namespace memory::literals;
struct StreamBuffer {
char* ptr;
size_t len;
};
/** /**
* This class is used to get data from a socket that has been notified * This class is used to get data from a socket that has been notified
@ -62,7 +57,7 @@ class StreamReader : public StreamListener<Derived, Stream> {
auto buf = this->derived().OnAlloc(stream); auto buf = this->derived().OnAlloc(stream);
// read from the buffer at most buf.len bytes // read from the buffer at most buf.len bytes
buf.len = stream.socket_.Read(buf.ptr, buf.len); buf.len = stream.socket_.Read(buf.data, buf.len);
// check for read errors // check for read errors
if (buf.len == -1) { if (buf.len == -1) {

View File

@ -23,7 +23,7 @@ using endpoint_t = io::network::NetworkEndpoint;
using socket_t = io::network::Socket; using socket_t = io::network::Socket;
using session_t = communication::bolt::Session<socket_t>; using session_t = communication::bolt::Session<socket_t>;
using result_stream_t = communication::bolt::ResultStream< using result_stream_t = communication::bolt::ResultStream<
communication::bolt::Encoder<communication::bolt::ChunkedBuffer<socket_t>>>; communication::bolt::Encoder<communication::bolt::ChunkedEncoderBuffer<socket_t>>>;
using bolt_server_t = using bolt_server_t =
communication::Server<session_t, result_stream_t, socket_t>; communication::Server<session_t, result_stream_t, socket_t>;

View File

@ -6,7 +6,7 @@
#include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/encoder/result_stream.hpp"
#include "io/network/socket.hpp" #include "io/network/socket.hpp"
using Stream = communication::bolt::ResultStream<communication::bolt::Encoder< using Stream = communication::bolt::ResultStream<communication::bolt::Encoder<
communication::bolt::ChunkedBuffer<io::network::Socket>>>; communication::bolt::ChunkedEncoderBuffer<io::network::Socket>>>;
#else #else
#include "../stream/print_record_stream.hpp" #include "../stream/print_record_stream.hpp"
using Stream = PrintRecordStream; using Stream = PrintRecordStream;

View File

@ -0,0 +1,55 @@
#include "bolt_common.hpp"
#include "communication/bolt/v1/decoder/buffer.hpp"
constexpr const int SIZE = 4096;
uint8_t data[SIZE];
using BufferT = communication::bolt::Buffer;
using StreamBufferT = io::network::StreamBuffer;
TEST(BoltBuffer, AllocateAndWritten) {
BufferT buffer;
StreamBufferT sb = buffer.Allocate();
memcpy(sb.data, data, 1000);
buffer.Written(1000);
ASSERT_EQ(buffer.size(), 1000);
uint8_t *tmp = buffer.data();
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], tmp[i]);
}
TEST(BoltBuffer, Shift) {
BufferT buffer;
StreamBufferT sb = buffer.Allocate();
memcpy(sb.data, data, 1000);
buffer.Written(1000);
sb = buffer.Allocate();
memcpy(sb.data, data + 1000, 1000);
buffer.Written(1000);
ASSERT_EQ(buffer.size(), 2000);
uint8_t *tmp = buffer.data();
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], tmp[i]);
buffer.Shift(1000);
ASSERT_EQ(buffer.size(), 1000);
tmp = buffer.data();
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i + 1000], tmp[i]);
}
int main(int argc, char **argv) {
InitializeData(data, SIZE);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,146 @@
#include "bolt_common.hpp"
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp"
constexpr const int SIZE = 131072;
uint8_t data[SIZE];
using BufferT = communication::bolt::Buffer;
using StreamBufferT = io::network::StreamBuffer;
using DecoderBufferT = communication::bolt::ChunkedDecoderBuffer;
TEST(BoltBuffer, CorrectChunk) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
sb.data[0] = 0x03; sb.data[1] = 0xe8;
memcpy(sb.data + 2, data, 1000);
sb.data[1002] = 0; sb.data[1003] = 0;
buffer.Written(1004);
ASSERT_EQ(decoder_buffer.GetChunk(), true);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], tmp[i]);
ASSERT_EQ(buffer.size(), 0);
}
TEST(BoltBuffer, CorrectChunkTrailingData) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
sb.data[0] = 0x03; sb.data[1] = 0xe8;
memcpy(sb.data + 2, data, 2002);
sb.data[1002] = 0; sb.data[1003] = 0;
buffer.Written(2004);
ASSERT_EQ(decoder_buffer.GetChunk(), true);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], tmp[i]);
uint8_t *leftover = buffer.data();
ASSERT_EQ(buffer.size(), 1000);
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i + 1002], leftover[i]);
}
TEST(BoltBuffer, InvalidChunk) {
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
sb.data[0] = 0x03; sb.data[1] = 0xe8;
memcpy(sb.data + 2, data, 2002);
sb.data[1002] = 1; sb.data[1003] = 1;
buffer.Written(2004);
ASSERT_EQ(decoder_buffer.GetChunk(), false);
ASSERT_EQ(buffer.size(), 1000);
uint8_t *tmp = buffer.data();
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i + 1002], tmp[i]);
}
TEST(BoltBuffer, GraduallyPopulatedChunk) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
sb.data[0] = 0x03; sb.data[1] = 0xe8;
buffer.Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), false);
for (int i = 0; i < 5; ++i) {
sb = buffer.Allocate();
memcpy(sb.data, data + 200 * i, 200);
buffer.Written(200);
ASSERT_EQ(decoder_buffer.GetChunk(), false);
}
sb = buffer.Allocate();
sb.data[0] = 0; sb.data[1] = 0;
buffer.Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), true);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], tmp[i]);
ASSERT_EQ(buffer.size(), 0);
}
TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
sb.data[0] = 0x03; sb.data[1] = 0xe8;
buffer.Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), false);
for (int i = 0; i < 5; ++i) {
sb = buffer.Allocate();
memcpy(sb.data, data + 200 * i, 200);
buffer.Written(200);
ASSERT_EQ(decoder_buffer.GetChunk(), false);
}
sb = buffer.Allocate();
sb.data[0] = 0; sb.data[1] = 0;
buffer.Written(2);
sb = buffer.Allocate();
memcpy(sb.data, data, 1000);
buffer.Written(1000);
ASSERT_EQ(decoder_buffer.GetChunk(), true);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], tmp[i]);
uint8_t *leftover = buffer.data();
ASSERT_EQ(buffer.size(), 1000);
for (int i = 0; i < 1000; ++i)
EXPECT_EQ(data[i], leftover[i]);
}
int main(int argc, char **argv) {
InitializeData(data, SIZE);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1,9 +1,9 @@
#include "bolt_common.hpp" #include "bolt_common.hpp"
#include "communication/bolt/v1/encoder/chunked_buffer.hpp" #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
// aliases // aliases
using SocketT = TestSocket; using SocketT = TestSocket;
using BufferT = communication::bolt::ChunkedBuffer<SocketT>; using BufferT = communication::bolt::ChunkedEncoderBuffer<SocketT>;
// "alias" constants // "alias" constants
static constexpr auto CHS = communication::bolt::CHUNK_HEADER_SIZE; static constexpr auto CHS = communication::bolt::CHUNK_HEADER_SIZE;
@ -41,7 +41,7 @@ void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element) {
ASSERT_EQ(*(data + CHS + size + 1), 0x00); ASSERT_EQ(*(data + CHS + size + 1), 0x00);
} }
TEST(BoltChunkedBuffer, OneSmallChunk) { TEST(BoltChunkedEncoderBuffer, OneSmallChunk) {
// initialize array of 100 ones (small chunk) // initialize array of 100 ones (small chunk)
int size = 100; int size = 100;
uint8_t element = '1'; uint8_t element = '1';
@ -60,7 +60,7 @@ TEST(BoltChunkedBuffer, OneSmallChunk) {
VerifyChunkOfOnes(socket.output.data(), size, element); VerifyChunkOfOnes(socket.output.data(), size, element);
} }
TEST(BoltChunkedBuffer, TwoSmallChunks) { TEST(BoltChunkedEncoderBuffer, TwoSmallChunks) {
// initialize the small arrays // initialize the small arrays
int size1 = 100; int size1 = 100;
uint8_t element1 = '1'; uint8_t element1 = '1';
@ -87,7 +87,7 @@ TEST(BoltChunkedBuffer, TwoSmallChunks) {
VerifyChunkOfOnes(data + CHS + size1 + CEMS, size2, element2); VerifyChunkOfOnes(data + CHS + size1 + CEMS, size2, element2);
} }
TEST(BoltChunkedBuffer, OneAndAHalfOfMaxChunk) { TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) {
// initialize a big chunk // initialize a big chunk
int size = 100000; int size = 100000;
uint8_t element = '1'; uint8_t element = '1';

422
tests/unit/bolt_decoder.cpp Normal file
View File

@ -0,0 +1,422 @@
#include "bolt_common.hpp"
#include "bolt_testdata.hpp"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "query/backend/cpp/typed_value.hpp"
constexpr const int SIZE = 131072;
uint8_t data[SIZE];
/**
* TestDecoderBuffer
* This class provides a dummy Buffer used for testing the Decoder.
* It's Read function is the necessary public interface for the Decoder.
* It's Write and Clear methods are used for testing. Through the Write
* method you can store data in the buffer, and throgh the Clear method
* you can clear the buffer. The decoder uses the Read function to get
* data from the buffer.
*/
class TestDecoderBuffer {
public:
bool Read(uint8_t *data, size_t len) {
if (len > buffer_.size()) return false;
memcpy(data, buffer_.data(), len);
buffer_.erase(buffer_.begin(), buffer_.begin() + len);
return true;
}
void Write(const uint8_t *data, size_t len) {
for (size_t i = 0; i < len; ++i)
buffer_.push_back(data[i]);
}
void Clear() {
buffer_.clear();
}
private:
std::vector<uint8_t> buffer_;
};
using DecoderT = communication::bolt::Decoder<TestDecoderBuffer>;
TEST(BoltDecoder, NullAndBool) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
TypedValue tv;
// test null
buffer.Write((const uint8_t *)"\xC0", 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::Null);
// test true
buffer.Write((const uint8_t *)"\xC3", 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::Bool);
ASSERT_EQ(tv.Value<bool>(), true);
// test false
buffer.Write((const uint8_t *)"\xC2", 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::Bool);
ASSERT_EQ(tv.Value<bool>(), false);
}
TEST(BoltDecoder, Int) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
TypedValue tv;
// test invalid marker
buffer.Clear();
buffer.Write((uint8_t *)"\xCD", 1); // 0xCD is reserved in the protocol
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
for (int i = 0; i < 28; ++i) {
// test missing data
buffer.Clear();
buffer.Write(int_encoded[i], int_encoded_len[i] - 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test all ok
buffer.Clear();
buffer.Write(int_encoded[i], int_encoded_len[i]);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::Int);
ASSERT_EQ(tv.Value<int64_t>(), int_decoded[i]);
}
}
TEST(BoltDecoder, Double) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
TypedValue tv;
for (int i = 0; i < 4; ++i) {
// test missing data
buffer.Clear();
buffer.Write(double_encoded[i], 8);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test all ok
buffer.Clear();
buffer.Write(double_encoded[i], 9);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::Double);
ASSERT_EQ(tv.Value<double>(), double_decoded[i]);
}
}
TEST(BoltDecoder, String) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
TypedValue tv;
uint8_t headers[][6] = {"\x8F", "\xD0\x0F", "\xD1\x00\x0F", "\xD2\x00\x00\x00\x0F"};
int headers_len[] = {1, 2, 3, 5};
for (int i = 0; i < 4; ++i) {
// test missing data in header
buffer.Clear();
buffer.Write(headers[i], headers_len[i] - 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test missing elements
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
buffer.Write(data, 14);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test all ok
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
buffer.Write(data, 15);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::String);
std::string &str = tv.Value<std::string>();
for (int j = 0; j < 15; ++j)
EXPECT_EQ((uint8_t)str[j], data[j]);
}
}
TEST(BoltDecoder, List) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
TypedValue tv;
uint8_t headers[][6] = {"\x9F", "\xD4\x0F", "\xD5\x00\x0F", "\xD6\x00\x00\x00\x0F"};
int headers_len[] = {1, 2, 3, 5};
for (int i = 0; i < 4; ++i) {
// test missing data in header
buffer.Clear();
buffer.Write(headers[i], headers_len[i] - 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test missing elements
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
for (uint8_t j = 0; j < 14; ++j)
buffer.Write(&j, 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test all ok
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
for (uint8_t j = 0; j < 15; ++j)
buffer.Write(&j, 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::List);
std::vector<TypedValue> &val = tv.Value<std::vector<TypedValue>>();
ASSERT_EQ(val.size(), 15);
for (int j = 0; j < 15; ++j)
EXPECT_EQ(val[j].Value<int64_t>(), j);
}
}
TEST(BoltDecoder, Map) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
TypedValue tv;
uint8_t headers[][6] = {"\xAF", "\xD8\x0F", "\xD9\x00\x0F", "\xDA\x00\x00\x00\x0F"};
int headers_len[] = {1, 2, 3, 5};
uint8_t index[] = "\x81\x61";
uint8_t wrong_index = 1;
for (int i = 0; i < 4; ++i) {
// test missing data in header
buffer.Clear();
buffer.Write(headers[i], headers_len[i] - 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test wrong index type
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
buffer.Write(&wrong_index, 1);
buffer.Write(&wrong_index, 1);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test missing element data
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
buffer.Write(index, 2);
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test missing elements
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
for (uint8_t j = 0; j < 14; ++j) {
buffer.Write(index, 2);
buffer.Write(&j, 1);
}
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test elements with same index
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
for (uint8_t j = 0; j < 15; ++j) {
uint8_t tmp = 'a' + j;
buffer.Write(index, 2);
buffer.Write(&j, 1);
}
ASSERT_EQ(decoder.ReadTypedValue(&tv), false);
// test all ok
buffer.Clear();
buffer.Write(headers[i], headers_len[i]);
for (uint8_t j = 0; j < 15; ++j) {
uint8_t tmp = 'a' + j;
buffer.Write(index, 1);
buffer.Write(&tmp, 1);
buffer.Write(&j, 1);
}
ASSERT_EQ(decoder.ReadTypedValue(&tv), true);
ASSERT_EQ(tv.type(), TypedValue::Type::Map);
std::map<std::string, TypedValue> &val = tv.Value<std::map<std::string, TypedValue>>();
ASSERT_EQ(val.size(), 15);
for (int j = 0; j < 15; ++j) {
char tmp_chr = 'a' + j;
TypedValue tmp_tv = val[std::string(1, tmp_chr)];
EXPECT_EQ(tmp_tv.type(), TypedValue::Type::Int);
EXPECT_EQ(tmp_tv.Value<int64_t>(), j);
}
}
}
TEST(BoltDecoder, Vertex) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
communication::bolt::DecodedVertex dv;
uint8_t header[] = "\xB3\x4E";
uint8_t wrong_header[] = "\x00\x00";
uint8_t test_int[] = "\x01";
uint8_t test_str[] = "\x81\x61";
uint8_t test_list[] = "\x91";
uint8_t test_map[] = "\xA1";
// test missing signature
buffer.Clear();
buffer.Write(wrong_header, 1);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test wrong marker
buffer.Clear();
buffer.Write(wrong_header, 2);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test wrong signature
buffer.Clear();
buffer.Write(header, 1);
buffer.Write(wrong_header, 1);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test ID wrong type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_str, 2);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test labels wrong outer type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int, 1);
buffer.Write(test_int, 1);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test labels wrong inner type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int, 1);
buffer.Write(test_list, 1);
buffer.Write(test_int, 1);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test properties wrong outer type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int, 1);
buffer.Write(test_list, 1);
buffer.Write(test_str, 2);
ASSERT_EQ(decoder.ReadVertex(&dv), false);
// test all ok
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int, 1);
buffer.Write(test_list, 1);
buffer.Write(test_str, 2);
buffer.Write(test_map, 1);
buffer.Write(test_str, 2);
buffer.Write(test_int, 1);
ASSERT_EQ(decoder.ReadVertex(&dv), true);
ASSERT_EQ(dv.id, 1);
ASSERT_EQ(dv.labels[0], std::string("a"));
ASSERT_EQ(dv.properties[std::string("a")].Value<int64_t>(), 1);
}
TEST(BoltDecoder, Edge) {
TestDecoderBuffer buffer;
DecoderT decoder(buffer);
communication::bolt::DecodedEdge de;
uint8_t header[] = "\xB5\x52";
uint8_t wrong_header[] = "\x00\x00";
uint8_t test_int1[] = "\x01";
uint8_t test_int2[] = "\x02";
uint8_t test_int3[] = "\x03";
uint8_t test_str[] = "\x81\x61";
uint8_t test_list[] = "\x91";
uint8_t test_map[] = "\xA1";
// test missing signature
buffer.Clear();
buffer.Write(wrong_header, 1);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test wrong marker
buffer.Clear();
buffer.Write(wrong_header, 2);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test wrong signature
buffer.Clear();
buffer.Write(header, 1);
buffer.Write(wrong_header, 1);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test ID wrong type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_str, 2);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test from_id wrong type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int1, 1);
buffer.Write(test_str, 2);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test to_id wrong type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int1, 1);
buffer.Write(test_int2, 1);
buffer.Write(test_str, 2);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test type wrong type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int1, 1);
buffer.Write(test_int2, 1);
buffer.Write(test_int3, 1);
buffer.Write(test_int1, 1);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test properties wrong outer type
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int1, 1);
buffer.Write(test_int2, 1);
buffer.Write(test_int3, 1);
buffer.Write(test_str, 2);
buffer.Write(test_int1, 1);
ASSERT_EQ(decoder.ReadEdge(&de), false);
// test all ok
buffer.Clear();
buffer.Write(header, 2);
buffer.Write(test_int1, 1);
buffer.Write(test_int2, 1);
buffer.Write(test_int3, 1);
buffer.Write(test_str, 2);
buffer.Write(test_map, 1);
buffer.Write(test_str, 2);
buffer.Write(test_int1, 1);
ASSERT_EQ(decoder.ReadEdge(&de), true);
ASSERT_EQ(de.id, 1);
ASSERT_EQ(de.from, 2);
ASSERT_EQ(de.to, 3);
ASSERT_EQ(de.type, std::string("a"));
ASSERT_EQ(de.properties[std::string("a")].Value<int64_t>(), 1);
}
int main(int argc, char **argv) {
InitializeData(data, SIZE);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1,4 +1,5 @@
#include "bolt_common.hpp" #include "bolt_common.hpp"
#include "bolt_testdata.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp" #include "communication/bolt/v1/encoder/encoder.hpp"
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
@ -9,53 +10,8 @@
* TODO (mferencevic): document * TODO (mferencevic): document
*/ */
// clang-format off
const int64_t int_input[] = {
0, -1, -8, -16, 1, 63, 127, -128, -20, -17, -32768, -12345, -129, 128,
12345, 32767, -2147483648L, -12345678L, -32769L, 32768L, 12345678L,
2147483647L, -9223372036854775807L, -12345678912345L, -2147483649L,
2147483648L, 12345678912345L, 9223372036854775807};
const uint8_t int_output[][10] = {
"\x00", "\xFF", "\xF8", "\xF0", "\x01", "\x3F", "\x7F", "\xC8\x80",
"\xC8\xEC", "\xC8\xEF", "\xC9\x80\x00", "\xC9\xCF\xC7", "\xC9\xFF\x7F",
"\xC9\x00\x80", "\xC9\x30\x39", "\xC9\x7F\xFF", "\xCA\x80\x00\x00\x00",
"\xCA\xFF\x43\x9E\xB2", "\xCA\xFF\xFF\x7F\xFF", "\xCA\x00\x00\x80\x00",
"\xCA\x00\xBC\x61\x4E", "\xCA\x7F\xFF\xFF\xFF",
"\xCB\x80\x00\x00\x00\x00\x00\x00\x01",
"\xCB\xFF\xFF\xF4\xC5\x8C\x31\xA4\xA7",
"\xCB\xFF\xFF\xFF\xFF\x7F\xFF\xFF\xFF",
"\xCB\x00\x00\x00\x00\x80\x00\x00\x00",
"\xCB\x00\x00\x0B\x3A\x73\xCE\x5B\x59",
"\xCB\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF"};
// clang-format on
const uint32_t int_output_len[] = {1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3,
3, 3, 5, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9, 9};
const double double_input[] = {5.834, 108.199, 43677.9882, 254524.5851};
const uint8_t double_output[][10] = {"\xC1\x40\x17\x56\x04\x18\x93\x74\xBC",
"\xC1\x40\x5B\x0C\xBC\x6A\x7E\xF9\xDB",
"\xC1\x40\xE5\x53\xBF\x9F\x55\x9B\x3D",
"\xC1\x41\x0F\x11\xE4\xAE\x48\xE8\xA7"};
const uint8_t vertexedge_output[] =
"\xB1\x71\x93\xB3\x4E\x00\x92\x86\x6C\x61\x62\x65\x6C\x31\x86\x6C\x61\x62"
"\x65\x6C\x32\xA2\x85\x70\x72\x6F\x70\x31\x0C\x85\x70\x72\x6F\x70\x32\xC9"
"\x00\xC8\xB3\x4E\x00\x90\xA0\xB5\x52\x00\x00\x00\x88\x65\x64\x67\x65\x74"
"\x79\x70\x65\xA2\x85\x70\x72\x6F\x70\x33\x2A\x85\x70\x72\x6F\x70\x34\xC9"
"\x04\xD2";
constexpr const int SIZE = 131072; constexpr const int SIZE = 131072;
uint8_t data[SIZE]; uint8_t data[SIZE];
const uint64_t sizes[] = {0, 1, 5, 15, 16, 120,
255, 256, 12345, 65535, 65536, 100000};
const uint64_t sizes_num = 12;
constexpr const int STRING = 0, LIST = 1, MAP = 2;
const uint8_t type_tiny_magic[] = {0x80, 0x90, 0xA0};
const uint8_t type_8_magic[] = {0xD0, 0xD4, 0xD8};
const uint8_t type_16_magic[] = {0xD1, 0xD5, 0xD9};
const uint8_t type_32_magic[] = {0xD2, 0xD6, 0xDA};
void CheckTypeSize(std::vector<uint8_t> &v, int typ, uint64_t size) { void CheckTypeSize(std::vector<uint8_t> &v, int typ, uint64_t size) {
if (size <= 15) { if (size <= 15) {
@ -103,21 +59,21 @@ TEST(BoltEncoder, NullAndBool) {
TEST(BoltEncoder, Int) { TEST(BoltEncoder, Int) {
int N = 28; int N = 28;
std::vector<TypedValue> vals; std::vector<TypedValue> vals;
for (int i = 0; i < N; ++i) vals.push_back(TypedValue(int_input[i])); for (int i = 0; i < N; ++i) vals.push_back(TypedValue(int_decoded[i]));
bolt_encoder.MessageRecord(vals); bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, N); CheckRecordHeader(output, N);
for (int i = 0; i < N; ++i) for (int i = 0; i < N; ++i)
CheckOutput(output, int_output[i], int_output_len[i], false); CheckOutput(output, int_encoded[i], int_encoded_len[i], false);
CheckOutput(output, nullptr, 0); CheckOutput(output, nullptr, 0);
} }
TEST(BoltEncoder, Double) { TEST(BoltEncoder, Double) {
int N = 4; int N = 4;
std::vector<TypedValue> vals; std::vector<TypedValue> vals;
for (int i = 0; i < N; ++i) vals.push_back(TypedValue(double_input[i])); for (int i = 0; i < N; ++i) vals.push_back(TypedValue(double_decoded[i]));
bolt_encoder.MessageRecord(vals); bolt_encoder.MessageRecord(vals);
CheckRecordHeader(output, N); CheckRecordHeader(output, N);
for (int i = 0; i < N; ++i) CheckOutput(output, double_output[i], 9, false); for (int i = 0; i < N; ++i) CheckOutput(output, double_encoded[i], 9, false);
CheckOutput(output, nullptr, 0); CheckOutput(output, nullptr, 0);
} }
@ -209,7 +165,7 @@ TEST(BoltEncoder, VertexAndEdge) {
vals.push_back(TypedValue(va2)); vals.push_back(TypedValue(va2));
vals.push_back(TypedValue(ea)); vals.push_back(TypedValue(ea));
bolt_encoder.MessageRecord(vals); bolt_encoder.MessageRecord(vals);
CheckOutput(output, vertexedge_output, 74); CheckOutput(output, vertexedge_encoded, 74);
} }
TEST(BoltEncoder, BoltV1ExampleMessages) { TEST(BoltEncoder, BoltV1ExampleMessages) {

View File

@ -1,18 +1,14 @@
#include "bolt_common.hpp" #include "bolt_common.hpp"
#include "communication/bolt/v1/encoder/chunked_buffer.hpp" #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp" #include "communication/bolt/v1/encoder/encoder.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/encoder/result_stream.hpp"
#include "query/backend/cpp/typed_value.hpp" #include "query/backend/cpp/typed_value.hpp"
using BufferT = communication::bolt::ChunkedBuffer<TestSocket>; using BufferT = communication::bolt::ChunkedEncoderBuffer<TestSocket>;
using EncoderT = communication::bolt::Encoder<BufferT>; using EncoderT = communication::bolt::Encoder<BufferT>;
using ResultStreamT = communication::bolt::ResultStream<EncoderT>; using ResultStreamT = communication::bolt::ResultStream<EncoderT>;
/**
* TODO (mferencevic): document
*/
const uint8_t header_output[] = const uint8_t header_output[] =
"\x00\x29\xB1\x70\xA1\x86\x66\x69\x65\x6C\x64\x73\x9A\x82\x61\x61\x82\x62" "\x00\x29\xB1\x70\xA1\x86\x66\x69\x65\x6C\x64\x73\x9A\x82\x61\x61\x82\x62"
"\x62\x82\x63\x63\x82\x64\x64\x82\x65\x65\x82\x66\x66\x82\x67\x67\x82\x68" "\x62\x82\x63\x63\x82\x64\x64\x82\x65\x65\x82\x66\x66\x82\x67\x67\x82\x68"

View File

@ -6,7 +6,7 @@
using ResultStreamT = using ResultStreamT =
communication::bolt::ResultStream<communication::bolt::Encoder< communication::bolt::ResultStream<communication::bolt::Encoder<
communication::bolt::ChunkedBuffer<TestSocket>>>; communication::bolt::ChunkedEncoderBuffer<TestSocket>>>;
using SessionT = communication::bolt::Session<TestSocket>; using SessionT = communication::bolt::Session<TestSocket>;
/** /**

View File

@ -0,0 +1,48 @@
#pragma once
// clang-format off
const int64_t int_decoded[] = {
0, -1, -8, -16, 1, 63, 127, -128, -20, -17, -32768, -12345, -129, 128,
12345, 32767, -2147483648L, -12345678L, -32769L, 32768L, 12345678L,
2147483647L, -9223372036854775807L, -12345678912345L, -2147483649L,
2147483648L, 12345678912345L, 9223372036854775807};
const uint8_t int_encoded[][10] = {
"\x00", "\xFF", "\xF8", "\xF0", "\x01", "\x3F", "\x7F", "\xC8\x80",
"\xC8\xEC", "\xC8\xEF", "\xC9\x80\x00", "\xC9\xCF\xC7", "\xC9\xFF\x7F",
"\xC9\x00\x80", "\xC9\x30\x39", "\xC9\x7F\xFF", "\xCA\x80\x00\x00\x00",
"\xCA\xFF\x43\x9E\xB2", "\xCA\xFF\xFF\x7F\xFF", "\xCA\x00\x00\x80\x00",
"\xCA\x00\xBC\x61\x4E", "\xCA\x7F\xFF\xFF\xFF",
"\xCB\x80\x00\x00\x00\x00\x00\x00\x01",
"\xCB\xFF\xFF\xF4\xC5\x8C\x31\xA4\xA7",
"\xCB\xFF\xFF\xFF\xFF\x7F\xFF\xFF\xFF",
"\xCB\x00\x00\x00\x00\x80\x00\x00\x00",
"\xCB\x00\x00\x0B\x3A\x73\xCE\x5B\x59",
"\xCB\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF"};
// clang-format on
const uint32_t int_encoded_len[] = {1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3,
3, 3, 5, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9, 9};
const double double_decoded[] = {5.834, 108.199, 43677.9882, 254524.5851};
const uint8_t double_encoded[][10] = {"\xC1\x40\x17\x56\x04\x18\x93\x74\xBC",
"\xC1\x40\x5B\x0C\xBC\x6A\x7E\xF9\xDB",
"\xC1\x40\xE5\x53\xBF\x9F\x55\x9B\x3D",
"\xC1\x41\x0F\x11\xE4\xAE\x48\xE8\xA7"};
const uint8_t vertexedge_encoded[] =
"\xB1\x71\x93\xB3\x4E\x00\x92\x86\x6C\x61\x62\x65\x6C\x31\x86\x6C\x61\x62"
"\x65\x6C\x32\xA2\x85\x70\x72\x6F\x70\x31\x0C\x85\x70\x72\x6F\x70\x32\xC9"
"\x00\xC8\xB3\x4E\x00\x90\xA0\xB5\x52\x00\x00\x00\x88\x65\x64\x67\x65\x74"
"\x79\x70\x65\xA2\x85\x70\x72\x6F\x70\x33\x2A\x85\x70\x72\x6F\x70\x34\xC9"
"\x04\xD2";
const uint64_t sizes[] = {0, 1, 5, 15, 16, 120,
255, 256, 12345, 65535, 65536, 100000};
const uint64_t sizes_num = 12;
constexpr const int STRING = 0, LIST = 1, MAP = 2;
const uint8_t type_tiny_magic[] = {0x80, 0x90, 0xA0};
const uint8_t type_8_magic[] = {0xD0, 0xD4, 0xD8};
const uint8_t type_16_magic[] = {0xD1, 0xD5, 0xD9};
const uint8_t type_32_magic[] = {0xD2, 0xD6, 0xDA};