From c507e74384e9850b1bbfd5213a55c227a55f10a1 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Thu, 24 Aug 2017 15:23:33 +0200 Subject: [PATCH] First version of bolt cpp client. Reviewers: buda, mislav.bradac Reviewed By: mislav.bradac Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D689 --- CMakeLists.txt | 1 + src/communication/bolt/client.hpp | 279 ++++++++++++++ src/communication/bolt/v1/codes.hpp | 6 + .../v1/decoder/chunked_decoder_buffer.hpp | 16 + .../bolt/v1/decoder/decoded_value.cpp | 298 ++++++++++++++ .../bolt/v1/decoder/decoded_value.hpp | 153 ++++++++ src/communication/bolt/v1/decoder/decoder.hpp | 362 ++++++++---------- .../bolt/v1/encoder/client_encoder.hpp | 144 +++++++ src/communication/bolt/v1/states/error.hpp | 5 +- .../bolt/v1/states/handshake.hpp | 7 +- .../bolt/v1/states/idle_result.hpp | 28 +- src/communication/bolt/v1/states/init.hpp | 14 +- src/durability/recovery.cpp | 27 +- tests/manual/bolt_client.cpp | 66 ++++ tests/manual/harness_client.cpp | 166 ++++++++ tests/unit/bolt_decoder.cpp | 149 +++---- tests/unit/recovery.cpp | 14 +- 17 files changed, 1422 insertions(+), 313 deletions(-) create mode 100644 src/communication/bolt/client.hpp create mode 100644 src/communication/bolt/v1/decoder/decoded_value.cpp create mode 100644 src/communication/bolt/v1/decoder/decoded_value.hpp create mode 100644 src/communication/bolt/v1/encoder/client_encoder.hpp create mode 100644 tests/manual/bolt_client.cpp create mode 100644 tests/manual/harness_client.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a3969c40..93024fdae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -266,6 +266,7 @@ target_link_libraries(antlr_opencypher_parser_lib antlr4_static) # all memgraph src files set(memgraph_src_files + ${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp ${src_dir}/data_structures/concurrent/skiplist_gc.cpp ${src_dir}/database/graph_db.cpp ${src_dir}/database/graph_db_accessor.cpp diff --git a/src/communication/bolt/client.hpp b/src/communication/bolt/client.hpp new file mode 100644 index 000000000..3883b8001 --- /dev/null +++ b/src/communication/bolt/client.hpp @@ -0,0 +1,279 @@ +#pragma once + +#include + +#include "communication/bolt/v1/decoder/buffer.hpp" +#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp" +#include "communication/bolt/v1/decoder/decoder.hpp" +#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp" +#include "communication/bolt/v1/encoder/client_encoder.hpp" + +#include "query/typed_value.hpp" +#include "utils/exceptions.hpp" + +namespace communication::bolt { + +class ClientException : public utils::BasicException { + using utils::BasicException::BasicException; +}; + +class ClientSocketException : public ClientException { + public: + using ClientException::ClientException; + ClientSocketException() + : ClientException("Couldn't write/read data to/from the socket!") {} +}; + +class ClientInvalidDataException : public ClientException { + public: + using ClientException::ClientException; + ClientInvalidDataException() + : ClientException("The server sent invalid data!") {} +}; + +class ClientQueryException : public ClientException { + public: + using ClientException::ClientException; + ClientQueryException() : ClientException("Couldn't execute query!") {} +}; + +struct QueryData { + std::vector fields; + std::vector> records; + std::map metadata; +}; + +template +class Client { + public: + Client(Socket &&socket, std::string &username, std::string &password, + std::string client_name = "") + : socket_(std::move(socket)) { + DLOG(INFO) << "Sending handshake"; + if (!socket_.Write(kPreamble, sizeof(kPreamble))) { + throw ClientSocketException(); + } + for (int i = 0; i < 4; ++i) { + if (!socket_.Write(kProtocol, sizeof(kProtocol))) { + throw ClientSocketException(); + } + } + + DLOG(INFO) << "Reading handshake response"; + if (!GetDataByLen(4)) { + throw ClientSocketException(); + } + if (memcmp(kProtocol, buffer_.data(), sizeof(kProtocol)) != 0) { + throw ClientException("Server negotiated unsupported protocol version!"); + } + buffer_.Shift(sizeof(kProtocol)); + + if (client_name == "") { + client_name = "memgraph-bolt/0.0.1"; + } + + DLOG(INFO) << "Sending init message"; + if (!encoder_.MessageInit(client_name, {{"scheme", "basic"}, + {"principal", username}, + {"secret", password}})) { + throw ClientSocketException(); + } + + DLOG(INFO) << "Reading init message response"; + if (!GetDataByChunk()) { + throw ClientSocketException(); + } + Signature signature; + DecodedValue metadata; + if (!ReadMessage(&signature, &metadata)) { + throw ClientException("Couldn't read init message response!"); + } + if (signature != Signature::Success) { + throw ClientInvalidDataException(); + } + } + + QueryData Execute(const std::string &query, + const std::map ¶meters) { + DLOG(INFO) << "Sending run message with statement: '" << query + << "'; parameters: " << parameters; + + std::map params_tv(parameters.begin(), + parameters.end()); + encoder_.MessageRun(query, params_tv, false); + encoder_.MessagePullAll(); + + DLOG(INFO) << "Reading run message response"; + if (!GetDataByChunk()) { + throw ClientSocketException(); + } + Signature signature; + DecodedValue fields; + if (!ReadMessage(&signature, &fields)) { + throw ClientInvalidDataException(); + } + if (fields.type() != DecodedValue::Type::Map) { + throw ClientInvalidDataException(); + } + + if (signature == Signature::Failure) { + HandleFailure(); + auto &tmp = fields.ValueMap(); + auto it = tmp.find("message"); + if (it != tmp.end()) { + throw ClientQueryException(it->second.ValueString()); + } + throw ClientQueryException(); + } else if (signature != Signature::Success) { + throw ClientInvalidDataException(); + } + + DLOG(INFO) << "Reading pull_all message response"; + Marker marker; + DecodedValue metadata; + std::vector> records; + while (true) { + if (!GetDataByChunk()) { + throw ClientInvalidDataException(); + } + if (!decoder_.ReadMessageHeader(&signature, &marker)) { + throw ClientInvalidDataException(); + } + if (signature == Signature::Record) { + DecodedValue record; + if (!decoder_.ReadValue(&record, DecodedValue::Type::List)) { + throw ClientInvalidDataException(); + } + records.push_back(record.ValueList()); + } else if (signature == Signature::Success) { + if (!decoder_.ReadValue(&metadata)) { + throw ClientInvalidDataException(); + } + break; + } else if (signature == Signature::Failure) { + DecodedValue data; + if (!decoder_.ReadValue(&data)) { + throw ClientInvalidDataException(); + } + HandleFailure(); + auto &tmp = data.ValueMap(); + auto it = tmp.find("message"); + if (it != tmp.end()) { + throw ClientQueryException(it->second.ValueString()); + } + throw ClientQueryException(); + } else { + throw ClientInvalidDataException(); + } + } + + if (metadata.type() != DecodedValue::Type::Map) { + throw ClientInvalidDataException(); + } + + QueryData ret{{}, records, metadata.ValueMap()}; + + auto &header = fields.ValueMap(); + if (header.find("fields") == header.end()) { + throw ClientInvalidDataException(); + } + if (header["fields"].type() != DecodedValue::Type::List) { + throw ClientInvalidDataException(); + } + auto &field_vector = header["fields"].ValueList(); + + for (auto &field_item : field_vector) { + if (field_item.type() != DecodedValue::Type::String) { + throw ClientInvalidDataException(); + } + ret.fields.push_back(field_item.ValueString()); + } + + return ret; + } + + void Close() { socket_.Close(); }; + + ~Client() { Close(); } + + private: + bool GetDataByLen(uint64_t len) { + while (buffer_.size() < len) { + auto buff = buffer_.Allocate(); + int ret = socket_.Read(buff.data, buff.len); + if (ret == -1) return false; + buffer_.Written(ret); + } + return true; + } + + bool GetDataByChunk() { + // If there is more data in the buffer then don't read data. + if (decoder_buffer_.Size() > 0) return true; + + ChunkState state; + while ((state = decoder_buffer_.GetChunk()) == ChunkState::Partial) { + auto buff = buffer_.Allocate(); + int ret = socket_.Read(buff.data, buff.len); + if (ret == -1) return false; + buffer_.Written(ret); + } + + if (state == ChunkState::Whole) { + return true; + } + return false; + } + + bool ReadMessage(Signature *signature, DecodedValue *ret) { + Marker marker; + if (!decoder_.ReadMessageHeader(signature, &marker)) { + return false; + } + return ReadMessageData(marker, ret); + } + + bool ReadMessageData(Marker marker, DecodedValue *ret) { + if (marker == Marker::TinyStruct) { + *ret = DecodedValue(); + return true; + } else if (marker == Marker::TinyStruct1) { + return decoder_.ReadValue(ret); + } + return false; + } + + void HandleFailure() { + if (!encoder_.MessageAckFailure()) { + throw ClientSocketException(); + } + while (true) { + Signature signature; + DecodedValue data; + if (!GetDataByChunk()) { + throw ClientInvalidDataException(); + } + if (!ReadMessage(&signature, &data)) { + throw ClientInvalidDataException(); + } + if (signature == Signature::Success) { + break; + } else if (signature != Signature::Ignored) { + throw ClientInvalidDataException(); + } + } + } + + // socket + Socket socket_; + + // decoder objects + Buffer<> buffer_; + ChunkedDecoderBuffer decoder_buffer_{buffer_}; + Decoder decoder_{decoder_buffer_}; + + // encoder objects + ChunkedEncoderBuffer encoder_buffer_{socket_}; + ClientEncoder> encoder_{encoder_buffer_}; +}; +} diff --git a/src/communication/bolt/v1/codes.hpp b/src/communication/bolt/v1/codes.hpp index e36117954..b47cd87c7 100644 --- a/src/communication/bolt/v1/codes.hpp +++ b/src/communication/bolt/v1/codes.hpp @@ -5,6 +5,9 @@ namespace communication::bolt { +static constexpr uint8_t kPreamble[4] = {0x60, 0x60, 0xB0, 0x17}; +static constexpr uint8_t kProtocol[4] = {0x00, 0x00, 0x00, 0x01}; + enum class Signature : uint8_t { Init = 0x01, AckFailure = 0x0E, @@ -39,6 +42,9 @@ enum class Marker : uint8_t { // marker == Marker::TinyStruct1 TinyStruct1 = 0xB1, TinyStruct2 = 0xB2, + TinyStruct3 = 0xB3, + TinyStruct4 = 0xB4, + TinyStruct5 = 0xB5, Null = 0xC0, Float64 = 0xC1, diff --git a/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp b/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp index b56df0b31..971087d69 100644 --- a/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp +++ b/src/communication/bolt/v1/decoder/chunked_decoder_buffer.hpp @@ -61,6 +61,22 @@ class ChunkedDecoderBuffer { return true; } + /** + * Peeks data from the internal buffer. + * Reads data, but doesn't remove it from the buffer. + * + * @param data a pointer to where the data should be read + * @param len the length of data that should be read + * @param offset offset from the beginning of the data + * @returns true if exactly len of data was copied into data, + * false otherwise + */ + bool Peek(uint8_t *data, size_t len, size_t offset = 0) { + if (len + offset > size_ - pos_) return false; + memcpy(data, &data_[pos_ + offset], len); + return true; + } + /** * Gets a chunk from the underlying raw data buffer. * When getting a chunk this function validates the chunk. diff --git a/src/communication/bolt/v1/decoder/decoded_value.cpp b/src/communication/bolt/v1/decoder/decoded_value.cpp new file mode 100644 index 000000000..170e14e4d --- /dev/null +++ b/src/communication/bolt/v1/decoder/decoded_value.cpp @@ -0,0 +1,298 @@ +#include "communication/bolt/v1/decoder/decoded_value.hpp" + +namespace communication::bolt { + +bool DecodedValue::ValueBool() const { + if (type_ != Type::Bool) { + throw DecodedValueException(); + } + return bool_v; +} + +int64_t DecodedValue::ValueInt() const { + if (type_ != Type::Int) { + throw DecodedValueException(); + } + return int_v; +} + +double DecodedValue::ValueDouble() const { + if (type_ != Type::Double) { + throw DecodedValueException(); + } + return double_v; +} + +const std::string &DecodedValue::ValueString() const { + if (type_ != Type::String) { + throw DecodedValueException(); + } + return string_v; +} + +const std::vector &DecodedValue::ValueList() const { + if (type_ != Type::List) { + throw DecodedValueException(); + } + return list_v; +} + +const std::map &DecodedValue::ValueMap() const { + if (type_ != Type::Map) { + throw DecodedValueException(); + } + return map_v; +} + +const DecodedVertex &DecodedValue::ValueVertex() const { + if (type_ != Type::Vertex) { + throw DecodedValueException(); + } + return vertex_v; +} + +const DecodedEdge &DecodedValue::ValueEdge() const { + if (type_ != Type::Edge) { + throw DecodedValueException(); + } + return edge_v; +} + +bool &DecodedValue::ValueBool() { + if (type_ != Type::Bool) { + throw DecodedValueException(); + } + return bool_v; +} + +int64_t &DecodedValue::ValueInt() { + if (type_ != Type::Int) { + throw DecodedValueException(); + } + return int_v; +} + +double &DecodedValue::ValueDouble() { + if (type_ != Type::Double) { + throw DecodedValueException(); + } + return double_v; +} + +std::string &DecodedValue::ValueString() { + if (type_ != Type::String) { + throw DecodedValueException(); + } + return string_v; +} + +std::vector &DecodedValue::ValueList() { + if (type_ != Type::List) { + throw DecodedValueException(); + } + return list_v; +} + +std::map &DecodedValue::ValueMap() { + if (type_ != Type::Map) { + throw DecodedValueException(); + } + return map_v; +} + +DecodedVertex &DecodedValue::ValueVertex() { + if (type_ != Type::Vertex) { + throw DecodedValueException(); + } + return vertex_v; +} + +DecodedEdge &DecodedValue::ValueEdge() { + if (type_ != Type::Edge) { + throw DecodedValueException(); + } + return edge_v; +} + +DecodedValue::DecodedValue(const DecodedValue &other) : type_(other.type_) { + switch (other.type_) { + case Type::Null: + return; + case Type::Bool: + this->bool_v = other.bool_v; + return; + case Type::Int: + this->int_v = other.int_v; + return; + case Type::Double: + this->double_v = other.double_v; + return; + case Type::String: + new (&string_v) std::string(other.string_v); + return; + case Type::List: + new (&list_v) std::vector(other.list_v); + return; + case Type::Map: + new (&map_v) std::map(other.map_v); + return; + case Type::Vertex: + new (&vertex_v) DecodedVertex(other.vertex_v); + return; + case Type::Edge: + new (&edge_v) DecodedEdge(other.edge_v); + return; + } + permanent_fail("Unsupported DecodedValue::Type"); +} + +DecodedValue &DecodedValue::operator=(const DecodedValue &other) { + if (this != &other) { + this->~DecodedValue(); + // set the type of this + type_ = other.type_; + + switch (other.type_) { + case Type::Null: + return *this; + case Type::Bool: + this->bool_v = other.bool_v; + return *this; + case Type::Int: + this->int_v = other.int_v; + return *this; + case Type::Double: + this->double_v = other.double_v; + return *this; + case Type::String: + new (&string_v) std::string(other.string_v); + return *this; + case Type::List: + new (&list_v) std::vector(other.list_v); + return *this; + case Type::Map: + new (&map_v) std::map(other.map_v); + return *this; + case Type::Vertex: + new (&vertex_v) DecodedVertex(other.vertex_v); + return *this; + case Type::Edge: + new (&edge_v) DecodedEdge(other.edge_v); + return *this; + } + permanent_fail("Unsupported DecodedValue::Type"); + } + return *this; +} + +DecodedValue::~DecodedValue() { + switch (type_) { + // destructor for primitive types does nothing + case Type::Null: + case Type::Bool: + case Type::Int: + case Type::Double: + return; + + // we need to call destructors for non primitive types since we used + // placement new + case Type::String: + // Clang fails to compile ~std::string. It seems it is a bug in some + // versions of clang. using namespace std statement solves the issue. + using namespace std; + string_v.~string(); + return; + case Type::List: + using namespace std; + list_v.~vector(); + return; + case Type::Map: + using namespace std; + map_v.~map(); + return; + case Type::Vertex: + vertex_v.~DecodedVertex(); + return; + case Type::Edge: + edge_v.~DecodedEdge(); + return; + } + permanent_fail("Unsupported DecodedValue::Type"); +} + +DecodedValue::operator query::TypedValue() const { + switch (type_) { + case Type::Null: + return query::TypedValue::Null; + case Type::Bool: + return query::TypedValue(bool_v); + case Type::Int: + return query::TypedValue(int_v); + case Type::Double: + return query::TypedValue(double_v); + case Type::String: + return query::TypedValue(string_v); + case Type::List: + return query::TypedValue( + std::vector(list_v.begin(), list_v.end())); + case Type::Map: + return query::TypedValue( + std::map(map_v.begin(), map_v.end())); + default: + throw DecodedValueException( + "Unsupported conversion from DecodedValue to TypedValue"); + } +} + +std::ostream &operator<<(std::ostream &os, const DecodedVertex &vertex) { + os << "V("; + PrintIterable(os, vertex.labels, ":", + [&](auto &stream, auto label) { stream << label; }); + os << " {"; + PrintIterable(os, vertex.properties, ", ", + [&](auto &stream, const auto &pair) { + stream << pair.first << ": " << pair.second; + }); + return os << "})"; +} + +std::ostream &operator<<(std::ostream &os, const DecodedEdge &edge) { + os << "E[" << edge.type; + os << " {"; + PrintIterable(os, edge.properties, ", ", [&](auto &stream, const auto &pair) { + stream << pair.first << ": " << pair.second; + }); + return os << "}]"; +} + +std::ostream &operator<<(std::ostream &os, const DecodedValue &value) { + switch (value.type_) { + case DecodedValue::Type::Null: + return os << "Null"; + case DecodedValue::Type::Bool: + return os << (value.ValueBool() ? "true" : "false"); + case DecodedValue::Type::Int: + return os << value.ValueInt(); + case DecodedValue::Type::Double: + return os << value.ValueDouble(); + case DecodedValue::Type::String: + return os << value.ValueString(); + case DecodedValue::Type::List: + os << "["; + PrintIterable(os, value.ValueList()); + return os << "]"; + case DecodedValue::Type::Map: + os << "{"; + PrintIterable(os, value.ValueMap(), ", ", + [](auto &stream, const auto &pair) { + stream << pair.first << ": " << pair.second; + }); + return os << "}"; + case DecodedValue::Type::Vertex: + return os << value.ValueVertex(); + case DecodedValue::Type::Edge: + return os << value.ValueEdge(); + } + permanent_fail("Unsupported DecodedValue::Type"); +} +} diff --git a/src/communication/bolt/v1/decoder/decoded_value.hpp b/src/communication/bolt/v1/decoder/decoded_value.hpp new file mode 100644 index 000000000..b456286ba --- /dev/null +++ b/src/communication/bolt/v1/decoder/decoded_value.hpp @@ -0,0 +1,153 @@ +#pragma once + +#include +#include +#include + +#include "query/typed_value.hpp" +#include "storage/property_value.hpp" +#include "utils/algorithm.hpp" +#include "utils/assert.hpp" +#include "utils/exceptions.hpp" + +namespace communication::bolt { + +/** Forward declaration of DecodedValue class. */ +class DecodedValue; + +/** + * Structure used when reading a Vertex with the decoder. + * The decoder writes data into this structure. + */ +struct DecodedVertex { + int64_t id; + std::vector labels; + std::map properties; +}; + +/** + * Structure used when reading an Edge with the decoder. + * The decoder writes data into this structure. + */ +struct DecodedEdge { + int64_t id; + int64_t from; + int64_t to; + std::string type; + std::map properties; +}; + +/** + * DecodedValue provides an encapsulation arround TypedValue, DecodedVertex + * and DecodedEdge. This is necessary because TypedValue stores vertices and + * edges as our internal accessors. Because of that the Bolt decoder can't + * decode vertices and edges directly into a TypedValue so a DecodedValue is + * used instead. + */ +class DecodedValue { + public: + /** Default constructor, makes Null */ + DecodedValue() : type_(Type::Null) {} + + /** Types that can be stored in a DecodedValue. */ + // TODO: Path isn't supported yet! + enum class Type : unsigned { + Null, + Bool, + Int, + Double, + String, + List, + Map, + Vertex, + Edge + }; + + // constructors for primitive types + DecodedValue(bool value) : type_(Type::Bool) { bool_v = value; } + DecodedValue(int value) : type_(Type::Int) { int_v = value; } + DecodedValue(int64_t value) : type_(Type::Int) { int_v = value; } + DecodedValue(double value) : type_(Type::Double) { double_v = value; } + + // constructors for non-primitive types + DecodedValue(const std::string &value) : type_(Type::String) { + new (&string_v) std::string(value); + } + DecodedValue(const std::vector &value) : type_(Type::List) { + new (&list_v) std::vector(value); + } + DecodedValue(const std::map &value) + : type_(Type::Map) { + new (&map_v) std::map(value); + } + DecodedValue(const DecodedVertex &value) : type_(Type::Vertex) { + new (&vertex_v) DecodedVertex(value); + } + DecodedValue(const DecodedEdge &value) : type_(Type::Edge) { + new (&edge_v) DecodedEdge(value); + } + + DecodedValue &operator=(const DecodedValue &other); + DecodedValue(const DecodedValue &other); + ~DecodedValue(); + + Type type() const { return type_; } + + // constant getters + bool ValueBool() const; + int64_t ValueInt() const; + double ValueDouble() const; + const std::string &ValueString() const; + const std::vector &ValueList() const; + const std::map &ValueMap() const; + const DecodedVertex &ValueVertex() const; + const DecodedEdge &ValueEdge() const; + + // getters + bool &ValueBool(); + int64_t &ValueInt(); + double &ValueDouble(); + std::string &ValueString(); + std::vector &ValueList(); + std::map &ValueMap(); + DecodedVertex &ValueVertex(); + DecodedEdge &ValueEdge(); + + // conversion function to TypedValue + operator query::TypedValue() const; + + friend std::ostream &operator<<(std::ostream &os, const DecodedValue &value); + + private: + Type type_; + + // storage for the value of the property + union { + bool bool_v; + int64_t int_v; + double double_v; + std::string string_v; + std::vector list_v; + std::map map_v; + DecodedVertex vertex_v; + DecodedEdge edge_v; + }; +}; + +/** + * An exception raised by the DecodedValue system. + */ +class DecodedValueException : public utils::BasicException { + public: + using utils::BasicException::BasicException; + DecodedValueException() + : BasicException("Incompatible template param and type!") {} +}; + +/** + * Output operators. + */ +std::ostream &operator<<(std::ostream &os, const DecodedVertex &vertex); +std::ostream &operator<<(std::ostream &os, const DecodedEdge &edge); +std::ostream &operator<<(std::ostream &os, const DecodedValue &value); +} diff --git a/src/communication/bolt/v1/decoder/decoder.hpp b/src/communication/bolt/v1/decoder/decoder.hpp index 708b9e7e6..db4dca3e9 100644 --- a/src/communication/bolt/v1/decoder/decoder.hpp +++ b/src/communication/bolt/v1/decoder/decoder.hpp @@ -5,40 +5,16 @@ #include #include "communication/bolt/v1/codes.hpp" +#include "communication/bolt/v1/decoder/decoded_value.hpp" #include "database/graph_db_accessor.hpp" -#include "query/typed_value.hpp" #include "utils/bswap.hpp" #include "utils/underlying_cast.hpp" namespace communication::bolt { -/** - * Structure used when reading a Vertex with the decoder. - * The decoder writes data into this structure. - */ -struct DecodedVertex { - int64_t id; - std::vector labels; - std::map properties; -}; - -/** - * Structure used when reading an Edge with the decoder. - * The decoder writes data into this structure. - */ -struct DecodedEdge { - int64_t id; - int64_t from; - int64_t to; - std::string type; - std::map properties; -}; - /** * Bolt Decoder. * Has public interfaces for reading Bolt encoded data. - * Supports reading: TypedValue (without Vertex, Edge and Path), - * Vertex, Edge * * @tparam Buffer the input buffer that should be used */ @@ -48,20 +24,20 @@ class Decoder { Decoder(Buffer &buffer) : buffer_(buffer) {} /** - * Reads a TypedValue from the available data in the buffer. - * This function tries to read a TypedValue from the available data. + * Reads a DecodedValue from the available data in the buffer. + * This function tries to read a DecodedValue from the available data. * - * @param data pointer to a TypedValue where the read data should be stored + * @param data pointer to a DecodedValue where the read data should be stored * @returns true if data has been written to the data pointer, * false otherwise */ - bool ReadTypedValue(query::TypedValue *data) { + bool ReadValue(DecodedValue *data) { uint8_t value; - DLOG(INFO) << "[ReadTypedValue] Start"; + DLOG(INFO) << "[ReadValue] Start"; if (!buffer_.Read(&value, 1)) { - DLOG(WARNING) << "[ReadTypedValue] Marker data missing!"; + DLOG(WARNING) << "[ReadValue] Marker data missing!"; return false; } @@ -99,6 +75,12 @@ class Decoder { case Marker::Map32: return ReadMap(marker, data); + case Marker::TinyStruct3: + return ReadVertex(marker, data); + + case Marker::TinyStruct5: + return ReadEdge(marker, data); + default: if ((value & 0xF0) == underlying_cast(Marker::TinyString)) { return ReadString(marker, data); @@ -114,21 +96,21 @@ class Decoder { } /** - * Reads a TypedValue from the available data in the buffer and checks + * Reads a DecodedValue from the available data in the buffer and checks * whether the read data type matches the supplied data type. * - * @param data pointer to a TypedValue where the read data should be stored + * @param data pointer to a DecodedValue where the read data should be stored * @param type the expected type that should be read * @returns true if data has been written to the data pointer and the type * matches the expected type, false otherwise */ - bool ReadTypedValue(query::TypedValue *data, query::TypedValue::Type type) { - if (!ReadTypedValue(data)) { - DLOG(WARNING) << "[ReadTypedValue] ReadTypedValue call failed!"; + bool ReadValue(DecodedValue *data, DecodedValue::Type type) { + if (!ReadValue(data)) { + DLOG(WARNING) << "[ReadValue] ReadValue call failed!"; return false; } if (data->type() != type) { - DLOG(WARNING) << "[ReadTypedValue] Typed value has wrong type!"; + DLOG(WARNING) << "[ReadValue] Decoded value has wrong type!"; return false; } return true; @@ -159,164 +141,32 @@ class Decoder { return true; } - /** - * Reads a Vertex from the available data in the buffer. - * This function tries to read a Vertex from the available data. - * - * @param data pointer to a DecodedVertex where the data should be stored - * @returns true if data has been written into the data pointer, - * false otherwise - */ - bool ReadVertex(DecodedVertex *data) { - uint8_t value[2]; - query::TypedValue tv; - - DLOG(INFO) << "[ReadVertex] Start"; - - if (!buffer_.Read(value, 2)) { - DLOG(WARNING) << "[ReadVertex] Missing marker and/or signature data!"; - return false; - } - - // check header - if (value[0] != underlying_cast(Marker::TinyStruct) + 3) { - DLOG(WARNING) << "[ReadVertex] Received invalid marker " << value[0]; - return false; - } - if (value[1] != underlying_cast(Signature::Node)) { - DLOG(WARNING) << "[ReadVertex] Received invalid signature " << value[1]; - return false; - } - - // read ID - if (!ReadTypedValue(&tv, query::TypedValue::Type::Int)) { - DLOG(WARNING) << "[ReadVertex] Couldn't read ID!"; - return false; - } - data->id = tv.Value(); - - // read labels - if (!ReadTypedValue(&tv, query::TypedValue::Type::List)) { - DLOG(WARNING) << "[ReadVertex] Couldn't read labels!"; - return false; - } - auto &labels = tv.Value>(); - data->labels.resize(labels.size()); - for (size_t i = 0; i < labels.size(); ++i) { - if (labels[i].type() != query::TypedValue::Type::String) { - DLOG(WARNING) << "[ReadVertex] Label has wrong type!"; - return false; - } - data->labels[i] = labels[i].Value(); - } - - // read properties - if (!ReadTypedValue(&tv, query::TypedValue::Type::Map)) { - DLOG(WARNING) << "[ReadVertex] Couldn't read properties!"; - return false; - } - data->properties = tv.Value>(); - - DLOG(INFO) << "[ReadVertex] Success"; - - return true; - } - - /** - * Reads an Edge from the available data in the buffer. - * This function tries to read an Edge from the available data. - * - * @param data pointer to a DecodedEdge where the data should be stored - * @returns true if data has been written into the data pointer, - * false otherwise - */ - bool ReadEdge(DecodedEdge *data) { - uint8_t value[2]; - query::TypedValue tv; - - DLOG(INFO) << "[ReadEdge] Start"; - - if (!buffer_.Read(value, 2)) { - DLOG(WARNING) << "[ReadEdge] Missing marker and/or signature data!"; - return false; - } - - // check header - if (value[0] != underlying_cast(Marker::TinyStruct) + 5) { - DLOG(WARNING) << "[ReadEdge] Received invalid marker " << value[0]; - return false; - } - if (value[1] != underlying_cast(Signature::Relationship)) { - DLOG(WARNING) << "[ReadEdge] Received invalid signature " << value[1]; - return false; - } - - // read ID - if (!ReadTypedValue(&tv, query::TypedValue::Type::Int)) { - DLOG(WARNING) << "[ReadEdge] couldn't read ID!"; - return false; - } - data->id = tv.Value(); - - // read from - if (!ReadTypedValue(&tv, query::TypedValue::Type::Int)) { - DLOG(WARNING) << "[ReadEdge] Couldn't read from_id!"; - return false; - } - data->from = tv.Value(); - - // read to - if (!ReadTypedValue(&tv, query::TypedValue::Type::Int)) { - DLOG(WARNING) << "[ReadEdge] Couldn't read to_id!"; - return false; - } - data->to = tv.Value(); - - // read type - if (!ReadTypedValue(&tv, query::TypedValue::Type::String)) { - DLOG(WARNING) << "[ReadEdge] Couldn't read type!"; - return false; - } - data->type = tv.Value(); - - // read properties - if (!ReadTypedValue(&tv, query::TypedValue::Type::Map)) { - DLOG(WARNING) << "[ReadEdge] Couldn't read properties!"; - return false; - } - data->properties = tv.Value>(); - - DLOG(INFO) << "ReadEdge] Success"; - - return true; - } - protected: Buffer &buffer_; private: - bool ReadNull(const Marker &marker, query::TypedValue *data) { + bool ReadNull(const Marker &marker, DecodedValue *data) { DLOG(INFO) << "[ReadNull] Start"; debug_assert(marker == Marker::Null, "Received invalid marker!"); - *data = query::TypedValue::Null; + *data = DecodedValue(); DLOG(INFO) << "[ReadNull] Success"; return true; } - bool ReadBool(const Marker &marker, query::TypedValue *data) { + bool ReadBool(const Marker &marker, DecodedValue *data) { DLOG(INFO) << "[ReadBool] Start"; debug_assert(marker == Marker::False || marker == Marker::True, "Received invalid marker!"); if (marker == Marker::False) { - *data = query::TypedValue(false); + *data = DecodedValue(false); } else { - *data = query::TypedValue(true); + *data = DecodedValue(true); } DLOG(INFO) << "[ReadBool] Success"; return true; } - bool ReadInt(const Marker &marker, query::TypedValue *data) { + bool ReadInt(const Marker &marker, DecodedValue *data) { uint8_t value = underlying_cast(marker); bool success = true; int64_t ret; @@ -362,13 +212,13 @@ class Decoder { return false; } if (success) { - *data = query::TypedValue(ret); + *data = DecodedValue(ret); DLOG(INFO) << "[ReadInt] Success"; } return success; } - bool ReadDouble(const Marker marker, query::TypedValue *data) { + bool ReadDouble(const Marker marker, DecodedValue *data) { uint64_t value; double ret; DLOG(INFO) << "[ReadDouble] Start"; @@ -379,7 +229,7 @@ class Decoder { } value = bswap(value); ret = *reinterpret_cast(&value); - *data = query::TypedValue(ret); + *data = DecodedValue(ret); DLOG(INFO) << "[ReadDouble] Success"; return true; } @@ -422,7 +272,7 @@ class Decoder { } } - bool ReadString(const Marker &marker, query::TypedValue *data) { + bool ReadString(const Marker &marker, DecodedValue *data) { DLOG(INFO) << "[ReadString] Start"; auto size = ReadTypeSize(marker, MarkerString); if (size == -1) { @@ -434,32 +284,32 @@ class Decoder { DLOG(WARNING) << "[ReadString] Missing data!"; return false; } - *data = query::TypedValue( - std::string(reinterpret_cast(ret.get()), size)); + *data = + DecodedValue(std::string(reinterpret_cast(ret.get()), size)); DLOG(INFO) << "[ReadString] Success"; return true; } - bool ReadList(const Marker &marker, query::TypedValue *data) { + bool ReadList(const Marker &marker, DecodedValue *data) { DLOG(INFO) << "[ReadList] Start"; auto size = ReadTypeSize(marker, MarkerList); if (size == -1) { DLOG(WARNING) << "[ReadList] Couldn't get size!"; return false; } - std::vector ret(size); + std::vector ret(size); for (int64_t i = 0; i < size; ++i) { - if (!ReadTypedValue(&ret[i])) { + if (!ReadValue(&ret[i])) { DLOG(WARNING) << "[ReadList] Couldn't read element " << i; return false; } } - *data = query::TypedValue(ret); + *data = DecodedValue(ret); DLOG(INFO) << "[ReadList] Success"; return true; } - bool ReadMap(const Marker &marker, query::TypedValue *data) { + bool ReadMap(const Marker &marker, DecodedValue *data) { DLOG(INFO) << "[ReadMap] Start"; auto size = ReadTypeSize(marker, MarkerMap); if (size == -1) { @@ -467,25 +317,25 @@ class Decoder { return false; } - query::TypedValue tv; + DecodedValue dv; std::string str; - std::map ret; + std::map ret; for (int64_t i = 0; i < size; ++i) { - if (!ReadTypedValue(&tv)) { + if (!ReadValue(&dv)) { DLOG(WARNING) << "[ReadMap] Couldn't read index " << i; return false; } - if (tv.type() != query::TypedValue::Type::String) { + if (dv.type() != DecodedValue::Type::String) { DLOG(WARNING) << "[ReadMap] Index " << i << " isn't a string!"; return false; } - str = tv.Value(); + str = dv.ValueString(); - if (!ReadTypedValue(&tv)) { + if (!ReadValue(&dv)) { DLOG(WARNING) << "[ReadMap] Couldn't read element " << i; return false; } - ret.insert(std::make_pair(str, tv)); + ret.insert(std::make_pair(str, dv)); } if (ret.size() != size) { DLOG(WARNING) @@ -493,9 +343,133 @@ class Decoder { return false; } - *data = query::TypedValue(ret); + *data = DecodedValue(ret); DLOG(INFO) << "[ReadMap] Success"; return true; } + + bool ReadVertex(const Marker &marker, DecodedValue *data) { + uint8_t value; + DecodedValue dv; + DecodedVertex vertex; + + DLOG(INFO) << "[ReadVertex] Start"; + + if (!buffer_.Read(&value, 1)) { + DLOG(WARNING) << "[ReadVertex] Missing marker and/or signature data!"; + return false; + } + + // check header + if (marker != Marker::TinyStruct3) { + DLOG(WARNING) << "[ReadVertex] Received invalid marker " + << (uint64_t)underlying_cast(marker); + return false; + } + if (value != underlying_cast(Signature::Node)) { + DLOG(WARNING) << "[ReadVertex] Received invalid signature " << value; + return false; + } + + // read ID + if (!ReadValue(&dv, DecodedValue::Type::Int)) { + DLOG(WARNING) << "[ReadVertex] Couldn't read ID!"; + return false; + } + vertex.id = dv.ValueInt(); + + // read labels + if (!ReadValue(&dv, DecodedValue::Type::List)) { + DLOG(WARNING) << "[ReadVertex] Couldn't read labels!"; + return false; + } + auto &labels = dv.ValueList(); + vertex.labels.resize(labels.size()); + for (size_t i = 0; i < labels.size(); ++i) { + if (labels[i].type() != DecodedValue::Type::String) { + DLOG(WARNING) << "[ReadVertex] Label has wrong type!"; + return false; + } + vertex.labels[i] = labels[i].ValueString(); + } + + // read properties + if (!ReadValue(&dv, DecodedValue::Type::Map)) { + DLOG(WARNING) << "[ReadVertex] Couldn't read properties!"; + return false; + } + vertex.properties = dv.ValueMap(); + + *data = DecodedValue(vertex); + + DLOG(INFO) << "[ReadVertex] Success"; + + return true; + } + + bool ReadEdge(const Marker &marker, DecodedValue *data) { + uint8_t value; + DecodedValue dv; + DecodedEdge edge; + + DLOG(INFO) << "[ReadEdge] Start"; + + if (!buffer_.Read(&value, 1)) { + DLOG(WARNING) << "[ReadEdge] Missing marker and/or signature data!"; + return false; + } + + // check header + if (marker != Marker::TinyStruct5) { + DLOG(WARNING) << "[ReadEdge] Received invalid marker " + << (uint64_t)underlying_cast(marker); + return false; + } + if (value != underlying_cast(Signature::Relationship)) { + DLOG(WARNING) << "[ReadEdge] Received invalid signature " << value; + return false; + } + + // read ID + if (!ReadValue(&dv, DecodedValue::Type::Int)) { + DLOG(WARNING) << "[ReadEdge] couldn't read ID!"; + return false; + } + edge.id = dv.ValueInt(); + + // read from + if (!ReadValue(&dv, DecodedValue::Type::Int)) { + DLOG(WARNING) << "[ReadEdge] Couldn't read from_id!"; + return false; + } + edge.from = dv.ValueInt(); + + // read to + if (!ReadValue(&dv, DecodedValue::Type::Int)) { + DLOG(WARNING) << "[ReadEdge] Couldn't read to_id!"; + return false; + } + edge.to = dv.ValueInt(); + + // read type + if (!ReadValue(&dv, DecodedValue::Type::String)) { + DLOG(WARNING) << "[ReadEdge] Couldn't read type!"; + return false; + } + edge.type = dv.ValueString(); + + // read properties + if (!ReadValue(&dv, DecodedValue::Type::Map)) { + DLOG(WARNING) << "[ReadEdge] Couldn't read properties!"; + return false; + } + edge.properties = dv.ValueMap(); + + *data = DecodedValue(edge); + + DLOG(INFO) << "[ReadEdge] Success"; + + return true; + } }; } diff --git a/src/communication/bolt/v1/encoder/client_encoder.hpp b/src/communication/bolt/v1/encoder/client_encoder.hpp new file mode 100644 index 000000000..d00a00948 --- /dev/null +++ b/src/communication/bolt/v1/encoder/client_encoder.hpp @@ -0,0 +1,144 @@ +#pragma once + +#include "communication/bolt/v1/codes.hpp" +#include "communication/bolt/v1/encoder/base_encoder.hpp" + +namespace communication::bolt { + +/** + * Bolt Client Encoder. + * Has public interfaces for writing Bolt specific request messages. + * Supported messages are: Init, Run, DiscardAll, PullAll, AckFailure, Reset + * + * @tparam Buffer the output buffer that should be used + */ +template +class ClientEncoder : private BaseEncoder { + private: + using BaseEncoder::WriteRAW; + using BaseEncoder::WriteList; + using BaseEncoder::WriteMap; + using BaseEncoder::WriteString; + using BaseEncoder::buffer_; + + public: + ClientEncoder(Buffer &buffer) : BaseEncoder(buffer) {} + + /** + * Writes a Init message. + * + * From the Bolt v1 documentation: + * InitMessage (signature=0x01) { + * String clientName + * Map authToken + * } + * + * @param client_name the name of the connected client + * @param auth_token a map with authentication data + * @returns true if the data was successfully sent to the client + * when flushing, false otherwise + */ + bool MessageInit(const std::string client_name, + const std::map &auth_token) { + WriteRAW(underlying_cast(Marker::TinyStruct2)); + WriteRAW(underlying_cast(Signature::Init)); + WriteString(client_name); + WriteMap(auth_token); + return buffer_.Flush(); + } + + /** + * Writes a Run message. + * + * From the Bolt v1 documentation: + * RunMessage (signature=0x10) { + * String statement + * Map parameters + * } + * + * @param statement the statement that should be executed + * @param parameters parameters that should be used to execute the statement + * @returns true if the data was successfully sent to the client + * when flushing, false otherwise + */ + bool MessageRun(const std::string statement, + const std::map ¶meters, + bool flush = true) { + WriteRAW(underlying_cast(Marker::TinyStruct2)); + WriteRAW(underlying_cast(Signature::Run)); + WriteString(statement); + WriteMap(parameters); + if (flush) { + return buffer_.Flush(); + } else { + buffer_.Chunk(); + // Chunk always succeeds, so return true + return true; + } + } + + /** + * Writes a DiscardAll message. + * + * From the Bolt v1 documentation: + * DiscardAllMessage (signature=0x2F) { + * } + * + * @returns true if the data was successfully sent to the client + * when flushing, false otherwise + */ + bool MessageDiscardAll() { + WriteRAW(underlying_cast(Marker::TinyStruct)); + WriteRAW(underlying_cast(Signature::DiscardAll)); + return buffer_.Flush(); + } + + /** + * Writes a PullAll message. + * + * From the Bolt v1 documentation: + * PullAllMessage (signature=0x3F) { + * } + * + * @returns true if the data was successfully sent to the client + * when flushing, false otherwise + */ + bool MessagePullAll() { + WriteRAW(underlying_cast(Marker::TinyStruct)); + WriteRAW(underlying_cast(Signature::PullAll)); + return buffer_.Flush(); + } + + /** + * Writes a AckFailure message. + * + * From the Bolt v1 documentation: + * AckFailureMessage (signature=0x0E) { + * } + * + * @returns true if the data was successfully sent to the client + * when flushing, false otherwise + */ + bool MessageAckFailure() { + WriteRAW(underlying_cast(Marker::TinyStruct)); + WriteRAW(underlying_cast(Signature::AckFailure)); + return buffer_.Flush(); + } + + /** + * Writes a Reset message. + * + * From the Bolt v1 documentation: + * ResetMessage (signature=0x0F) { + * } + * + * @returns true if the data was successfully sent to the client + * when flushing, false otherwise + */ + bool MessageReset() { + WriteRAW(underlying_cast(Marker::TinyStruct)); + WriteRAW(underlying_cast(Signature::Reset)); + return buffer_.Flush(); + } +}; +} diff --git a/src/communication/bolt/v1/states/error.hpp b/src/communication/bolt/v1/states/error.hpp index dfbf35366..1d80f4891 100644 --- a/src/communication/bolt/v1/states/error.hpp +++ b/src/communication/bolt/v1/states/error.hpp @@ -4,6 +4,7 @@ #include #include "communication/bolt/v1/codes.hpp" +#include "communication/bolt/v1/decoder/decoded_value.hpp" #include "communication/bolt/v1/state.hpp" #include "query/typed_value.hpp" @@ -69,9 +70,9 @@ State StateErrorRun(Session &session, State state) { // we need to clean up all parameters from this command value &= 0x0F; // the length is stored in the lower nibble - query::TypedValue tv; + DecodedValue dv; for (int i = 0; i < value; ++i) { - if (!session.decoder_.ReadTypedValue(&tv)) { + if (!session.decoder_.ReadValue(&dv)) { DLOG(WARNING) << fmt::format("Couldn't clean up parameter {} / {}!", i, value); return State::Close; diff --git a/src/communication/bolt/v1/states/handshake.hpp b/src/communication/bolt/v1/states/handshake.hpp index 0d4e45ef7..4f307da93 100644 --- a/src/communication/bolt/v1/states/handshake.hpp +++ b/src/communication/bolt/v1/states/handshake.hpp @@ -7,9 +7,6 @@ namespace communication::bolt { -static constexpr uint8_t preamble[4] = {0x60, 0x60, 0xB0, 0x17}; -static constexpr uint8_t protocol[4] = {0x00, 0x00, 0x00, 0x01}; - /** * Handshake state run function * This function runs everything to make a Bolt handshake with the client. @@ -17,7 +14,7 @@ static constexpr uint8_t protocol[4] = {0x00, 0x00, 0x00, 0x01}; */ template State StateHandshakeRun(Session &session) { - auto precmp = memcmp(session.buffer_.data(), preamble, sizeof(preamble)); + auto precmp = memcmp(session.buffer_.data(), kPreamble, sizeof(kPreamble)); if (UNLIKELY(precmp != 0)) { DLOG(WARNING) << "Received a wrong preamble!"; return State::Close; @@ -27,7 +24,7 @@ State StateHandshakeRun(Session &session) { // make sense to check which version the client prefers // this will change in the future - if (!session.socket_.Write(protocol, sizeof(protocol))) { + if (!session.socket_.Write(kProtocol, sizeof(kProtocol))) { DLOG(WARNING) << "Couldn't write handshake response!"; return State::Close; } diff --git a/src/communication/bolt/v1/states/idle_result.hpp b/src/communication/bolt/v1/states/idle_result.hpp index bb501f66b..ad403442c 100644 --- a/src/communication/bolt/v1/states/idle_result.hpp +++ b/src/communication/bolt/v1/states/idle_result.hpp @@ -6,6 +6,7 @@ #include #include "communication/bolt/v1/codes.hpp" +#include "communication/bolt/v1/decoder/decoded_value.hpp" #include "communication/bolt/v1/state.hpp" #include "query/exceptions.hpp" #include "query/typed_value.hpp" @@ -25,20 +26,19 @@ State HandleRun(Session &session, State state, Marker marker) { return State::Close; } - query::TypedValue query, params; - if (!session.decoder_.ReadTypedValue(&query, - query::TypedValue::Type::String)) { + DecodedValue query, params; + if (!session.decoder_.ReadValue(&query, DecodedValue::Type::String)) { DLOG(WARNING) << "Couldn't read query string!"; return State::Close; } - if (!session.decoder_.ReadTypedValue(¶ms, query::TypedValue::Type::Map)) { + if (!session.decoder_.ReadValue(¶ms, DecodedValue::Type::Map)) { DLOG(WARNING) << "Couldn't read parameters!"; return State::Close; } if (state == State::WaitForRollback) { - if (query.Value() == "ROLLBACK") { + if (query.ValueString() == "ROLLBACK") { session.Abort(); // One MessageSuccess for RUN command should be flushed. session.encoder_.MessageSuccess(kEmptyFields); @@ -65,7 +65,7 @@ State HandleRun(Session &session, State state, Marker marker) { debug_assert(!session.encoder_buffer_.HasData(), "There should be no data to write in this state"); - DLOG(INFO) << fmt::format("[Run] '{}'", query.Value()); + DLOG(INFO) << fmt::format("[Run] '{}'", query.ValueString()); bool in_explicit_transaction = false; if (session.db_accessor_) { // Transaction already exists. @@ -79,7 +79,7 @@ State HandleRun(Session &session, State state, Marker marker) { // If there was not explicitly started transaction before maybe we are // starting one now. - if (!in_explicit_transaction && query.Value() == "BEGIN") { + if (!in_explicit_transaction && query.ValueString() == "BEGIN") { // Check if query string is "BEGIN". If it is then we should start // transaction and wait for in-transaction queries. // TODO: "BEGIN" is not defined by bolt protocol or opencypher so we should @@ -94,14 +94,14 @@ State HandleRun(Session &session, State state, Marker marker) { } if (in_explicit_transaction) { - if (query.Value() == "COMMIT") { + if (query.ValueString() == "COMMIT") { session.Commit(); // One MessageSuccess for RUN command should be flushed. session.encoder_.MessageSuccess(kEmptyFields); // One for PULL_ALL should be chunked. session.encoder_.MessageSuccess({}, false); return State::Result; - } else if (query.Value() == "ROLLBACK") { + } else if (query.ValueString() == "ROLLBACK") { session.Abort(); // One MessageSuccess for RUN command should be flushed. session.encoder_.MessageSuccess(kEmptyFields); @@ -113,10 +113,12 @@ State HandleRun(Session &session, State state, Marker marker) { } try { - auto is_successfully_executed = session.query_engine_.Run( - query.Value(), *session.db_accessor_, - session.output_stream_, - params.Value>()); + auto ¶ms_map = params.ValueMap(); + std::map params_tv(params_map.begin(), + params_map.end()); + auto is_successfully_executed = + session.query_engine_.Run(query.ValueString(), *session.db_accessor_, + session.output_stream_, params_tv); // TODO: once we remove compiler from query_engine we can change return type // to void and not do this checks here. diff --git a/src/communication/bolt/v1/states/init.hpp b/src/communication/bolt/v1/states/init.hpp index 119d95161..449b2eb65 100644 --- a/src/communication/bolt/v1/states/init.hpp +++ b/src/communication/bolt/v1/states/init.hpp @@ -4,6 +4,7 @@ #include #include "communication/bolt/v1/codes.hpp" +#include "communication/bolt/v1/decoder/decoded_value.hpp" #include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/state.hpp" #include "utils/likely.hpp" @@ -45,22 +46,19 @@ State StateInitRun(Session &session) { // return State::Close; } - query::TypedValue client_name; - if (!session.decoder_.ReadTypedValue(&client_name, - query::TypedValue::Type::String)) { + DecodedValue client_name; + if (!session.decoder_.ReadValue(&client_name, DecodedValue::Type::String)) { DLOG(WARNING) << "Couldn't read client name!"; return State::Close; } - query::TypedValue metadata; - if (!session.decoder_.ReadTypedValue(&metadata, - query::TypedValue::Type::Map)) { + DecodedValue metadata; + if (!session.decoder_.ReadValue(&metadata, DecodedValue::Type::Map)) { DLOG(WARNING) << "Couldn't read metadata!"; return State::Close; } - LOG(INFO) << fmt::format("Client connected '{}'", - client_name.Value()) + LOG(INFO) << fmt::format("Client connected '{}'", client_name.ValueString()) << std::endl; if (!session.encoder_.MessageSuccess()) { diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index b32ad15fa..13e9365e1 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -1,6 +1,9 @@ #include "durability/recovery.hpp" #include "communication/bolt/v1/decoder/decoder.hpp" #include "durability/file_reader_buffer.hpp" +#include "query/typed_value.hpp" + +using communication::bolt::DecodedValue; bool Recovery::Recover(const fs::path &snapshot_file, GraphDbAccessor &db_accessor) { @@ -25,41 +28,43 @@ bool Recovery::Decode(const fs::path &snapshot_file, } std::unordered_map vertices; - query::TypedValue tv; - if (!decoder.ReadTypedValue(&tv, query::TypedValue::Type::List)) { + DecodedValue dv; + if (!decoder.ReadValue(&dv, DecodedValue::Type::List)) { buffer.Close(); return false; } - auto &label_property_vector = tv.Value>(); + auto &label_property_vector = dv.ValueList(); for (int i = 0; i < label_property_vector.size(); i += 2) { - auto label = label_property_vector[i].Value(); - auto property = label_property_vector[i + 1].Value(); + auto label = label_property_vector[i].ValueString(); + auto property = label_property_vector[i + 1].ValueString(); db_accessor.BuildIndex(db_accessor.Label(label), db_accessor.Property(property)); } for (int64_t i = 0; i < summary.vertex_num_; ++i) { - communication::bolt::DecodedVertex vertex; - if (!decoder.ReadVertex(&vertex)) { + DecodedValue vertex_dv; + if (!decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex)) { buffer.Close(); return false; } + auto &vertex = vertex_dv.ValueVertex(); auto vertex_accessor = db_accessor.InsertVertex(); for (const auto &label : vertex.labels) { vertex_accessor.add_label(db_accessor.Label(label)); } for (const auto &property_pair : vertex.properties) { vertex_accessor.PropsSet(db_accessor.Property(property_pair.first), - property_pair.second); + query::TypedValue(property_pair.second)); } vertices.insert({vertex.id, vertex_accessor}); } for (int64_t i = 0; i < summary.edge_num_; ++i) { - communication::bolt::DecodedEdge edge; - if (!decoder.ReadEdge(&edge)) { + DecodedValue edge_dv; + if (!decoder.ReadValue(&edge_dv, DecodedValue::Type::Edge)) { buffer.Close(); return false; } + auto &edge = edge_dv.ValueEdge(); auto it_from = vertices.find(edge.from); auto it_to = vertices.find(edge.to); if (it_from == vertices.end() || it_to == vertices.end()) { @@ -71,7 +76,7 @@ bool Recovery::Decode(const fs::path &snapshot_file, for (const auto &property_pair : edge.properties) edge_accessor.PropsSet(db_accessor.Property(property_pair.first), - property_pair.second); + query::TypedValue(property_pair.second)); } uint64_t hash = buffer.hash(); diff --git a/tests/manual/bolt_client.cpp b/tests/manual/bolt_client.cpp new file mode 100644 index 000000000..4242893fd --- /dev/null +++ b/tests/manual/bolt_client.cpp @@ -0,0 +1,66 @@ +#include +#include + +#include "communication/bolt/client.hpp" +#include "io/network/network_endpoint.hpp" +#include "io/network/socket.hpp" + +using SocketT = io::network::Socket; +using EndpointT = io::network::NetworkEndpoint; +using ClientT = communication::bolt::Client; + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_string(port, "7687", "Server port"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + // TODO: handle endpoint exception + EndpointT endpoint(FLAGS_address, FLAGS_port); + SocketT socket; + + if (!socket.Connect(endpoint)) return 1; + + ClientT client(std::move(socket), FLAGS_username, FLAGS_password); + + std::cout << "Memgraph bolt client is connected and running." << std::endl; + + while (true) { + std::string s; + std::getline(std::cin, s); + if (s == "") { + break; + } + try { + auto ret = client.Execute(s, {}); + + std::cout << "Fields:" << std::endl; + for (auto &field : ret.fields) { + std::cout << " " << field << std::endl; + } + + std::cout << "Records:" << std::endl; + for (int i = 0; i < ret.records.size(); ++i) { + std::cout << " " << i << std::endl; + for (auto &value : ret.records[i]) { + std::cout << " " << value << std::endl; + } + } + + std::cout << "Metadata:" << std::endl; + for (auto &data : ret.metadata) { + std::cout << " " << data.first << " : " << data.second << std::endl; + } + } + catch (const communication::bolt::ClientQueryException &e) { + std::cout << "Client received exception: " << e.what() << std::endl; + } + } + + client.Close(); + + return 0; +} diff --git a/tests/manual/harness_client.cpp b/tests/manual/harness_client.cpp new file mode 100644 index 000000000..1a704ba78 --- /dev/null +++ b/tests/manual/harness_client.cpp @@ -0,0 +1,166 @@ +#include +#include + +#include +#include + +#include "communication/bolt/client.hpp" +#include "io/network/network_endpoint.hpp" +#include "io/network/socket.hpp" +#include "threading/sync/spinlock.hpp" +#include "utils/algorithm.hpp" +#include "utils/timer.hpp" + +using SocketT = io::network::Socket; +using EndpointT = io::network::NetworkEndpoint; +using ClientT = communication::bolt::Client; +using DecodedValueT = communication::bolt::DecodedValue; + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_string(port, "7687", "Server port"); +DEFINE_uint64(num_workers, 1, "Number of workers"); +DEFINE_string(output, "", "Output file"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); + +const uint64_t MAX_RETRIES = 1000; + +void PrintJsonDecodedValue(std::ostream &os, const DecodedValueT &value) { + switch (value.type()) { + case DecodedValueT::Type::Null: + os << "null"; + break; + case DecodedValueT::Type::Bool: + os << (value.ValueBool() ? "true" : "false"); + break; + case DecodedValueT::Type::Int: + os << value.ValueInt(); + break; + case DecodedValueT::Type::Double: + os << value.ValueDouble(); + break; + case DecodedValueT::Type::String: + os << "\"" << value.ValueString() << "\""; + break; + case DecodedValueT::Type::List: + os << "["; + PrintIterable(os, value.ValueList(), ", ", + [](auto &stream, const auto &item) { + PrintJsonDecodedValue(stream, item); + }); + os << "]"; + break; + case DecodedValueT::Type::Map: + os << "{"; + PrintIterable(os, value.ValueMap(), ", ", + [](auto &stream, const auto &pair) { + PrintJsonDecodedValue(stream, {pair.first}); + stream << ": "; + PrintJsonDecodedValue(stream, pair.second); + }); + os << "}"; + break; + default: + std::terminate(); + } +} + +void PrintJsonMetadata( + std::ostream &os, + const std::vector> &metadata) { + os << "["; + PrintIterable(os, metadata, ", ", [](auto &stream, const auto &item) { + PrintJsonDecodedValue(stream, item); + }); + os << "]"; +} + +void PrintSummary( + std::ostream &os, double duration, + const std::vector> &metadata) { + os << "{\"wall_time\": " << duration << ", " + << "\"metadatas\": "; + PrintJsonMetadata(os, metadata); + os << "}"; +} + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + std::string query; + std::vector threads; + + SpinLock mutex; + uint64_t last = 0; + std::vector queries; + std::vector> metadata; + + while (std::getline(std::cin, query)) { + queries.push_back(query); + } + metadata.resize(queries.size()); + + utils::Timer timer; + + for (int i = 0; i < FLAGS_num_workers; ++i) { + threads.push_back(std::thread([&]() { + SocketT socket; + EndpointT endpoint; + + try { + endpoint = EndpointT(FLAGS_address, FLAGS_port); + } catch (const io::network::NetworkEndpointException &e) { + std::terminate(); + } + if (!socket.Connect(endpoint)) { + std::terminate(); + } + + ClientT client(std::move(socket), FLAGS_username, FLAGS_password); + + uint64_t pos, i; + std::string str; + while (true) { + { + std::lock_guard lock(mutex); + if (last == queries.size()) { + break; + } + pos = last++; + str = queries[pos]; + } + for (i = 0; i < MAX_RETRIES; ++i) { + try { + auto ret = client.Execute(str, {}); + std::lock_guard lock(mutex); + metadata[pos] = ret.metadata; + break; + } catch (const communication::bolt::ClientQueryException &e) { + } + } + if (i == MAX_RETRIES) { + std::terminate(); + } + } + client.Close(); + })); + } + + for (int i = 0; i < FLAGS_num_workers; ++i) { + threads[i].join(); + } + + auto elapsed = timer.Elapsed(); + double duration = elapsed.count(); + + if (FLAGS_output != "") { + std::ofstream ofile; + ofile.open(FLAGS_output); + PrintSummary(ofile, duration, metadata); + } else { + PrintSummary(std::cout, duration, metadata); + } + + return 0; +} diff --git a/tests/unit/bolt_decoder.cpp b/tests/unit/bolt_decoder.cpp index dba26fb5e..434588632 100644 --- a/tests/unit/bolt_decoder.cpp +++ b/tests/unit/bolt_decoder.cpp @@ -6,7 +6,7 @@ #include "communication/bolt/v1/decoder/decoder.hpp" #include "query/typed_value.hpp" -using query::TypedValue; +using communication::bolt::DecodedValue; constexpr const int SIZE = 131072; uint8_t data[SIZE]; @@ -44,49 +44,49 @@ using DecoderT = communication::bolt::Decoder; TEST(BoltDecoder, NullAndBool) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - TypedValue tv; + DecodedValue dv; // test null buffer.Write((const uint8_t *)"\xC0", 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::Null); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::Null); // test true buffer.Write((const uint8_t *)"\xC3", 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::Bool); - ASSERT_EQ(tv.Value(), true); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::Bool); + ASSERT_EQ(dv.ValueBool(), true); // test false buffer.Write((const uint8_t *)"\xC2", 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::Bool); - ASSERT_EQ(tv.Value(), false); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::Bool); + ASSERT_EQ(dv.ValueBool(), false); } TEST(BoltDecoder, Int) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - TypedValue tv; + DecodedValue dv; // test invalid marker buffer.Clear(); buffer.Write((uint8_t *)"\xCD", 1); // 0xCD is reserved in the protocol - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); for (int i = 0; i < 28; ++i) { // test missing data buffer.Clear(); buffer.Write(int_encoded[i], int_encoded_len[i] - 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test all ok buffer.Clear(); buffer.Write(int_encoded[i], int_encoded_len[i]); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::Int); - ASSERT_EQ(tv.Value(), int_decoded[i]); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::Int); + ASSERT_EQ(dv.ValueInt(), int_decoded[i]); } } @@ -94,20 +94,20 @@ TEST(BoltDecoder, Double) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - TypedValue tv; + DecodedValue dv; for (int i = 0; i < 4; ++i) { // test missing data buffer.Clear(); buffer.Write(double_encoded[i], 8); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test all ok buffer.Clear(); buffer.Write(double_encoded[i], 9); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::Double); - ASSERT_EQ(tv.Value(), double_decoded[i]); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::Double); + ASSERT_EQ(dv.ValueDouble(), double_decoded[i]); } } @@ -115,7 +115,7 @@ TEST(BoltDecoder, String) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - TypedValue tv; + DecodedValue dv; uint8_t headers[][6] = {"\x8F", "\xD0\x0F", "\xD1\x00\x0F", "\xD2\x00\x00\x00\x0F"}; @@ -125,21 +125,21 @@ TEST(BoltDecoder, String) { // test missing data in header buffer.Clear(); buffer.Write(headers[i], headers_len[i] - 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test missing elements buffer.Clear(); buffer.Write(headers[i], headers_len[i]); buffer.Write(data, 14); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test all ok buffer.Clear(); buffer.Write(headers[i], headers_len[i]); buffer.Write(data, 15); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::String); - std::string &str = tv.Value(); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::String); + std::string &str = dv.ValueString(); for (int j = 0; j < 15; ++j) EXPECT_EQ((uint8_t)str[j], data[j]); } } @@ -148,7 +148,7 @@ TEST(BoltDecoder, List) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - TypedValue tv; + DecodedValue dv; uint8_t headers[][6] = {"\x9F", "\xD4\x0F", "\xD5\x00\x0F", "\xD6\x00\x00\x00\x0F"}; @@ -158,23 +158,23 @@ TEST(BoltDecoder, List) { // test missing data in header buffer.Clear(); buffer.Write(headers[i], headers_len[i] - 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test missing elements buffer.Clear(); buffer.Write(headers[i], headers_len[i]); for (uint8_t j = 0; j < 14; ++j) buffer.Write(&j, 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test all ok buffer.Clear(); buffer.Write(headers[i], headers_len[i]); for (uint8_t j = 0; j < 15; ++j) buffer.Write(&j, 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::List); - std::vector &val = tv.Value>(); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::List); + std::vector &val = dv.ValueList(); ASSERT_EQ(val.size(), 15); - for (int j = 0; j < 15; ++j) EXPECT_EQ(val[j].Value(), j); + for (int j = 0; j < 15; ++j) EXPECT_EQ(val[j].ValueInt(), j); } } @@ -182,7 +182,7 @@ TEST(BoltDecoder, Map) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - TypedValue tv; + DecodedValue dv; uint8_t headers[][6] = {"\xAF", "\xD8\x0F", "\xD9\x00\x0F", "\xDA\x00\x00\x00\x0F"}; @@ -195,20 +195,20 @@ TEST(BoltDecoder, Map) { // test missing data in header buffer.Clear(); buffer.Write(headers[i], headers_len[i] - 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test wrong index type buffer.Clear(); buffer.Write(headers[i], headers_len[i]); buffer.Write(&wrong_index, 1); buffer.Write(&wrong_index, 1); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test missing element data buffer.Clear(); buffer.Write(headers[i], headers_len[i]); buffer.Write(index, 2); - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test missing elements buffer.Clear(); @@ -217,7 +217,7 @@ TEST(BoltDecoder, Map) { buffer.Write(index, 2); buffer.Write(&j, 1); } - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test elements with same index buffer.Clear(); @@ -226,7 +226,7 @@ TEST(BoltDecoder, Map) { buffer.Write(index, 2); buffer.Write(&j, 1); } - ASSERT_EQ(decoder.ReadTypedValue(&tv), false); + ASSERT_EQ(decoder.ReadValue(&dv), false); // test all ok buffer.Clear(); @@ -237,16 +237,15 @@ TEST(BoltDecoder, Map) { buffer.Write(&tmp, 1); buffer.Write(&j, 1); } - ASSERT_EQ(decoder.ReadTypedValue(&tv), true); - ASSERT_EQ(tv.type(), TypedValue::Type::Map); - std::map &val = - tv.Value>(); + ASSERT_EQ(decoder.ReadValue(&dv), true); + ASSERT_EQ(dv.type(), DecodedValue::Type::Map); + std::map &val = dv.ValueMap(); ASSERT_EQ(val.size(), 15); for (int j = 0; j < 15; ++j) { char tmp_chr = 'a' + j; - TypedValue tmp_tv = val[std::string(1, tmp_chr)]; - EXPECT_EQ(tmp_tv.type(), TypedValue::Type::Int); - EXPECT_EQ(tmp_tv.Value(), j); + DecodedValue tmp_dv = val[std::string(1, tmp_chr)]; + EXPECT_EQ(tmp_dv.type(), DecodedValue::Type::Int); + EXPECT_EQ(tmp_dv.ValueInt(), j); } } } @@ -255,7 +254,7 @@ TEST(BoltDecoder, Vertex) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - communication::bolt::DecodedVertex dv; + DecodedValue dv; uint8_t header[] = "\xB3\x4E"; uint8_t wrong_header[] = "\x00\x00"; @@ -267,31 +266,31 @@ TEST(BoltDecoder, Vertex) { // test missing signature buffer.Clear(); buffer.Write(wrong_header, 1); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test wrong marker buffer.Clear(); buffer.Write(wrong_header, 2); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test wrong signature buffer.Clear(); buffer.Write(header, 1); buffer.Write(wrong_header, 1); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test ID wrong type buffer.Clear(); buffer.Write(header, 2); buffer.Write(test_str, 2); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test labels wrong outer type buffer.Clear(); buffer.Write(header, 2); buffer.Write(test_int, 1); buffer.Write(test_int, 1); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test labels wrong inner type buffer.Clear(); @@ -299,7 +298,7 @@ TEST(BoltDecoder, Vertex) { buffer.Write(test_int, 1); buffer.Write(test_list, 1); buffer.Write(test_int, 1); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test properties wrong outer type buffer.Clear(); @@ -307,7 +306,7 @@ TEST(BoltDecoder, Vertex) { buffer.Write(test_int, 1); buffer.Write(test_list, 1); buffer.Write(test_str, 2); - ASSERT_EQ(decoder.ReadVertex(&dv), false); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), false); // test all ok buffer.Clear(); @@ -318,17 +317,18 @@ TEST(BoltDecoder, Vertex) { buffer.Write(test_map, 1); buffer.Write(test_str, 2); buffer.Write(test_int, 1); - ASSERT_EQ(decoder.ReadVertex(&dv), true); - ASSERT_EQ(dv.id, 1); - ASSERT_EQ(dv.labels[0], std::string("a")); - ASSERT_EQ(dv.properties[std::string("a")].Value(), 1); + ASSERT_EQ(decoder.ReadValue(&dv, DecodedValue::Type::Vertex), true); + auto &vertex = dv.ValueVertex(); + ASSERT_EQ(vertex.id, 1); + ASSERT_EQ(vertex.labels[0], std::string("a")); + ASSERT_EQ(vertex.properties[std::string("a")].ValueInt(), 1); } TEST(BoltDecoder, Edge) { TestDecoderBuffer buffer; DecoderT decoder(buffer); - communication::bolt::DecodedEdge de; + DecodedValue de; uint8_t header[] = "\xB5\x52"; uint8_t wrong_header[] = "\x00\x00"; @@ -341,31 +341,31 @@ TEST(BoltDecoder, Edge) { // test missing signature buffer.Clear(); buffer.Write(wrong_header, 1); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test wrong marker buffer.Clear(); buffer.Write(wrong_header, 2); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test wrong signature buffer.Clear(); buffer.Write(header, 1); buffer.Write(wrong_header, 1); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test ID wrong type buffer.Clear(); buffer.Write(header, 2); buffer.Write(test_str, 2); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test from_id wrong type buffer.Clear(); buffer.Write(header, 2); buffer.Write(test_int1, 1); buffer.Write(test_str, 2); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test to_id wrong type buffer.Clear(); @@ -373,7 +373,7 @@ TEST(BoltDecoder, Edge) { buffer.Write(test_int1, 1); buffer.Write(test_int2, 1); buffer.Write(test_str, 2); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test type wrong type buffer.Clear(); @@ -382,7 +382,7 @@ TEST(BoltDecoder, Edge) { buffer.Write(test_int2, 1); buffer.Write(test_int3, 1); buffer.Write(test_int1, 1); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test properties wrong outer type buffer.Clear(); @@ -392,7 +392,7 @@ TEST(BoltDecoder, Edge) { buffer.Write(test_int3, 1); buffer.Write(test_str, 2); buffer.Write(test_int1, 1); - ASSERT_EQ(decoder.ReadEdge(&de), false); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), false); // test all ok buffer.Clear(); @@ -404,12 +404,13 @@ TEST(BoltDecoder, Edge) { buffer.Write(test_map, 1); buffer.Write(test_str, 2); buffer.Write(test_int1, 1); - ASSERT_EQ(decoder.ReadEdge(&de), true); - ASSERT_EQ(de.id, 1); - ASSERT_EQ(de.from, 2); - ASSERT_EQ(de.to, 3); - ASSERT_EQ(de.type, std::string("a")); - ASSERT_EQ(de.properties[std::string("a")].Value(), 1); + ASSERT_EQ(decoder.ReadValue(&de, DecodedValue::Type::Edge), true); + auto &edge = de.ValueEdge(); + ASSERT_EQ(edge.id, 1); + ASSERT_EQ(edge.from, 2); + ASSERT_EQ(edge.to, 3); + ASSERT_EQ(edge.type, std::string("a")); + ASSERT_EQ(edge.properties[std::string("a")].ValueInt(), 1); } int main(int argc, char **argv) { diff --git a/tests/unit/recovery.cpp b/tests/unit/recovery.cpp index 10ae6dddf..ce5410655 100644 --- a/tests/unit/recovery.cpp +++ b/tests/unit/recovery.cpp @@ -109,20 +109,22 @@ TEST_F(RecoveryTest, TestEncoding) { snapshot::Summary summary; buffer.Open(snapshot, summary); - query::TypedValue tv; - decoder.ReadTypedValue(&tv); + communication::bolt::DecodedValue dv; + decoder.ReadValue(&dv); std::vector ids; std::vector edge_types; for (int i = 0; i < summary.vertex_num_; ++i) { - communication::bolt::DecodedVertex vertex; - decoder.ReadVertex(&vertex); + communication::bolt::DecodedValue vertex_dv; + decoder.ReadValue(&vertex_dv); + auto &vertex = vertex_dv.ValueVertex(); ids.push_back(vertex.id); } std::vector from, to; for (int i = 0; i < summary.edge_num_; ++i) { - communication::bolt::DecodedEdge edge; - decoder.ReadEdge(&edge); + communication::bolt::DecodedValue edge_dv; + decoder.ReadValue(&edge_dv); + auto &edge = edge_dv.ValueEdge(); from.push_back(edge.from); to.push_back(edge.to); edge_types.push_back(edge.type);