From 3bf0bd40a7c9063ad4fbaa1a7e8acaac63e2464c Mon Sep 17 00:00:00 2001 From: Matej Ferencevic <matej.ferencevic@memgraph.io> Date: Wed, 22 Mar 2017 15:53:30 +0100 Subject: [PATCH] Initial version of new bolt encoder. Reviewers: dgleich, buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D104 --- CMakeLists.txt | 34 +- coverage | 2 +- .../bolt/v1/encoder/chunked_buffer.hpp | 80 +++++ src/communication/bolt/v1/encoder/encoder.hpp | 312 ++++++++++++++++++ .../bolt/v1/encoder/result_stream.hpp | 82 +++++ .../bolt/v1/serialization/result_stream.hpp | 137 -------- tests/unit/bolt_chunked_buffer.cpp | 46 +++ tests/unit/bolt_common.hpp | 72 ++++ tests/unit/bolt_encoder.cpp | 249 ++++++++++++++ tests/unit/bolt_result_stream.cpp | 50 +++ tests/unit/bolt_session.cpp | 58 +--- 11 files changed, 912 insertions(+), 210 deletions(-) create mode 100644 src/communication/bolt/v1/encoder/chunked_buffer.hpp create mode 100644 src/communication/bolt/v1/encoder/encoder.hpp create mode 100644 src/communication/bolt/v1/encoder/result_stream.hpp delete mode 100644 src/communication/bolt/v1/serialization/result_stream.hpp create mode 100644 tests/unit/bolt_chunked_buffer.cpp create mode 100644 tests/unit/bolt_common.hpp create mode 100644 tests/unit/bolt_encoder.cpp create mode 100644 tests/unit/bolt_result_stream.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index da8a8bb4b..938bc0160 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -255,6 +255,8 @@ option(MANUAL_TESTS "Add manual test binaries" OFF) message(STATUS "Add manual test binaries: ${MANUAL_TESTS}") option(UNIT_TESTS "Add unit test binaries" OFF) message(STATUS "Add unit test binaries: ${UNIT_TESTS}") +option(HARDCODED_TARGETS "Make hardcoded query targets" ON) +message(STATUS "Make hardcoded query targets: ${HARDCODED_TARGETS}") option(TEST_COVERAGE "Generate coverage reports from unit tests" OFF) message(STATUS "Generate coverage from unit tests: ${TEST_COVERAGE}") # ----------------------------------------------------------------------------- @@ -451,21 +453,23 @@ set_target_properties(__refactor_target PROPERTIES EXCLUDE_FROM_ALL 1) # 2. query plan execution agains empty database and injected OutputStream # 3. integration tests for all pilot/clients written in cucumber # the following targets address only the first phase -file(GLOB __HARDCODED_SOURCES - ${CMAKE_SOURCE_DIR}/tests/integration/hardcoded_query/*.cpp) -foreach(file_path ${__HARDCODED_SOURCES}) - get_filename_component(file_name ${file_path} NAME_WE) - set(target_name __${file_name}_hardcoded_target) - add_executable(${target_name} ${CMAKE_SOURCE_DIR}/libs/__main.cpp - ${file_path}) - target_link_libraries(${target_name} memgraph_lib) - target_link_libraries(${target_name} fmt) - target_link_libraries(${target_name} Threads::Threads) - set_property(TARGET ${target_name} PROPERTY CXX_STANDARD ${cxx_standard}) - set_target_properties(${target_name} - PROPERTIES RUNTIME_OUTPUT_DIRECTORY - "${CMAKE_BINARY_DIR}/__hardcoded_targets") -endforeach() +if(HARDCODED_TARGETS) + file(GLOB __HARDCODED_SOURCES + ${CMAKE_SOURCE_DIR}/tests/integration/hardcoded_query/*.cpp) + foreach(file_path ${__HARDCODED_SOURCES}) + get_filename_component(file_name ${file_path} NAME_WE) + set(target_name __${file_name}_hardcoded_target) + add_executable(${target_name} ${CMAKE_SOURCE_DIR}/libs/__main.cpp + ${file_path}) + target_link_libraries(${target_name} memgraph_lib) + target_link_libraries(${target_name} fmt) + target_link_libraries(${target_name} Threads::Threads) + set_property(TARGET ${target_name} PROPERTY CXX_STANDARD ${cxx_standard}) + set_target_properties(${target_name} + PROPERTIES RUNTIME_OUTPUT_DIRECTORY + "${CMAKE_BINARY_DIR}/__hardcoded_targets") + endforeach() +endif() get_target_cxx_flags(memgraph_lib compile_flags) set(plan_compiler_flags_file ${build_include_dir}/query/plan_compiler_flags.hpp) diff --git a/coverage b/coverage index fbd38174a..8f9a9bbe1 100755 --- a/coverage +++ b/coverage @@ -19,7 +19,7 @@ all_coverage_info="" for dir in $binary_path/*.dir; do pushd ${dir} lcov --gcov-tool ${working_dir}/llvm-gcov -c -d . -o ${coverage_file} - lcov -r ${coverage_file} '/usr/*' '*/libs/*' -o ${coverage_file} + lcov -r ${coverage_file} '/usr/*' '*/libs/*' '*/tests/*' -o ${coverage_file} all_coverage_info+=" -a ${working_dir}/${dir}/${coverage_file}" popd done diff --git a/src/communication/bolt/v1/encoder/chunked_buffer.hpp b/src/communication/bolt/v1/encoder/chunked_buffer.hpp new file mode 100644 index 000000000..0e07f319d --- /dev/null +++ b/src/communication/bolt/v1/encoder/chunked_buffer.hpp @@ -0,0 +1,80 @@ +#pragma once + +#include <cstring> +#include <memory> +#include <vector> +#include <algorithm> + +#include "communication/bolt/v1/config.hpp" +#include "logging/default.hpp" +#include "utils/types/byte.hpp" +#include "utils/bswap.hpp" + +namespace communication::bolt { + +// maximum chunk size = 65536 bytes data +static constexpr size_t CHUNK_SIZE = 65536; + +/** + * Bolt chunked buffer. + * Has methods for writing and flushing data. + * Writing data stores data in the internal buffer and flushing data sends + * the currently stored data to the Socket with prepended data length and + * appended chunk tail (0x00 0x00). + * + * @tparam Socket the output socket that should be used + */ +template <class Socket> +class ChunkedBuffer { + public: + ChunkedBuffer(Socket &socket) : socket_(socket), logger_(logging::log->logger("Chunked Buffer")) {} + + void Write(const uint8_t* values, size_t n) { + logger_.trace("Write {} bytes", n); + + // total size of the buffer is now bigger for n + size_ += n; + + // reserve enough space for the new data + buffer_.reserve(size_); + + // copy new data + std::copy(values, values + n, std::back_inserter(buffer_)); + } + + void Flush() { + size_t size = buffer_.size(), n = 0, pos = 0; + uint16_t head; + + while (size > 0) { + head = n = std::min(CHUNK_SIZE, size); + head = bswap(head); + + logger_.trace("Flushing chunk of {} bytes", n); + + // TODO: implement better flushing strategy! + socket_.Write(reinterpret_cast<const uint8_t *>(&head), sizeof(head)); + socket_.Write(buffer_.data() + pos, n); + + head = 0; + socket_.Write(reinterpret_cast<const uint8_t *>(&head), sizeof(head)); + + size -= n; + pos += n; + } + + // GC + // TODO: impelement a better strategy + buffer_.clear(); + + // clear size + size_ = 0; + } + + private: + Socket& socket_; + Logger logger_; + std::vector<uint8_t> buffer_; + size_t size_{0}; +}; +} diff --git a/src/communication/bolt/v1/encoder/encoder.hpp b/src/communication/bolt/v1/encoder/encoder.hpp new file mode 100644 index 000000000..f49f78de4 --- /dev/null +++ b/src/communication/bolt/v1/encoder/encoder.hpp @@ -0,0 +1,312 @@ +#pragma once + +#include "database/graph_db_accessor.hpp" +#include "logging/default.hpp" +#include "query/backend/cpp/typed_value.hpp" +#include "utils/bswap.hpp" + +#include <string> + +namespace communication::bolt { + +static constexpr uint8_t TSTRING = 0, TLIST = 1, TMAP = 2; +static constexpr uint8_t type_tiny_marker[3] = { 0x80, 0x90, 0xA0 }; +static constexpr uint8_t type_8_marker[3] = { 0xD0, 0xD4, 0xD8 }; +static constexpr uint8_t type_16_marker[3] = { 0xD1, 0xD5, 0xD9 }; +static constexpr uint8_t type_32_marker[3] = { 0xD2, 0xD6, 0xDA }; + + +/** + * Bolt Encoder. + * Has public interfaces for writing Bolt specific response messages. + * Supported messages are: Record, Success, Failure and Ignored. + * + * @tparam Buffer the output buffer that should be used + * @tparam Socket the output socket that should be used + */ +template <typename Buffer, typename Socket> +class Encoder { + + public: + Encoder(Socket& socket) : socket_(socket), buffer_(socket), logger_(logging::log->logger("communication::bolt::Encoder")) {} + + /** + * Sends a Record message. + * + * From the Bolt v1 documentation: + * RecordMessage (signature=0x71) { + * List<Value> fields + * } + * + * @param values the fields list object that should be sent + */ + void MessageRecord(const std::vector<TypedValue>& values) { + // 0xB1 = struct 1; 0x71 = record signature + WriteRAW("\xB1\x71", 2); + WriteList(values); + buffer_.Flush(); + } + + /** + * Sends a Success message. + * + * From the Bolt v1 documentation: + * SuccessMessage (signature=0x70) { + * Map<String,Value> metadata + * } + * + * @param metadata the metadata map object that should be sent + */ + void MessageSuccess(const std::map<std::string, TypedValue>& metadata) { + // 0xB1 = struct 1; 0x70 = success signature + WriteRAW("\xB1\x70", 2); + WriteMap(metadata); + buffer_.Flush(); + } + + /** + * Sends a Failure message. + * + * From the Bolt v1 documentation: + * FailureMessage (signature=0x7F) { + * Map<String,Value> metadata + * } + * + * @param metadata the metadata map object that should be sent + */ + void MessageFailure(const std::map<std::string, TypedValue>& metadata) { + // 0xB1 = struct 1; 0x7F = failure signature + WriteRAW("\xB1\x7F", 2); + WriteMap(metadata); + buffer_.Flush(); + } + + /** + * Sends an Ignored message. + * + * From the bolt v1 documentation: + * IgnoredMessage (signature=0x7E) { + * Map<String,Value> metadata + * } + * + * @param metadata the metadata map object that should be sent + */ + void MessageIgnored(const std::map<std::string, TypedValue>& metadata) { + // 0xB1 = struct 1; 0x7E = ignored signature + WriteRAW("\xB1\x7E", 2); + WriteMap(metadata); + buffer_.Flush(); + } + + /** + * Sends an Ignored message. + * + * This function sends an ignored message without additional metadata. + */ + void MessageIgnored() { + // 0xB0 = struct 0; 0x7E = ignored signature + WriteRAW("\xB0\x7E", 2); + buffer_.Flush(); + } + + + private: + Socket& socket_; + Buffer buffer_; + Logger logger_; + + + void WriteRAW(const uint8_t* data, uint64_t len) { + buffer_.Write(data, len); + } + + void WriteRAW(const char* data, uint64_t len) { + WriteRAW((const uint8_t*) data, len); + } + + void WriteRAW(const uint8_t data) { + WriteRAW(&data, 1); + } + + template <class T> + void WriteValue(T value) { + value = bswap(value); + WriteRAW(reinterpret_cast<const uint8_t *>(&value), sizeof(value)); + } + + void WriteNull() { + // 0xC0 = null marker + WriteRAW(0xC0); + } + + void WriteBool(const bool& value) { + if (value) { + // 0xC3 = true marker + WriteRAW(0xC3); + } else { + // 0xC2 = false marker + WriteRAW(0xC2); + } + } + + void WriteInt(const int64_t& value) { + if (value >= -16L && value < 128L) { + WriteRAW(static_cast<uint8_t>(value)); + } else if (value >= -128L && value < -16L) { + // 0xC8 = int8 marker + WriteRAW(0xC8); + WriteRAW(static_cast<uint8_t>(value)); + } else if (value >= -32768L && value < 32768L) { + // 0xC9 = int16 marker + WriteRAW(0xC9); + WriteValue(static_cast<int16_t>(value)); + } else if (value >= -2147483648L && value < 2147483648L) { + // 0xCA = int32 marker + WriteRAW(0xCA); + WriteValue(static_cast<int32_t>(value)); + } else { + // 0xCB = int64 marker + WriteRAW(0xCB); + WriteValue(value); + } + } + + void WriteDouble(const double& value) { + // 0xC1 = float64 marker + WriteRAW(0xC1); + WriteValue(*reinterpret_cast<const int64_t *>(&value)); + } + + void WriteTypeSize(const size_t size, const uint8_t typ) { + if (size <= 15) { + uint8_t len = size; + len &= 0x0F; + // tiny marker (+len) + WriteRAW(type_tiny_marker[typ] + len); + } else if (size <= 255) { + uint8_t len = size; + // 8 marker + WriteRAW(type_8_marker[typ]); + WriteRAW(len); + } else if (size <= 65536) { + uint16_t len = size; + // 16 marker + WriteRAW(type_16_marker[typ]); + WriteValue(len); + } else { + uint32_t len = size; + // 32 marker + WriteRAW(type_32_marker[typ]); + WriteValue(len); + } + } + + void WriteString(const std::string& value) { + WriteTypeSize(value.size(), TSTRING); + WriteRAW(value.c_str(), value.size()); + } + + void WriteList(const std::vector<TypedValue>& value) { + WriteTypeSize(value.size(), TLIST); + for (auto& x: value) WriteTypedValue(x); + } + + void WriteMap(const std::map<std::string, TypedValue>& value) { + WriteTypeSize(value.size(), TMAP); + for (auto& x: value) { + WriteString(x.first); + WriteTypedValue(x.second); + } + } + + void WriteVertex(const VertexAccessor& vertex) { + // 0xB3 = struct 3; 0x4E = vertex signature + WriteRAW("\xB3\x4E", 2); + + // IMPORTANT: here we write a hardcoded 0 because we don't + // use internal IDs, but need to give something to Bolt + // note that OpenCypher has no id(x) function, so the client + // should not be able to do anything with this value anyway + WriteInt(0); + + // write labels + const auto& labels = vertex.labels(); + WriteTypeSize(labels.size(), TLIST); + for (const auto& label : labels) + WriteString(vertex.db_accessor().label_name(label)); + + // write properties + const auto& props = vertex.Properties(); + WriteTypeSize(props.size(), TMAP); + for (const auto& prop : props) { + WriteString(vertex.db_accessor().property_name(prop.first)); + WriteTypedValue(prop.second); + } + } + + + void WriteEdge(const EdgeAccessor& edge) { + // 0xB5 = struct 5; 0x52 = edge signature + WriteRAW("\xB5\x52", 2); + + // IMPORTANT: here we write a hardcoded 0 because we don't + // use internal IDs, but need to give something to Bolt + // note that OpenCypher has no id(x) function, so the client + // should not be able to do anything with this value anyway + WriteInt(0); + WriteInt(0); + WriteInt(0); + + // write type + WriteString(edge.db_accessor().edge_type_name(edge.edge_type())); + + // write properties + const auto& props = edge.Properties(); + WriteTypeSize(props.size(), TMAP); + for (const auto& prop : props) { + WriteString(edge.db_accessor().property_name(prop.first)); + WriteTypedValue(prop.second); + } + } + + void WritePath() { + // TODO: this isn't implemented in the backend! + } + + void WriteTypedValue(const TypedValue& value) { + switch(value.type()) { + case TypedValue::Type::Null: + WriteNull(); + break; + case TypedValue::Type::Bool: + WriteBool(value.Value<bool>()); + break; + case TypedValue::Type::Int: + WriteInt(value.Value<int64_t>()); + break; + case TypedValue::Type::Double: + WriteDouble(value.Value<double>()); + break; + case TypedValue::Type::String: + WriteString(value.Value<std::string>()); + break; + case TypedValue::Type::List: + WriteList(value.Value<std::vector<TypedValue>>()); + break; + case TypedValue::Type::Map: + WriteMap(value.Value<std::map<std::string, TypedValue>>()); + break; + case TypedValue::Type::Vertex: + WriteVertex(value.Value<VertexAccessor>()); + break; + case TypedValue::Type::Edge: + WriteEdge(value.Value<EdgeAccessor>()); + break; + case TypedValue::Type::Path: + // TODO: this is not implemeted yet! + WritePath(); + break; + } + } +}; +} diff --git a/src/communication/bolt/v1/encoder/result_stream.hpp b/src/communication/bolt/v1/encoder/result_stream.hpp new file mode 100644 index 000000000..6a0ca4377 --- /dev/null +++ b/src/communication/bolt/v1/encoder/result_stream.hpp @@ -0,0 +1,82 @@ +#pragma once + +#include "communication/bolt/v1/encoder/encoder.hpp" +#include "communication/bolt/v1/encoder/chunked_buffer.hpp" +#include "query/backend/cpp/typed_value.hpp" + +#include "logging/default.hpp" + +namespace communication::bolt { + +/** + * A high level API for streaming a Bolt response. Exposes + * functionalities used by the compiler and query plans (which + * should not use any lower level API). + * + * @tparam Socket Socket used. + */ +template <typename Socket> +class ResultStream { + private: + using encoder_t = Encoder<ChunkedBuffer<Socket>, Socket>; + public: + + // TODO add logging to this class + ResultStream(encoder_t &encoder) : + encoder_(encoder) {} + + /** + * Writes a header. Typically a header is something like: + * [ + * "Header1", + * "Header2", + * "Header3" + * ] + * + * @param fields the header fields that should be sent. + */ + void Header(const std::vector<std::string> &fields) { + std::vector<TypedValue> vec; + std::map<std::string, TypedValue> data; + for (auto& i : fields) + vec.push_back(TypedValue(i)); + data.insert(std::make_pair(std::string("fields"), TypedValue(vec))); + encoder_.MessageSuccess(data); + } + + /** + * Writes a result. Typically a result is something like: + * [ + * Value1, + * Value2, + * Value3 + * ] + * NOTE: The result fields should be in the same ordering that the header + * fields were sent in. + * + * @param values the values that should be sent + */ + void Result(std::vector<TypedValue> &values) { + encoder_.MessageRecord(values); + } + + /** + * Writes a summary. Typically a summary is something like: + * { + * "type" : "r" | "rw" | ..., + * "stats": { + * "nodes_created": 12, + * "nodes_deleted": 0 + * } + * } + * + * @param summary the summary map object that should be sent + */ + void Summary(const std::map<std::string, TypedValue> &summary) { + encoder_.MessageSuccess(summary); + } + +private: + encoder_t& encoder_; +}; +} diff --git a/src/communication/bolt/v1/serialization/result_stream.hpp b/src/communication/bolt/v1/serialization/result_stream.hpp deleted file mode 100644 index 6f98a6892..000000000 --- a/src/communication/bolt/v1/serialization/result_stream.hpp +++ /dev/null @@ -1,137 +0,0 @@ -#pragma once - -#include "communication/bolt/v1/serialization/bolt_serializer.hpp" -#include "query/backend/cpp/typed_value.hpp" - -#include "logging/default.hpp" - -namespace bolt { - -/** - * A high level API for streaming a Bolt response. Exposes - * functionalities used by the compiler and query plans (which - * should not use any lower level API). - * - * @tparam TChunkedEncoder Type of chunked encoder used. - */ -// TODO templatisation on TChunkedEncoder might not be desired -// but makes the code a bit easer to understand because we know -// that this class uses a BoltEncoder (and not some arbitrary template) -// it helps the programmer, the compiler and the IDE -template <typename TChunkedEncoder> -class RecordStream { - public: - // TODO add logging to this class - - RecordStream(BoltEncoder<TChunkedEncoder> &bolt_encoder) - : bolt_encoder_(bolt_encoder), serializer_(bolt_encoder) {} - - void Header(const std::vector<std::string> &fields) { - bolt_encoder_.message_success(); - bolt_encoder_.write_map_header(1); - bolt_encoder_.write_string("fields"); - - bolt_encoder_.write_list_header(fields.size()); - for (auto &name : fields) { - bolt_encoder_.write_string(name); - } - - Chunk(); - Send(); - } - - void Result(std::vector<TypedValue> &values) { - bolt_encoder_.message_record(); - Write(values); - Chunk(); - Send(); - } - - /** - * Writes a summary. Typically a summary is something like: - * { - * "type" : "r" | "rw" | ..., - * "stats": { - * "nodes_created": 12, - * "nodes_deleted": 0 - * } - * } - * - * @param value - */ - void Summary(const std::map<std::string, TypedValue> &summary) { - bolt_encoder_.message_success(); - Write(summary); - Chunk(); - } - - private: - BoltEncoder<TChunkedEncoder> bolt_encoder_; - BoltSerializer<BoltEncoder<TChunkedEncoder>> serializer_; - - /** - * Writes a TypedValue. Resolves it's type and uses - * encoder primitives to write exactly typed values. - */ - void Write(const TypedValue &value) { - switch (value.type()) { - case TypedValue::Type::Null: - bolt_encoder_.write_null(); - break; - case TypedValue::Type::Bool: - bolt_encoder_.write(value.Value<bool>()); - break; - case TypedValue::Type::Int: - bolt_encoder_.write(value.Value<int64_t>()); - break; - case TypedValue::Type::Double: - bolt_encoder_.write(value.Value<double>()); - break; - case TypedValue::Type::String: - bolt_encoder_.write(value.Value<std::string>()); - break; - case TypedValue::Type::List: - Write(value.Value<std::vector<TypedValue>>()); - break; - case TypedValue::Type::Map: - Write(value.Value<std::map<std::string, TypedValue>>()); - break; - case TypedValue::Type::Vertex: - serializer_.write(value.Value<VertexAccessor>()); - break; - case TypedValue::Type::Edge: - serializer_.write(value.Value<EdgeAccessor>()); - break; - default: - throw std::runtime_error( - "Serialization not implemented for given type"); - } - } - - void Write(const std::vector<TypedValue> &values) { - bolt_encoder_.write_list_header(values.size()); - for (const auto &value : values) Write(value); - } - - void Write(const std::map<std::string, TypedValue> &values) { - bolt_encoder_.write_map_header(values.size()); - for (const auto &kv : values) { - bolt_encoder_.write(kv.first); - Write(kv.second); - } - } - - void Send() { - // TODO expose these low level functions in the encoder - // be careful! ChunkedEncoder seems to have a 'flush()' function - // but that is different from it's underlying ChunkedBuffer's - // 'flush()' method - bolt_encoder_.flush(); - } - - void Chunk() { - // TODO expose these low level functions in the encoder - bolt_encoder_.write_chunk(); - } -}; -} diff --git a/tests/unit/bolt_chunked_buffer.cpp b/tests/unit/bolt_chunked_buffer.cpp new file mode 100644 index 000000000..eeb0c3da1 --- /dev/null +++ b/tests/unit/bolt_chunked_buffer.cpp @@ -0,0 +1,46 @@ +#define NDEBUG +#include "bolt_common.hpp" + +#include "communication/bolt/v1/encoder/chunked_buffer.hpp" + + +constexpr const int SIZE = 131072; +uint8_t data[SIZE]; + + +void verify_output(std::vector<uint8_t>& output, const uint8_t* data, uint64_t size) { + uint64_t len = 0, pos = 0; + uint8_t tail[2] = { 0, 0 }; + uint16_t head; + while (size > 0) { + head = len = std::min(size, communication::bolt::CHUNK_SIZE); + head = bswap(head); + check_output(output, reinterpret_cast<uint8_t *>(&head), sizeof(head), false); + check_output(output, data + pos, len, false); + check_output(output, tail, 2, false); + size -= len; + pos += len; + } + check_output(output, nullptr, 0, true); +} + +TEST(Bolt, ChunkedBuffer) { + TestSocket socket(10); + communication::bolt::ChunkedBuffer<TestSocket> chunked_buffer(socket); + std::vector<uint8_t>& output = socket.output; + + for (int i = 0; i <= SIZE; i += 16) { + chunked_buffer.Write(data, i); + chunked_buffer.Flush(); + verify_output(output, data, i); + } +} + + +int main(int argc, char** argv) { + initialize_data(data, SIZE); + logging::init_sync(); + logging::log->pipe(std::make_unique<Stdout>()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/bolt_common.hpp b/tests/unit/bolt_common.hpp new file mode 100644 index 000000000..9bbce0c38 --- /dev/null +++ b/tests/unit/bolt_common.hpp @@ -0,0 +1,72 @@ +#include <array> +#include <cassert> +#include <cstring> +#include <iostream> +#include <vector> + +#include "gtest/gtest.h" + +#include "logging/default.hpp" +#include "logging/streams/stdout.hpp" + +#include "dbms/dbms.hpp" + + +class TestSocket { + public: + TestSocket(int socket) : socket(socket) {} + TestSocket(const TestSocket& s) : socket(s.id()){}; + TestSocket(TestSocket&& other) { *this = std::forward<TestSocket>(other); } + + TestSocket& operator=(TestSocket&& other) { + this->socket = other.socket; + other.socket = -1; + return *this; + } + + void Close() { socket = -1; } + bool IsOpen() { return socket != -1; } + + int id() const { return socket; } + + int Write(const std::string& str) { return Write(str.c_str(), str.size()); } + int Write(const char* data, size_t len) { + return Write(reinterpret_cast<const uint8_t*>(data), len); + } + int Write(const uint8_t* data, size_t len) { + for (int i = 0; i < len; ++i) output.push_back(data[i]); + return len; + } + + std::vector<uint8_t> output; + + protected: + int socket; +}; + +void print_output(std::vector<uint8_t>& output) { + fprintf(stderr, "output: "); + for (int i = 0; i < output.size(); ++i) { + fprintf(stderr, "%02X ", output[i]); + } + fprintf(stderr, "\n"); +} + +void check_output(std::vector<uint8_t>& output, const uint8_t* data, + uint64_t len, bool clear = true) { + if (clear) ASSERT_EQ(len, output.size()); + else ASSERT_LE(len, output.size()); + for (int i = 0; i < len; ++i) + EXPECT_EQ(output[i], data[i]); + if (clear) output.clear(); + else output.erase(output.begin(), output.begin() + len); +} + +void initialize_data(uint8_t* data, size_t size) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 255); + for (int i = 0; i < size; ++i) { + data[i] = dis(gen); + } +} diff --git a/tests/unit/bolt_encoder.cpp b/tests/unit/bolt_encoder.cpp new file mode 100644 index 000000000..65601bda6 --- /dev/null +++ b/tests/unit/bolt_encoder.cpp @@ -0,0 +1,249 @@ +#include "bolt_common.hpp" + +#include "communication/bolt/v1/encoder/encoder.hpp" +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "query/backend/cpp/typed_value.hpp" + + +class TestBuffer { + public: + TestBuffer(TestSocket& socket) : socket_(socket) {} + + void Write(const uint8_t* data, size_t n) { + socket_.Write(data, n); + } + + void Flush() {} + + private: + TestSocket& socket_; +}; + + +const int64_t int_input[] = { 0, -1, -8, -16, 1, 63, 127, -128, -20, -17, -32768, -12345, -129, 128, 12345, 32767, -2147483648L, -12345678L, -32769L, 32768L, 12345678L, 2147483647L, -9223372036854775807L, -12345678912345L, -2147483649L, 2147483648L, 12345678912345L, 9223372036854775807 }; +const uint8_t int_output[][10] = { "\x00", "\xFF", "\xF8", "\xF0", "\x01", "\x3F", "\x7F", "\xC8\x80", "\xC8\xEC", "\xC8\xEF", "\xC9\x80\x00", "\xC9\xCF\xC7", "\xC9\xFF\x7F", "\xC9\x00\x80", "\xC9\x30\x39", "\xC9\x7F\xFF", "\xCA\x80\x00\x00\x00", "\xCA\xFF\x43\x9E\xB2", "\xCA\xFF\xFF\x7F\xFF", "\xCA\x00\x00\x80\x00", "\xCA\x00\xBC\x61\x4E", "\xCA\x7F\xFF\xFF\xFF", "\xCB\x80\x00\x00\x00\x00\x00\x00\x01", "\xCB\xFF\xFF\xF4\xC5\x8C\x31\xA4\xA7", "\xCB\xFF\xFF\xFF\xFF\x7F\xFF\xFF\xFF", "\xCB\x00\x00\x00\x00\x80\x00\x00\x00", "\xCB\x00\x00\x0B\x3A\x73\xCE\x5B\x59", "\xCB\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF" }; +const uint32_t int_output_len[] = { 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9, 9 }; + + +const double double_input[] = { 5.834, 108.199, 43677.9882, 254524.5851 }; +const uint8_t double_output[][10] = { "\xC1\x40\x17\x56\x04\x18\x93\x74\xBC", "\xC1\x40\x5B\x0C\xBC\x6A\x7E\xF9\xDB", "\xC1\x40\xE5\x53\xBF\x9F\x55\x9B\x3D", "\xC1\x41\x0F\x11\xE4\xAE\x48\xE8\xA7"}; + + +const uint8_t vertexedge_output[] = "\xB1\x71\x93\xB3\x4E\x00\x92\x86\x6C\x61\x62\x65\x6C\x31\x86\x6C\x61\x62\x65\x6C\x32\xA2\x85\x70\x72\x6F\x70\x31\x0C\x85\x70\x72\x6F\x70\x32\xC9\x00\xC8\xB3\x4E\x00\x90\xA0\xB5\x52\x00\x00\x00\x88\x65\x64\x67\x65\x74\x79\x70\x65\xA2\x85\x70\x72\x6F\x70\x33\x2A\x85\x70\x72\x6F\x70\x34\xC9\x04\xD2"; + + +constexpr const int SIZE = 131072; +uint8_t data[SIZE]; +const uint64_t sizes[] = { 0, 1, 5, 15, 16, 120, 255, 256, 12345, 65535, 65536, 100000 }; +const uint64_t sizes_num = 12; + + +constexpr const int STRING = 0, LIST = 1, MAP = 2; +const uint8_t type_tiny_magic[] = { 0x80, 0x90, 0xA0 }; +const uint8_t type_8_magic[] = { 0xD0, 0xD4, 0xD8 }; +const uint8_t type_16_magic[] = { 0xD1, 0xD5, 0xD9 }; +const uint8_t type_32_magic[] = { 0xD2, 0xD6, 0xDA }; + +void check_type_size(std::vector<uint8_t>& v, int typ, uint64_t size) { + if (size <= 15) { + uint8_t len = size; + len &= 0x0F; + len += type_tiny_magic[typ]; + check_output(v, &len, 1, false); + } else if (size <= 255) { + uint8_t len = size; + check_output(v, &type_8_magic[typ], 1, false); + check_output(v, &len, 1, false); + } else if (size <= 65536) { + uint16_t len = size; + len = bswap(len); + check_output(v, &type_16_magic[typ], 1, false); + check_output(v, reinterpret_cast<const uint8_t*> (&len), 2, false); + } else { + uint32_t len = size; + len = bswap(len); + check_output(v, &type_32_magic[typ], 1, false); + check_output(v, reinterpret_cast<const uint8_t*> (&len), 4, false); + } +} + +void check_record_header(std::vector<uint8_t>& v, uint64_t size) { + check_output(v, (const uint8_t*) "\xB1\x71", 2, false); + check_type_size(v, LIST, size); +} + + +TestSocket socket(10); +communication::bolt::Encoder<TestBuffer, TestSocket> bolt_encoder(socket); +std::vector<uint8_t>& output = socket.output; + + +TEST(BoltEncoder, NullAndBool) { + std::vector<TypedValue> vals; + vals.push_back(TypedValue::Null); + vals.push_back(TypedValue(true)); + vals.push_back(TypedValue(false)); + bolt_encoder.MessageRecord(vals); + check_record_header(output, 3); + check_output(output, (const uint8_t*) "\xC0\xC3\xC2", 3); +} + +TEST(BoltEncoder, Int) { + int N = 28; + std::vector<TypedValue> vals; + for (int i = 0; i < N; ++i) + vals.push_back(TypedValue(int_input[i])); + bolt_encoder.MessageRecord(vals); + check_record_header(output, N); + for (int i = 0; i < N; ++i) + check_output(output, int_output[i], int_output_len[i], false); + check_output(output, nullptr, 0); +} + +TEST(BoltEncoder, Double) { + int N = 4; + std::vector<TypedValue> vals; + for (int i = 0; i < N; ++i) + vals.push_back(TypedValue(double_input[i])); + bolt_encoder.MessageRecord(vals); + check_record_header(output, N); + for (int i = 0; i < N; ++i) + check_output(output, double_output[i], 9, false); + check_output(output, nullptr, 0); +} + +TEST(BoltEncoder, String) { + std::vector<TypedValue> vals; + for (int i = 0; i < sizes_num; ++i) + vals.push_back(TypedValue(std::string((const char*) data, sizes[i]))); + bolt_encoder.MessageRecord(vals); + check_record_header(output, vals.size()); + for (int i = 0; i < sizes_num; ++i) { + check_type_size(output, STRING, sizes[i]); + check_output(output, data, sizes[i], false); + } + check_output(output, nullptr, 0); +} + +TEST(BoltEncoder, List) { + std::vector<TypedValue> vals; + for (int i = 0; i < sizes_num; ++i) { + std::vector<TypedValue> val; + for (int j = 0; j < sizes[i]; ++j) + val.push_back(TypedValue(std::string((const char*) &data[j], 1))); + vals.push_back(TypedValue(val)); + } + bolt_encoder.MessageRecord(vals); + check_record_header(output, vals.size()); + for (int i = 0; i < sizes_num; ++i) { + check_type_size(output, LIST, sizes[i]); + for (int j = 0; j < sizes[i]; ++j) { + check_type_size(output, STRING, 1); + check_output(output, &data[j], 1, false); + } + } + check_output(output, nullptr, 0); +} + +TEST(BoltEncoder, Map) { + std::vector<TypedValue> vals; + uint8_t buff[10]; + for (int i = 0; i < sizes_num; ++i) { + std::map<std::string, TypedValue> val; + for (int j = 0; j < sizes[i]; ++j) { + sprintf((char*) buff, "%05X", j); + std::string tmp((char*) buff, 5); + val.insert(std::make_pair(tmp, TypedValue(tmp))); + } + vals.push_back(TypedValue(val)); + } + bolt_encoder.MessageRecord(vals); + check_record_header(output, vals.size()); + for (int i = 0; i < sizes_num; ++i) { + check_type_size(output, MAP, sizes[i]); + for (int j = 0; j < sizes[i]; ++j) { + sprintf((char*) buff, "%05X", j); + check_type_size(output, STRING, 5); + check_output(output, buff, 5, false); + check_type_size(output, STRING, 5); + check_output(output, buff, 5, false); + } + } + check_output(output, nullptr, 0); +} + +TEST(BoltEncoder, VertexAndEdge) { + // create vertex + Dbms dbms; + auto db_accessor = dbms.active(); + auto va1 = db_accessor.insert_vertex(); + auto va2 = db_accessor.insert_vertex(); + std::string l1("label1"), l2("label2"); + va1.add_label(&l1); + va1.add_label(&l2); + std::string p1("prop1"), p2("prop2"); + PropertyValue pv1(12), pv2(200); + va1.PropsSet(&p1, pv1); + va1.PropsSet(&p2, pv2); + + // create edge + std::string et("edgetype"); + auto ea = db_accessor.insert_edge(va1, va2, &et); + std::string p3("prop3"), p4("prop4"); + PropertyValue pv3(42), pv4(1234); + ea.PropsSet(&p3, pv3); + ea.PropsSet(&p4, pv4); + + // check everything + std::vector<TypedValue> vals; + vals.push_back(TypedValue(va1)); + vals.push_back(TypedValue(va2)); + vals.push_back(TypedValue(ea)); + bolt_encoder.MessageRecord(vals); + check_output(output, vertexedge_output, 74); +} + +TEST(BoltEncoder, BoltV1ExampleMessages) { + // this test checks example messages from: http://boltprotocol.org/v1/ + + // record message + std::vector<TypedValue> rvals; + for (int i = 1; i < 4; ++i) rvals.push_back(TypedValue(i)); + bolt_encoder.MessageRecord(rvals); + check_output(output, (const uint8_t*) "\xB1\x71\x93\x01\x02\x03", 6); + + // success message + std::string sv1("name"), sv2("age"), sk("fields"); + std::vector<TypedValue> svec; + svec.push_back(TypedValue(sv1)); + svec.push_back(TypedValue(sv2)); + TypedValue slist(svec); + std::map<std::string, TypedValue> svals; + svals.insert(std::make_pair(sk, slist)); + bolt_encoder.MessageSuccess(svals); + check_output(output, (const uint8_t*) "\xB1\x70\xA1\x86\x66\x69\x65\x6C\x64\x73\x92\x84\x6E\x61\x6D\x65\x83\x61\x67\x65", 20); + + // failure message + std::string fv1("Neo.ClientError.Statement.SyntaxError"), fv2("Invalid syntax."); + std::string fk1("code"), fk2("message"); + TypedValue ftv1(fv1), ftv2(fv2); + std::map<std::string, TypedValue> fvals; + fvals.insert(std::make_pair(fk1, ftv1)); + fvals.insert(std::make_pair(fk2, ftv2)); + bolt_encoder.MessageFailure(fvals); + check_output(output, (const uint8_t*) "\xB1\x7F\xA2\x84\x63\x6F\x64\x65\xD0\x25\x4E\x65\x6F\x2E\x43\x6C\x69\x65\x6E\x74\x45\x72\x72\x6F\x72\x2E\x53\x74\x61\x74\x65\x6D\x65\x6E\x74\x2E\x53\x79\x6E\x74\x61\x78\x45\x72\x72\x6F\x72\x87\x6D\x65\x73\x73\x61\x67\x65\x8F\x49\x6E\x76\x61\x6C\x69\x64\x20\x73\x79\x6E\x74\x61\x78\x2E", 71); + + // ignored message + bolt_encoder.MessageIgnored(); + check_output(output, (const uint8_t*) "\xB0\x7E", 2); +} + + +int main(int argc, char** argv) { + initialize_data(data, SIZE); + logging::init_sync(); + logging::log->pipe(std::make_unique<Stdout>()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/bolt_result_stream.cpp b/tests/unit/bolt_result_stream.cpp new file mode 100644 index 000000000..ed18c274c --- /dev/null +++ b/tests/unit/bolt_result_stream.cpp @@ -0,0 +1,50 @@ +#include "bolt_common.hpp" + +#include "communication/bolt/v1/encoder/chunked_buffer.hpp" +#include "communication/bolt/v1/encoder/encoder.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" +#include "query/backend/cpp/typed_value.hpp" + + +using buffer_t = communication::bolt::ChunkedBuffer<TestSocket>; +using encoder_t = communication::bolt::Encoder<buffer_t, TestSocket>; +using result_stream_t = communication::bolt::ResultStream<TestSocket>; + + +const uint8_t header_output[] = "\x00\x29\xB1\x70\xA1\x86\x66\x69\x65\x6C\x64\x73\x9A\x82\x61\x61\x82\x62\x62\x82\x63\x63\x82\x64\x64\x82\x65\x65\x82\x66\x66\x82\x67\x67\x82\x68\x68\x82\x69\x69\x82\x6A\x6A\x00\x00"; +const uint8_t result_output[] = "\x00\x0A\xB1\x71\x92\x05\x85\x68\x65\x6C\x6C\x6F\x00\x00"; +const uint8_t summary_output[] = "\x00\x0C\xB1\x70\xA1\x87\x63\x68\x61\x6E\x67\x65\x64\x0A\x00\x00"; + + +TEST(Bolt, ResultStream) { + TestSocket socket(10); + encoder_t encoder(socket); + result_stream_t result_stream(encoder); + std::vector<uint8_t>& output = socket.output; + + std::vector<std::string> headers; + for (int i = 0; i < 10; ++i) headers.push_back(std::string(2, (char)('a' + i))); + + result_stream.Header(headers); + print_output(output); + check_output(output, header_output, 45); + + std::vector<TypedValue> result{TypedValue(5), TypedValue(std::string("hello"))}; + result_stream.Result(result); + print_output(output); + check_output(output, result_output, 14); + + std::map<std::string, TypedValue> summary; + summary.insert(std::make_pair(std::string("changed"), TypedValue(10))); + result_stream.Summary(summary); + print_output(output); + check_output(output, summary_output, 16); +} + + +int main(int argc, char** argv) { + logging::init_sync(); + logging::log->pipe(std::make_unique<Stdout>()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/bolt_session.cpp b/tests/unit/bolt_session.cpp index 177ed7a47..e07e138cd 100644 --- a/tests/unit/bolt_session.cpp +++ b/tests/unit/bolt_session.cpp @@ -1,49 +1,9 @@ -#include <array> -#include <cassert> -#include <cstring> -#include <iostream> -#include <vector> - -#include "gtest/gtest.h" - -#include "logging/streams/stdout.hpp" +#include "bolt_common.hpp" #include "communication/bolt/v1/serialization/record_stream.hpp" #include "communication/bolt/v1/session.hpp" -#include "dbms/dbms.hpp" #include "query/engine.hpp" -class TestSocket { - public: - TestSocket(int socket) : socket(socket) {} - TestSocket(const TestSocket& s) : socket(s.id()){}; - TestSocket(TestSocket&& other) { *this = std::forward<TestSocket>(other); } - - TestSocket& operator=(TestSocket&& other) { - this->socket = other.socket; - other.socket = -1; - return *this; - } - - void Close() { socket = -1; } - bool IsOpen() { return socket != -1; } - - int id() const { return socket; } - - int Write(const std::string& str) { return Write(str.c_str(), str.size()); } - int Write(const char* data, size_t len) { - return Write(reinterpret_cast<const uint8_t*>(data), len); - } - int Write(const uint8_t* data, size_t len) { - for (int i = 0; i < len; ++i) output.push_back(data[i]); - return len; - } - - std::vector<uint8_t> output; - - protected: - int socket; -}; const uint8_t handshake_req[] = "\x60\x60\xb0\x17\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" @@ -60,22 +20,6 @@ const uint8_t run_req[] = "\x61\x6d\x65\x3a\x20\x32\x39\x33\x38\x33\x7d\x29\x20\x52\x45\x54\x55\x52" "\x4e\x20\x6e\xa0\x00\x00"; -void print_output(std::vector<uint8_t>& output) { - fprintf(stderr, "output: "); - for (int i = 0; i < output.size(); ++i) { - fprintf(stderr, "%02X ", output[i]); - } - fprintf(stderr, "\n"); -} - -void check_output(std::vector<uint8_t>& output, const uint8_t* data, - uint64_t len) { - EXPECT_EQ(len, output.size()); - for (int i = 0; i < len; ++i) { - EXPECT_EQ(output[i], data[i]); - } - output.clear(); -} TEST(Bolt, Session) { Dbms dbms;