From 5a5ffface3dd5d95234f7893f375c8ee6a94d3ec Mon Sep 17 00:00:00 2001 From: Matej Ferencevic <matej.ferencevic@memgraph.io> Date: Wed, 22 Mar 2017 16:36:48 +0100 Subject: [PATCH] Modified hardcoded queries to use new encoder. Summary: Removed old encoder. Changed namespace from bolt to communication::bolt. Removed old include from new encoder. Added an empty message success to encoder. Changed order in communication::Server. Changed bolt session to use new encoder. Merge remote-tracking branch 'origin/dev' into mg_hardcoded_queries Fixed PrintRecordStream. Reviewers: buda, dgleich Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D158 --- src/communication/bolt/v1/config.hpp | 14 -- .../bolt/v1/encoder/chunked_buffer.hpp | 1 - src/communication/bolt/v1/encoder/encoder.hpp | 10 + src/communication/bolt/v1/messaging/codes.hpp | 2 +- src/communication/bolt/v1/packing/codes.hpp | 5 +- src/communication/bolt/v1/packing/types.hpp | 2 +- .../bolt/v1/serialization/bolt_serializer.hpp | 136 ------------ .../bolt/v1/serialization/record_stream.hpp | 158 -------------- src/communication/bolt/v1/session.hpp | 23 +- src/communication/bolt/v1/state.hpp | 2 +- src/communication/bolt/v1/states/error.hpp | 27 +-- src/communication/bolt/v1/states/executor.hpp | 37 ++-- .../bolt/v1/states/handshake.hpp | 2 +- src/communication/bolt/v1/states/init.hpp | 12 +- .../bolt/v1/transport/bolt_decoder.cpp | 2 +- .../bolt/v1/transport/bolt_decoder.hpp | 2 +- .../bolt/v1/transport/bolt_encoder.hpp | 200 ------------------ .../bolt/v1/transport/buffer.cpp | 2 +- .../bolt/v1/transport/buffer.hpp | 2 +- .../bolt/v1/transport/chunked_buffer.hpp | 58 ----- .../bolt/v1/transport/chunked_decoder.hpp | 2 +- .../bolt/v1/transport/chunked_encoder.hpp | 83 -------- .../bolt/v1/transport/stream_error.hpp | 2 +- .../v1/transport/streamed_bolt_decoder.hpp | 2 +- src/communication/server.hpp | 2 +- src/memgraph_bolt.cpp | 8 +- src/query/plan_template_cpp | 2 +- tests/integration/hardcoded_query/clique.hpp | 22 +- .../create_full_profile_conceals_return.cpp | 9 +- .../create_full_profile_return.cpp | 9 +- .../create_full_profile_reveals_return.cpp | 9 +- .../hardcoded_query/create_garment.cpp | 9 +- .../create_garment_conceals.cpp | 9 +- .../create_garment_reveals.cpp | 9 +- .../hardcoded_query/delete_all.cpp | 6 +- .../hardcoded_query/match_garment.cpp | 9 +- .../match_garment_default_outfit.cpp | 9 +- ...match_garment_set_label_general_return.hpp | 9 +- .../hardcoded_query/match_profile.cpp | 9 +- .../match_profile_garment_score.cpp | 9 +- .../match_profile_garment_set_score.cpp | 9 +- .../match_profile_garment_update_score.cpp | 9 +- tests/integration/hardcoded_query/using.hpp | 4 +- .../stream/print_record_stream.hpp | 38 +--- tests/unit/bolt_session.cpp | 13 +- tests/unit/chunked_decoder.cpp | 2 +- tests/unit/chunked_encoder.cpp | 111 ---------- 47 files changed, 188 insertions(+), 923 deletions(-) delete mode 100644 src/communication/bolt/v1/config.hpp delete mode 100644 src/communication/bolt/v1/serialization/bolt_serializer.hpp delete mode 100644 src/communication/bolt/v1/serialization/record_stream.hpp delete mode 100644 src/communication/bolt/v1/transport/bolt_encoder.hpp delete mode 100644 src/communication/bolt/v1/transport/chunked_buffer.hpp delete mode 100644 src/communication/bolt/v1/transport/chunked_encoder.hpp delete mode 100644 tests/unit/chunked_encoder.cpp diff --git a/src/communication/bolt/v1/config.hpp b/src/communication/bolt/v1/config.hpp deleted file mode 100644 index c05680b7a..000000000 --- a/src/communication/bolt/v1/config.hpp +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include <cstddef> - -namespace bolt { - -namespace config { -/** chunk size */ -static constexpr size_t N = 65535; - -/** end mark */ -static constexpr size_t C = N + 2; -} -} diff --git a/src/communication/bolt/v1/encoder/chunked_buffer.hpp b/src/communication/bolt/v1/encoder/chunked_buffer.hpp index 0e07f319d..85f663fe6 100644 --- a/src/communication/bolt/v1/encoder/chunked_buffer.hpp +++ b/src/communication/bolt/v1/encoder/chunked_buffer.hpp @@ -5,7 +5,6 @@ #include <vector> #include <algorithm> -#include "communication/bolt/v1/config.hpp" #include "logging/default.hpp" #include "utils/types/byte.hpp" #include "utils/bswap.hpp" diff --git a/src/communication/bolt/v1/encoder/encoder.hpp b/src/communication/bolt/v1/encoder/encoder.hpp index f49f78de4..cc4620386 100644 --- a/src/communication/bolt/v1/encoder/encoder.hpp +++ b/src/communication/bolt/v1/encoder/encoder.hpp @@ -64,6 +64,16 @@ class Encoder { buffer_.Flush(); } + /** + * Sends a Success message. + * + * This function sends a success message without additional metadata. + */ + void MessageSuccess() { + std::map<std::string, TypedValue> metadata; + MessageSuccess(metadata); + } + /** * Sends a Failure message. * diff --git a/src/communication/bolt/v1/messaging/codes.hpp b/src/communication/bolt/v1/messaging/codes.hpp index 09a8f9cfe..04bc0bff4 100644 --- a/src/communication/bolt/v1/messaging/codes.hpp +++ b/src/communication/bolt/v1/messaging/codes.hpp @@ -3,7 +3,7 @@ #include "utils/types/byte.hpp" #include "utils/underlying_cast.hpp" -namespace bolt { +namespace communication::bolt { enum class MessageCode : byte { Init = 0x01, diff --git a/src/communication/bolt/v1/packing/codes.hpp b/src/communication/bolt/v1/packing/codes.hpp index 607ddd6f8..b21aa3b98 100644 --- a/src/communication/bolt/v1/packing/codes.hpp +++ b/src/communication/bolt/v1/packing/codes.hpp @@ -2,9 +2,7 @@ #include <cstdint> -namespace bolt { - -namespace pack { +namespace communication::bolt::pack { enum Code : uint8_t { TinyString = 0x80, @@ -55,4 +53,3 @@ enum Code : uint8_t { enum Rule : uint8_t { MaxInitStructSize = 0x02 }; } -} diff --git a/src/communication/bolt/v1/packing/types.hpp b/src/communication/bolt/v1/packing/types.hpp index ede64bba3..9838af1c0 100644 --- a/src/communication/bolt/v1/packing/types.hpp +++ b/src/communication/bolt/v1/packing/types.hpp @@ -1,6 +1,6 @@ #pragma once -namespace bolt { +namespace communication::bolt { enum class PackType { /** denotes absence of a value */ diff --git a/src/communication/bolt/v1/serialization/bolt_serializer.hpp b/src/communication/bolt/v1/serialization/bolt_serializer.hpp deleted file mode 100644 index cd7dfd88b..000000000 --- a/src/communication/bolt/v1/serialization/bolt_serializer.hpp +++ /dev/null @@ -1,136 +0,0 @@ -#pragma once - -#include "communication/bolt/v1/packing/codes.hpp" -#include "communication/bolt/v1/transport/bolt_encoder.hpp" -#include "communication/bolt/v1/transport/chunked_buffer.hpp" -#include "communication/bolt/v1/transport/chunked_encoder.hpp" - -#include "database/graph_db.hpp" -#include "database/graph_db_accessor.hpp" -#include "storage/property_value_store.hpp" - -namespace bolt { - -template <class Stream> -class BoltSerializer { - public: - BoltSerializer(Stream &stream) : encoder(stream) {} - - /** Serializes the vertex accessor into the packstream format - * - * struct[size = 3] Vertex [signature = 0x4E] { - * Integer node_id; - * List<String> labels; - * Map<String, Value> properties; - * } - * - */ - void write(const VertexAccessor &vertex) { - // write signatures for the node struct and node data type - encoder.write_struct_header(3); - encoder.write(underlying_cast(pack::Code::Node)); - - // IMPORTANT: here we write a hardcoded 0 because we don't - // use internal IDs, but need to give something to Bolt - // note that OpenCypher has no id(x) function, so the client - // should not be able to do anything with this value anyway - encoder.write_integer(0); // uID - - // write the list of labels - auto labels = vertex.labels(); - encoder.write_list_header(labels.size()); - for (auto label : labels) - encoder.write_string(vertex.db_accessor().label_name(label)); - - // write the properties - const PropertyValueStore<GraphDb::Property> &props = vertex.Properties(); - encoder.write_map_header(props.size()); - props.Accept([this, &vertex](const GraphDb::Property prop, - const PropertyValue &value) { - this->encoder.write(vertex.db_accessor().property_name(prop)); - this->write(value); - }); - } - - /** Serializes the edge accessor into the packstream format - * - * struct[size = 5] Edge [signature = 0x52] { - * Integer edge_id; // IMPORTANT: always 0 since we - * don't do IDs - * Integer start_node_id; // IMPORTANT: always 0 since we - * don't do IDs - * Integer end_node_id; // IMPORTANT: always 0 since we - * don't do IDs - * String type; - * Map<String, Value> properties; - * } - * - */ - void write(const EdgeAccessor &edge) { - // write signatures for the edge struct and edge data type - encoder.write_struct_header(5); - encoder.write(underlying_cast(pack::Code::Relationship)); - - // IMPORTANT: here we write a hardcoded 0 because we don't - // use internal IDs, but need to give something to Bolt - // note that OpenCypher has no id(x) function, so the client - // should not be able to do anything with this value anyway - encoder.write_integer(0); - encoder.write_integer(0); - encoder.write_integer(0); - - // write the type of the edge - encoder.write(edge.db_accessor().edge_type_name(edge.edge_type())); - - // write the property map - const PropertyValueStore<GraphDb::Property> &props = edge.Properties(); - encoder.write_map_header(props.size()); - props.Accept( - [this, &edge](GraphDb::Property prop, const PropertyValue &value) { - this->encoder.write(edge.db_accessor().property_name(prop)); - this->write(value); - }); - } - - // TODO document - void write_failure(const std::map<std::string, std::string> &data) { - encoder.message_failure(); - encoder.write_map_header(data.size()); - for (auto const &kv : data) { - write(kv.first); - write(kv.second); - } - } - - /** - * Writes a PropertyValue (typically a property value in the edge or vertex). - * - * @param value The value to write. - */ - void write(const PropertyValue &value) { - switch (value.type()) { - case PropertyValue::Type::Null: - encoder.write_null(); - return; - case PropertyValue::Type::Bool: - encoder.write_bool(value.Value<bool>()); - return; - case PropertyValue::Type::String: - encoder.write_string(value.Value<std::string>()); - return; - case PropertyValue::Type::Int: - encoder.write_integer(value.Value<int64_t>()); - return; - case PropertyValue::Type::Double: - encoder.write_double(value.Value<double>()); - return; - case PropertyValue::Type::List: - // Not implemented - assert(false); - } - } - - protected: - Stream &encoder; -}; -} diff --git a/src/communication/bolt/v1/serialization/record_stream.hpp b/src/communication/bolt/v1/serialization/record_stream.hpp deleted file mode 100644 index 0a730c5ce..000000000 --- a/src/communication/bolt/v1/serialization/record_stream.hpp +++ /dev/null @@ -1,158 +0,0 @@ -#pragma once - -#include "communication/bolt/v1/serialization/bolt_serializer.hpp" -#include "communication/bolt/v1/transport/chunked_buffer.hpp" -#include "communication/bolt/v1/transport/chunked_encoder.hpp" - -#include "logging/default.hpp" - -namespace bolt { - -/** - * compiled queries have to use this class in order to return results - * query code should not know about bolt protocol - */ -template <class Socket> -class RecordStream { - public: - RecordStream(Socket &socket) : socket(socket) { - logger = logging::log->logger("Record Stream"); - } - - ~RecordStream() = default; - - // TODO: create apstract methods that are not bolt specific --------------- - void write_success() { - logger.trace("write_success"); - bolt_encoder.message_success(); - } - - void write_success_empty() { - logger.trace("write_success_empty"); - bolt_encoder.message_success_empty(); - } - - void write_ignored() { - logger.trace("write_ignored"); - bolt_encoder.message_ignored(); - } - - void write_empty_fields() { - bolt_encoder.message_success(); - bolt_encoder.write_map_header(1); - bolt_encoder.write_string("fields"); - write_list_header(0); - chunk(); - } - - void write_fields(const std::vector<std::string> &fields) { - // TODO: that should be one level below? - bolt_encoder.message_success(); - - bolt_encoder.write_map_header(1); - bolt_encoder.write_string("fields"); - write_list_header(fields.size()); - - for (auto &name : fields) { - bolt_encoder.write_string(name); - } - - chunk(); - send(); - } - - void write_field(const std::string &field) { - bolt_encoder.message_success(); - bolt_encoder.write_map_header(1); - bolt_encoder.write_string("fields"); - write_list_header(1); - bolt_encoder.write_string(field); - chunk(); - send(); - } - - void write_list_header(size_t size) { bolt_encoder.write_list_header(size); } - - void write_record() { bolt_encoder.message_record(); } - - // writes metadata at the end of the message - // TODO: write whole implementation (currently, only type is supported) - // { "stats": { "nodes created": 1, "properties set": 1}, - // "type": "r" | "rw" | ... - void write_meta(const std::string &type) { - bolt_encoder.message_success(); - bolt_encoder.write_map_header(1); - bolt_encoder.write_string("type"); - bolt_encoder.write_string(type); - chunk(); - } - - void write_failure(const std::map<std::string, std::string> &data) { - serializer.write_failure(data); - chunk(); - } - - void write_count(const size_t count) { - write_record(); - write_list_header(1); - write(count); - chunk(); - } - - void write(const VertexAccessor &vertex) { serializer.write(vertex); } - - void write_vertex_record(const VertexAccessor &va) { - write_record(); - write_list_header(1); - write(va); - chunk(); - } - - void write(const EdgeAccessor &edge) { serializer.write(edge); } - - void write_edge_record(const EdgeAccessor &ea) { - write_record(); - write_list_header(1); - write(ea); - chunk(); - } - - void write(const PropertyValue &value) { serializer.write(value); } - - void send() { chunked_buffer.flush(); } - - void chunk() { chunked_encoder.write_chunk(); } - - // TODO WTF is this test doing here? - void _write_test() { - logger.trace("write_test"); - - write_fields({{"name"}}); - - write_record(); - write_list_header(1); - bolt_encoder.write("max"); - - write_record(); - write_list_header(1); - bolt_encoder.write("paul"); - - write_success_empty(); - } - - protected: - Logger logger; - - private: - using buffer_t = ChunkedBuffer<Socket>; - using chunked_encoder_t = ChunkedEncoder<buffer_t>; - using bolt_encoder_t = BoltEncoder<chunked_encoder_t>; - using bolt_serializer_t = BoltSerializer<bolt_encoder_t>; - - Socket &socket; - buffer_t chunked_buffer{socket}; - chunked_encoder_t chunked_encoder{chunked_buffer}; - bolt_encoder_t bolt_encoder{chunked_encoder}; - bolt_serializer_t serializer{bolt_encoder}; -}; -} diff --git a/src/communication/bolt/v1/session.hpp b/src/communication/bolt/v1/session.hpp index 00a1c53b8..a2fffc015 100644 --- a/src/communication/bolt/v1/session.hpp +++ b/src/communication/bolt/v1/session.hpp @@ -12,23 +12,24 @@ #include "communication/bolt/v1/states/executor.hpp" #include "communication/bolt/v1/states/error.hpp" -#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "communication/bolt/v1/encoder/encoder.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/transport/bolt_decoder.hpp" -#include "communication/bolt/v1/transport/bolt_encoder.hpp" #include "logging/default.hpp" -namespace bolt { +namespace communication::bolt { template<typename Socket> class Session { public: using Decoder = BoltDecoder; - using OutputStream = RecordStream<Socket>; + using OutputStream = ResultStream<Socket>; Session(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine) : socket(std::move(socket)), dbms(dbms), query_engine(query_engine), + encoder(this->socket), output_stream(encoder), logger(logging::log->logger("Session")) { event.data.ptr = this; // start with a handshake state @@ -63,15 +64,18 @@ class Session { break; case INIT: logger.debug("Current state: INIT"); - state = state_init_run<Socket>(output_stream, decoder); + // TODO: swap around parameters so that inputs are first and outputs are last! + state = state_init_run<Socket>(encoder, decoder); break; case EXECUTOR: logger.debug("Current state: EXECUTOR"); - state = state_executor_run<Socket>(output_stream, decoder, dbms, query_engine); + // TODO: swap around parameters so that inputs are first and outputs are last! + state = state_executor_run<Socket>(output_stream, encoder, decoder, dbms, query_engine); break; case ERROR: logger.debug("Current state: ERROR"); - state = state_error_run<Socket>(output_stream, decoder); + // TODO: swap around parameters so that inputs are first and outputs are last! + state = state_error_run<Socket>(output_stream, encoder, decoder); break; case NULLSTATE: break; @@ -86,6 +90,8 @@ class Session { this->socket.Close(); } + // TODO: these members should be private + Socket socket; io::network::Epoll::Event event; @@ -95,7 +101,8 @@ class Session { GraphDbAccessor active_db() { return dbms.active(); } Decoder decoder; - OutputStream output_stream{socket}; + Encoder<ChunkedBuffer<Socket>, Socket> encoder; + OutputStream output_stream; bool connected{false}; State state; diff --git a/src/communication/bolt/v1/state.hpp b/src/communication/bolt/v1/state.hpp index e4415d2ae..51a78d827 100644 --- a/src/communication/bolt/v1/state.hpp +++ b/src/communication/bolt/v1/state.hpp @@ -1,6 +1,6 @@ #pragma once -namespace bolt { +namespace communication::bolt { enum State { HANDSHAKE, diff --git a/src/communication/bolt/v1/states/error.hpp b/src/communication/bolt/v1/states/error.hpp index e4e6baba3..34b52ded2 100644 --- a/src/communication/bolt/v1/states/error.hpp +++ b/src/communication/bolt/v1/states/error.hpp @@ -2,14 +2,14 @@ #include "communication/bolt/v1/state.hpp" #include "communication/bolt/v1/transport/bolt_decoder.hpp" -#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" #include "logging/default.hpp" -namespace bolt { +namespace communication::bolt { template<typename Socket> -State state_error_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder) { +State state_error_run(ResultStream<Socket> &output_stream, Encoder<ChunkedBuffer<Socket>, Socket>& encoder, BoltDecoder &decoder) { Logger logger = logging::log->logger("State ERROR"); logger.trace("Run"); @@ -19,34 +19,29 @@ State state_error_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder) logger.trace("Message type byte is: {:02X}", message_type); if (message_type == MessageCode::PullAll) { - output_stream.write_ignored(); - output_stream.chunk(); - output_stream.send(); + // TODO: write_ignored, chunk, send + encoder.MessageIgnored(); return ERROR; } else if (message_type == MessageCode::AckFailure) { // TODO reset current statement? is it even necessary? logger.trace("AckFailure received"); - output_stream.write_success_empty(); - output_stream.chunk(); - output_stream.send(); + // TODO: write_success, chunk, send + encoder.MessageSuccess(); return EXECUTOR; } else if (message_type == MessageCode::Reset) { // TODO rollback current transaction // discard all records waiting to be sent - output_stream.write_success_empty(); - output_stream.chunk(); - output_stream.send(); + // TODO: write_success, chunk, send + encoder.MessageSuccess(); return EXECUTOR; } - // TODO: write this as single call - output_stream.write_ignored(); - output_stream.chunk(); - output_stream.send(); + // TODO: write_ignored, chunk, send + encoder.MessageIgnored(); return ERROR; } diff --git a/src/communication/bolt/v1/states/executor.hpp b/src/communication/bolt/v1/states/executor.hpp index 07058fc37..c8c516991 100644 --- a/src/communication/bolt/v1/states/executor.hpp +++ b/src/communication/bolt/v1/states/executor.hpp @@ -10,14 +10,14 @@ #include "logging/default.hpp" -namespace bolt { +namespace communication::bolt { struct Query { std::string statement; }; template<typename Socket> -State state_executor_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder, Dbms &dmbs, QueryEngine<RecordStream<Socket>> &query_engine){ +State state_executor_run(ResultStream<Socket> &output_stream, Encoder<ChunkedBuffer<Socket>, Socket>& encoder, BoltDecoder &decoder, Dbms &dmbs, QueryEngine<ResultStream<Socket>> &query_engine){ Logger logger = logging::log->logger("State EXECUTOR"); // just read one byte that represents the struct type, we can skip the // information contained in this byte @@ -43,23 +43,23 @@ State state_executor_run(RecordStream<Socket> &output_stream, BoltDecoder &decod query_engine.Run(query.statement, db_accessor, output_stream); if (!is_successfully_executed) { - output_stream.write_failure( + // TODO: write_failure, send + encoder.MessageFailure( {{"code", "Memgraph.QueryExecutionFail"}, {"message", "Query execution has failed (probably there is no " "element or there are some problems with concurrent " "access -> client has to resolve problems with " "concurrent access)"}}); - output_stream.send(); return ERROR; } return EXECUTOR; // TODO: RETURN success MAYBE } catch (const frontend::opencypher::SyntaxException &e) { - output_stream.write_failure( + // TODO: write_failure, send + encoder.MessageFailure( {{"code", "Memgraph.SyntaxException"}, {"message", "Syntax error"}}); - output_stream.send(); return ERROR; // } catch (const backend::cpp::GeneratorException &e) { // output_stream.write_failure( @@ -68,34 +68,33 @@ State state_executor_run(RecordStream<Socket> &output_stream, BoltDecoder &decod // output_stream.send(); // return ERROR; } catch (const QueryEngineException &e) { - output_stream.write_failure( + // TODO: write_failure, send + encoder.MessageFailure( {{"code", "Memgraph.QueryEngineException"}, {"message", "Query engine was unable to execute the query"}}); - output_stream.send(); return ERROR; } catch (const StacktraceException &e) { - output_stream.write_failure( + // TODO: write_failure, send + encoder.MessageFailure( {{"code", "Memgraph.StacktraceException"}, - {"message", "Unknow exception"}}); - output_stream.send(); + {"message", "Unknown exception"}}); return ERROR; } catch (std::exception &e) { - output_stream.write_failure( - {{"code", "Memgraph.Exception"}, {"message", "unknow exception"}}); - output_stream.send(); + // TODO: write_failure, send + encoder.MessageFailure( + {{"code", "Memgraph.Exception"}, {"message", "Unknown exception"}}); return ERROR; } } else if (message_type == MessageCode::PullAll) { logger.trace("[PullAll]"); - output_stream.send(); + // TODO: all query output should not be immediately flushed from the buffer, it should wait the PullAll command to start flushing!! + //output_stream.send(); } else if (message_type == MessageCode::DiscardAll) { logger.trace("[DiscardAll]"); // TODO: discard state - - output_stream.write_success(); - output_stream.chunk(); - output_stream.send(); + // TODO: write_success, send + encoder.MessageSuccess(); } else if (message_type == MessageCode::Reset) { // TODO: rollback current transaction // discard all records waiting to be sent diff --git a/src/communication/bolt/v1/states/handshake.hpp b/src/communication/bolt/v1/states/handshake.hpp index a6261b1c5..0007ea127 100644 --- a/src/communication/bolt/v1/states/handshake.hpp +++ b/src/communication/bolt/v1/states/handshake.hpp @@ -6,7 +6,7 @@ #include "logging/default.hpp" -namespace bolt { +namespace communication::bolt { static constexpr uint32_t preamble = 0x6060B017; diff --git a/src/communication/bolt/v1/states/init.hpp b/src/communication/bolt/v1/states/init.hpp index 59d05c796..68fa50a78 100644 --- a/src/communication/bolt/v1/states/init.hpp +++ b/src/communication/bolt/v1/states/init.hpp @@ -1,17 +1,18 @@ #pragma once +#include "communication/bolt/v1/packing/codes.hpp" #include "communication/bolt/v1/state.hpp" #include "communication/bolt/v1/transport/bolt_decoder.hpp" -#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/messaging/codes.hpp" #include "logging/default.hpp" #include "utils/likely.hpp" -namespace bolt { +namespace communication::bolt { template<typename Socket> -State state_init_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder) { +State state_init_run(Encoder<ChunkedBuffer<Socket>, Socket> &encoder, BoltDecoder &decoder) { Logger logger = logging::log->logger("State INIT"); logger.debug("Parsing message"); @@ -45,9 +46,8 @@ State state_init_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder) logger.debug("Executing state"); logger.debug("Client connected '{}'", client_name); - output_stream.write_success_empty(); - output_stream.chunk(); - output_stream.send(); + // TODO: write_success, chunk, send + encoder.MessageSuccess(); return EXECUTOR; } diff --git a/src/communication/bolt/v1/transport/bolt_decoder.cpp b/src/communication/bolt/v1/transport/bolt_decoder.cpp index da36a3f9f..a94892096 100644 --- a/src/communication/bolt/v1/transport/bolt_decoder.cpp +++ b/src/communication/bolt/v1/transport/bolt_decoder.cpp @@ -4,7 +4,7 @@ #include "logging/default.hpp" #include "utils/bswap.hpp" -namespace bolt { +namespace communication::bolt { void BoltDecoder::handshake(const byte *&data, size_t len) { buffer.write(data, len); diff --git a/src/communication/bolt/v1/transport/bolt_decoder.hpp b/src/communication/bolt/v1/transport/bolt_decoder.hpp index d752671bc..d8401bc96 100644 --- a/src/communication/bolt/v1/transport/bolt_decoder.hpp +++ b/src/communication/bolt/v1/transport/bolt_decoder.hpp @@ -4,7 +4,7 @@ #include "communication/bolt/v1/transport/chunked_decoder.hpp" #include "utils/types/byte.hpp" -namespace bolt { +namespace communication::bolt { class BoltDecoder { public: diff --git a/src/communication/bolt/v1/transport/bolt_encoder.hpp b/src/communication/bolt/v1/transport/bolt_encoder.hpp deleted file mode 100644 index 6364618b9..000000000 --- a/src/communication/bolt/v1/transport/bolt_encoder.hpp +++ /dev/null @@ -1,200 +0,0 @@ -#pragma once - -#include <string> - -#include "communication/bolt/v1/messaging/codes.hpp" -#include "communication/bolt/v1/packing/codes.hpp" -#include "logging/default.hpp" -#include "utils/bswap.hpp" -#include "utils/types/byte.hpp" - -namespace bolt { - -template <class Stream> -class BoltEncoder { - static constexpr int64_t plus_2_to_the_31 = 2147483648L; - static constexpr int64_t plus_2_to_the_15 = 32768L; - static constexpr int64_t plus_2_to_the_7 = 128L; - static constexpr int64_t minus_2_to_the_4 = -16L; - static constexpr int64_t minus_2_to_the_7 = -128L; - static constexpr int64_t minus_2_to_the_15 = -32768L; - static constexpr int64_t minus_2_to_the_31 = -2147483648L; - - public: - BoltEncoder(Stream &stream) : stream(stream) { - logger = logging::log->logger("Bolt Encoder"); - } - - void write(byte value) { write_byte(value); } - void write(pack::Code value) { write_byte(static_cast<byte>(value)); } - - void write_byte(byte value) { - logger.trace("write byte: {}", value); - stream.write(value); - } - - void write(const byte *values, size_t n) { stream.write(values, n); } - - void write_null() { stream.write(static_cast<byte>(pack::Code::Null)); } - - void write(bool value) { write_bool(value); } - - void write_bool(bool value) { - if (value) - write_true(); - else - write_false(); - } - - void write_true() { stream.write(pack::Code::True); } - - void write_false() { stream.write(pack::Code::False); } - - template <class T> - void write_value(T value) { - value = bswap(value); - stream.write(reinterpret_cast<const byte *>(&value), sizeof(value)); - } - - void write_integer(int64_t value) { - if (value >= minus_2_to_the_4 && value < plus_2_to_the_7) { - write(static_cast<byte>(value)); - } else if (value >= minus_2_to_the_7 && value < minus_2_to_the_4) { - write(pack::Code::Int8); - write(static_cast<byte>(value)); - } else if (value >= minus_2_to_the_15 && value < plus_2_to_the_15) { - write(pack::Code::Int16); - write_value(static_cast<int16_t>(value)); - } else if (value >= minus_2_to_the_31 && value < plus_2_to_the_31) { - write(pack::Code::Int32); - write_value(static_cast<int32_t>(value)); - } else { - write(pack::Code::Int64); - write_value(value); - } - } - - void write(double value) { write_double(value); } - - void write_double(double value) { - write(pack::Code::Float64); - write_value(*reinterpret_cast<const int64_t *>(&value)); - } - - void write_map_header(size_t size) { - if (size < 0x10) { - write(static_cast<byte>(static_cast<size_t>(pack::Code::TinyMap) | size)); - } else if (size <= 0xFF) { - write(pack::Code::Map8); - write(static_cast<byte>(size)); - } else if (size <= 0xFFFF) { - write(pack::Code::Map16); - write_value<uint16_t>(size); - } else { - write(pack::Code::Map32); - write_value<uint32_t>(size); - } - } - - void write_empty_map() { write(pack::Code::TinyMap); } - - void write_list_header(size_t size) { - if (size < 0x10) { - write( - static_cast<byte>(static_cast<size_t>(pack::Code::TinyList) | size)); - } else if (size <= 0xFF) { - write(pack::Code::List8); - write(static_cast<byte>(size)); - } else if (size <= 0xFFFF) { - write(pack::Code::List16); - write_value<uint16_t>(size); - } else { - write(pack::Code::List32); - write_value<uint32_t>(size); - } - } - - void write_empty_list() { write(pack::Code::TinyList); } - - void write_string_header(size_t size) { - if (size < 0x10) { - write( - static_cast<byte>(static_cast<byte>(pack::Code::TinyString) | size)); - } else if (size <= 0xFF) { - write(pack::Code::String8); - write(static_cast<byte>(size)); - } else if (size <= 0xFFFF) { - write(pack::Code::String16); - write_value<uint16_t>(size); - } else { - write(pack::Code::String32); - write_value<uint32_t>(size); - } - } - - void write(const std::string &str) { write_string(str); } - - void write_string(const std::string &str) { - write_string(str.c_str(), str.size()); - } - - void write_string(const char *str, size_t len) { - write_string_header(len); - write(reinterpret_cast<const byte *>(str), len); - } - - void write_struct_header(size_t size) { - if (size < 0x10) { - write(static_cast<byte>(static_cast<size_t>(pack::Code::TinyStruct) | - size)); - } else if (size <= 0xFF) { - write(pack::Code::Struct8); - write(static_cast<byte>(size)); - } else { - write(pack::Code::Struct16); - write_value<uint16_t>(size); - } - } - - void message_success() { - write_struct_header(1); - write(underlying_cast(MessageCode::Success)); - } - - void message_success_empty() { - message_success(); - write_empty_map(); - } - - void message_record() { - write_struct_header(1); - write(underlying_cast(MessageCode::Record)); - } - - void message_record_empty() { - message_record(); - write_empty_list(); - } - - void message_ignored() { - write_struct_header(0); - write(underlying_cast(MessageCode::Ignored)); - } - - void message_failure() { - write_struct_header(1); - write(underlying_cast(MessageCode::Failure)); - } - - void message_ignored_empty() { - message_ignored(); - write_empty_map(); - } - - protected: - Logger logger; - - private: - Stream &stream; -}; -} diff --git a/src/communication/bolt/v1/transport/buffer.cpp b/src/communication/bolt/v1/transport/buffer.cpp index e5bf838f8..b47bec8eb 100644 --- a/src/communication/bolt/v1/transport/buffer.cpp +++ b/src/communication/bolt/v1/transport/buffer.cpp @@ -1,6 +1,6 @@ #include "communication/bolt/v1/transport/buffer.hpp" -namespace bolt { +namespace communication::bolt { void Buffer::write(const byte* data, size_t len) { buffer.insert(buffer.end(), data, data + len); diff --git a/src/communication/bolt/v1/transport/buffer.hpp b/src/communication/bolt/v1/transport/buffer.hpp index 4de68a722..62d3828c4 100644 --- a/src/communication/bolt/v1/transport/buffer.hpp +++ b/src/communication/bolt/v1/transport/buffer.hpp @@ -6,7 +6,7 @@ #include "utils/types/byte.hpp" -namespace bolt { +namespace communication::bolt { class Buffer { public: diff --git a/src/communication/bolt/v1/transport/chunked_buffer.hpp b/src/communication/bolt/v1/transport/chunked_buffer.hpp deleted file mode 100644 index 208d0ce2f..000000000 --- a/src/communication/bolt/v1/transport/chunked_buffer.hpp +++ /dev/null @@ -1,58 +0,0 @@ -#pragma once - -#include <cstring> -#include <memory> -#include <vector> - -#include "communication/bolt/v1/config.hpp" -#include "logging/default.hpp" -#include "utils/types/byte.hpp" - -namespace bolt { - -template <class Stream> -class ChunkedBuffer { - static constexpr size_t C = bolt::config::C; /* chunk size */ - - public: - ChunkedBuffer(Stream &stream) : stream(stream) { - logger = logging::log->logger("Chunked Buffer"); - } - - void write(const byte *values, size_t n) { - logger.trace("Write {} bytes", n); - - // total size of the buffer is now bigger for n - size += n; - - // reserve enough spece for the new data - buffer.reserve(size); - - // copy new data - std::copy(values, values + n, std::back_inserter(buffer)); - } - - void flush() { - // TODO: check for success - stream.get().Write(&buffer.front(), size); - - logger.trace("Flushed {} bytes", size); - - // GC - // TODO: impelement a better strategy - buffer.clear(); - - // reset size - size = 0; - } - - ~ChunkedBuffer() {} - - private: - Logger logger; - // every new stream.write creates new TCP package - std::reference_wrapper<Stream> stream; - std::vector<byte> buffer; - size_t size{0}; -}; -} diff --git a/src/communication/bolt/v1/transport/chunked_decoder.hpp b/src/communication/bolt/v1/transport/chunked_decoder.hpp index 4f498bc92..2d02f16f2 100644 --- a/src/communication/bolt/v1/transport/chunked_decoder.hpp +++ b/src/communication/bolt/v1/transport/chunked_decoder.hpp @@ -8,7 +8,7 @@ #include "utils/likely.hpp" #include "utils/types/byte.hpp" -namespace bolt { +namespace communication::bolt { template <class Stream> class ChunkedDecoder { diff --git a/src/communication/bolt/v1/transport/chunked_encoder.hpp b/src/communication/bolt/v1/transport/chunked_encoder.hpp deleted file mode 100644 index 72d6b8c9c..000000000 --- a/src/communication/bolt/v1/transport/chunked_encoder.hpp +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once - -#include <array> -#include <cstring> -#include <functional> - -#include "communication/bolt/v1/config.hpp" -#include "logging/default.hpp" -#include "utils/likely.hpp" - -namespace bolt { - -template <class Stream> -class ChunkedEncoder { - static constexpr size_t N = bolt::config::N; - static constexpr size_t C = bolt::config::C; - - public: - using byte = unsigned char; - - ChunkedEncoder(Stream &stream) - : logger(logging::log->logger("Chunked Encoder")), stream(stream) {} - - static constexpr size_t chunk_size = N - 2; - - void write(byte value) { - if (UNLIKELY(pos == N)) write_chunk(); - - chunk[pos++] = value; - } - - void write(const byte *values, size_t n) { - logger.trace("write {} bytes", n); - - while (n > 0) { - auto size = n < N - pos ? n : N - pos; - - std::memcpy(chunk.data() + pos, values, size); - - pos += size; - n -= size; - - // TODO: see how bolt splits message over more TCP packets, - // test for more TCP packets - if (pos == N) write_chunk(); - } - } - - void write_chunk() { - write_chunk_header(); - - // write two zeros to signal message end - chunk[pos++] = 0x00; - chunk[pos++] = 0x00; - - flush(); - } - - private: - Logger logger; - std::reference_wrapper<Stream> stream; - - std::array<byte, C> chunk; - size_t pos{2}; - - void write_chunk_header() { - // write the size of the chunk - uint16_t size = pos - 2; - - // write the higher byte - chunk[0] = size >> 8; - - // write the lower byte - chunk[1] = size & 0xFF; - } - - void flush() { - // write chunk to the stream - stream.get().write(chunk.data(), pos); - pos = 2; - } -}; -} diff --git a/src/communication/bolt/v1/transport/stream_error.hpp b/src/communication/bolt/v1/transport/stream_error.hpp index ac088661d..2f5e987e8 100644 --- a/src/communication/bolt/v1/transport/stream_error.hpp +++ b/src/communication/bolt/v1/transport/stream_error.hpp @@ -2,7 +2,7 @@ #include "utils/exceptions/stacktrace_exception.hpp" -namespace bolt { +namespace communication::bolt { class StreamError : StacktraceException { public: diff --git a/src/communication/bolt/v1/transport/streamed_bolt_decoder.hpp b/src/communication/bolt/v1/transport/streamed_bolt_decoder.hpp index 8211fa962..d39928be8 100644 --- a/src/communication/bolt/v1/transport/streamed_bolt_decoder.hpp +++ b/src/communication/bolt/v1/transport/streamed_bolt_decoder.hpp @@ -7,7 +7,7 @@ #include "utils/bswap.hpp" #include "utils/types/byte.hpp" -namespace bolt { +namespace communication::bolt { // BoltDecoder for streams. Meant for use in SnapshotDecoder. // This should be recoded to recieve the current caller so that decoder can diff --git a/src/communication/server.hpp b/src/communication/server.hpp index 72d8a92a8..3ce87b0b8 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -104,10 +104,10 @@ class Server std::atomic<bool> alive_{true}; int idx_{0}; + Socket socket_; Dbms &dbms_; QueryEngine<OutputStream> &query_engine_; Event event_; - Socket socket_; Logger logger_; }; } diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index f7d39e4d3..fa0cbdbcf 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -21,9 +21,9 @@ using endpoint_t = io::network::NetworkEndpoint; using socket_t = io::network::Socket; -using bolt_server_t = - communication::Server<bolt::Session<socket_t>, bolt::RecordStream<socket_t>, - socket_t>; +using session_t = communication::bolt::Session<socket_t>; +using result_stream_t = communication::bolt::ResultStream<socket_t>; +using bolt_server_t = communication::Server<session_t, result_stream_t, socket_t>; static bolt_server_t *serverptr; @@ -98,7 +98,7 @@ int main(int argc, char **argv) { logger.info("Listening on {} at {}", interface, port); Dbms dbms; - QueryEngine<bolt::RecordStream<socket_t>> query_engine; + QueryEngine<result_stream_t> query_engine; // initialize server bolt_server_t server(std::move(socket), dbms, query_engine); diff --git a/src/query/plan_template_cpp b/src/query/plan_template_cpp index 79b1e6e59..a9229c995 100644 --- a/src/query/plan_template_cpp +++ b/src/query/plan_template_cpp @@ -2,7 +2,7 @@ #include <string> #include "data_structures/bitset/static_bitset.hpp" -#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" #include "io/network/socket.hpp" #include "query/backend/cpp/typed_value.hpp" #include "query/plan_interface.hpp" diff --git a/tests/integration/hardcoded_query/clique.hpp b/tests/integration/hardcoded_query/clique.hpp index 49864c98f..03fcd64e8 100644 --- a/tests/integration/hardcoded_query/clique.hpp +++ b/tests/integration/hardcoded_query/clique.hpp @@ -30,12 +30,10 @@ enum CliqueQuery { SCORE_AND_LIMIT, FIND_ALL }; bool run_general_query(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream, enum CliqueQuery query_type) { - if (query_type == CliqueQuery::FIND_ALL) - stream.write_fields( - {"a.garment_id", "b.garment_id", "c.garment_id", "d.garment_id"}); - else - stream.write_fields({"a.garment_id", "b.garment_id", "c.garment_id", - "d.garment_id", "score"}); + std::vector<std::string> headers{std::string("a.garment_id"), std::string("b.garment_id"), std::string("c.garment_id"), std::string("d.garment_id")}; + if (query_type != CliqueQuery::FIND_ALL) + headers.push_back(std::string("score")); + stream.Header(headers); // TODO dgleich: this code is very inefficient as it first makes a copy // of all the vertices/edges, and filters aftwarwards. I warned about this // happening in code review!!! @@ -206,18 +204,18 @@ bool run_general_query(GraphDbAccessor &db_accessor, const Parameters &args, ? args.At((int)args.Size() - 1).Value<int64_t>() : (int)results.size(); for (int i = 0; i < std::min(limit, (int)results.size()); ++i) { - stream.write_record(); - stream.write_list_header(query_type == CliqueQuery::SCORE_AND_LIMIT ? 5 - : 4); + std::vector<TypedValue> result; for (auto x : results[i]) { - stream.write(vertices_indexed[x] + result.push_back(vertices_indexed[x] ->PropsAt(db_accessor.property("garment_id")) .Value<int64_t>()); } if (query_type == CliqueQuery::SCORE_AND_LIMIT) - stream.write(calc_score(results[i])); + result.push_back(calc_score(results[i])); + stream.Result(result); } - stream.write_meta("r"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/create_full_profile_conceals_return.cpp b/tests/integration/hardcoded_query/create_full_profile_conceals_return.cpp index 989a22a3a..9ed793add 100644 --- a/tests/integration/hardcoded_query/create_full_profile_conceals_return.cpp +++ b/tests/integration/hardcoded_query/create_full_profile_conceals_return.cpp @@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> { v.PropsSet(db_accessor.property("partner_id"), args.At(1)); v.PropsSet(db_accessor.property("conceals"), args.At(2)); v.add_label(db_accessor.label("profile")); - stream.write_field("p"); - stream.write_vertex_record(v); - stream.write_meta("rw"); + std::vector<std::string> headers{std::string("p")}; + stream.Header(headers); + std::vector<TypedValue> result{TypedValue(v)}; + stream.Result(result); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/create_full_profile_return.cpp b/tests/integration/hardcoded_query/create_full_profile_return.cpp index 6413079b5..51fddc4cd 100644 --- a/tests/integration/hardcoded_query/create_full_profile_return.cpp +++ b/tests/integration/hardcoded_query/create_full_profile_return.cpp @@ -20,9 +20,12 @@ class CPUPlan : public PlanInterface<Stream> { v.PropsSet(db_accessor.property("profile_id"), args.At(0)); v.PropsSet(db_accessor.property("partner_id"), args.At(1)); v.add_label(db_accessor.label("profile")); - stream.write_field("p"); - stream.write_vertex_record(v); - stream.write_meta("rw"); + std::vector<std::string> headers{std::string("p")}; + stream.Header(headers); + std::vector<TypedValue> result{TypedValue(v)}; + stream.Result(result); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/create_full_profile_reveals_return.cpp b/tests/integration/hardcoded_query/create_full_profile_reveals_return.cpp index 7262ac20a..b35ae6f69 100644 --- a/tests/integration/hardcoded_query/create_full_profile_reveals_return.cpp +++ b/tests/integration/hardcoded_query/create_full_profile_reveals_return.cpp @@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> { v.PropsSet(db_accessor.property("partner_id"), args.At(1)); v.PropsSet(db_accessor.property("reveals"), args.At(2)); v.add_label(db_accessor.label("profile")); - stream.write_field("p"); - stream.write_vertex_record(v); - stream.write_meta("rw"); + std::vector<std::string> headers{std::string("p")}; + stream.Header(headers); + std::vector<TypedValue> result{TypedValue(v)}; + stream.Result(result); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/create_garment.cpp b/tests/integration/hardcoded_query/create_garment.cpp index 56cc7f9fb..ba5f16fe0 100644 --- a/tests/integration/hardcoded_query/create_garment.cpp +++ b/tests/integration/hardcoded_query/create_garment.cpp @@ -20,9 +20,12 @@ class CPUPlan : public PlanInterface<Stream> { v.add_label(db_accessor.label("garment")); v.PropsSet(db_accessor.property("garment_id"), args.At(0)); v.PropsSet(db_accessor.property("garment_category_id"), args.At(1)); - stream.write_field("g"); - stream.write_vertex_record(v); - stream.write_meta("rw"); + std::vector<std::string> headers{std::string("g")}; + stream.Header(headers); + std::vector<TypedValue> result{TypedValue(v)}; + stream.Result(result); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/create_garment_conceals.cpp b/tests/integration/hardcoded_query/create_garment_conceals.cpp index 08808efff..0139c6fd9 100644 --- a/tests/integration/hardcoded_query/create_garment_conceals.cpp +++ b/tests/integration/hardcoded_query/create_garment_conceals.cpp @@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> { v.PropsSet(db_accessor.property("garment_id"), args.At(0)); v.PropsSet(db_accessor.property("garment_category_id"), args.At(1)); v.PropsSet(db_accessor.property("conceals"), args.At(2)); - stream.write_field("g"); - stream.write_vertex_record(v); - stream.write_meta("rw"); + std::vector<std::string> headers{std::string("g")}; + stream.Header(headers); + std::vector<TypedValue> result{TypedValue(v)}; + stream.Result(result); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/create_garment_reveals.cpp b/tests/integration/hardcoded_query/create_garment_reveals.cpp index 73ea1d205..ad2b388dd 100644 --- a/tests/integration/hardcoded_query/create_garment_reveals.cpp +++ b/tests/integration/hardcoded_query/create_garment_reveals.cpp @@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> { v.PropsSet(db_accessor.property("garment_id"), args.At(0)); v.PropsSet(db_accessor.property("garment_category_id"), args.At(1)); v.PropsSet(db_accessor.property("reveals"), args.At(2)); - stream.write_field("g"); - stream.write_vertex_record(v); - stream.write_meta("rw"); + std::vector<std::string> headers{std::string("g")}; + stream.Header(headers); + std::vector<TypedValue> result{TypedValue(v)}; + stream.Result(result); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/delete_all.cpp b/tests/integration/hardcoded_query/delete_all.cpp index 80c23f74c..971d3f6d5 100644 --- a/tests/integration/hardcoded_query/delete_all.cpp +++ b/tests/integration/hardcoded_query/delete_all.cpp @@ -17,8 +17,10 @@ class CPUPlan : public PlanInterface<Stream> { bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { for (auto v : db_accessor.vertices()) db_accessor.detach_remove_vertex(v); - stream.write_empty_fields(); - stream.write_meta("rw"); + std::vector<std::string> headers; + stream.Header(headers); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_garment.cpp b/tests/integration/hardcoded_query/match_garment.cpp index a044713b0..0a41578a2 100644 --- a/tests/integration/hardcoded_query/match_garment.cpp +++ b/tests/integration/hardcoded_query/match_garment.cpp @@ -17,7 +17,8 @@ class CPUPlan : public PlanInterface<Stream> { public: bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { - stream.write_field("g"); + std::vector<std::string> headers{std::string("g")}; + stream.Header(headers); for (auto vertex : db_accessor.vertices()) { if (vertex.has_label(db_accessor.label("garment"))) { TypedValue prop = vertex.PropsAt(db_accessor.property("garment_id")); @@ -25,10 +26,12 @@ class CPUPlan : public PlanInterface<Stream> { auto cmp = prop == args.At(0); if (cmp.type() != TypedValue::Type::Bool) continue; if (cmp.Value<bool>() != true) continue; - stream.write_vertex_record(vertex); + std::vector<TypedValue> result{TypedValue(vertex)}; + stream.Result(result); } } - stream.write_meta("r"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_garment_default_outfit.cpp b/tests/integration/hardcoded_query/match_garment_default_outfit.cpp index 5fa8d96c7..94cb854e9 100644 --- a/tests/integration/hardcoded_query/match_garment_default_outfit.cpp +++ b/tests/integration/hardcoded_query/match_garment_default_outfit.cpp @@ -18,7 +18,8 @@ class CPUPlan : public PlanInterface<Stream> { public: bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { - stream.write_field("r"); + std::vector<std::string> headers{std::string("r")}; + stream.Header(headers); std::vector<VertexAccessor> g1_set, g2_set; for (auto g1 : db_accessor.vertices()) { if (g1.has_label(db_accessor.label("garment"))) { @@ -44,9 +45,11 @@ class CPUPlan : public PlanInterface<Stream> { for (auto g2 : g2_set) { EdgeAccessor e = db_accessor.insert_edge( g1, g2, db_accessor.edge_type("default_outfit")); - stream.write_edge_record(e); + std::vector<TypedValue> result{TypedValue(e)}; + stream.Result(result); } - stream.write_meta("rw"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_garment_set_label_general_return.hpp b/tests/integration/hardcoded_query/match_garment_set_label_general_return.hpp index f74c609ce..e6283c41f 100644 --- a/tests/integration/hardcoded_query/match_garment_set_label_general_return.hpp +++ b/tests/integration/hardcoded_query/match_garment_set_label_general_return.hpp @@ -17,7 +17,8 @@ using std::endl; bool run_general_query(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream, const std::string &general_label) { - stream.write_field("g"); + std::vector<std::string> headers{std::string("g")}; + stream.Header(headers); for (auto vertex : db_accessor.vertices()) { if (vertex.has_label(db_accessor.label("garment"))) { TypedValue prop = vertex.PropsAt(db_accessor.property("garment_id")); @@ -26,10 +27,12 @@ bool run_general_query(GraphDbAccessor &db_accessor, if (cmp.type() != TypedValue::Type::Bool) continue; if (cmp.Value<bool>() != true) continue; vertex.add_label(db_accessor.label(general_label)); - stream.write_vertex_record(vertex); + std::vector<TypedValue> result{TypedValue(vertex)}; + stream.Result(result); } } - stream.write_meta("rw"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_profile.cpp b/tests/integration/hardcoded_query/match_profile.cpp index 092386d59..c3b5e069f 100644 --- a/tests/integration/hardcoded_query/match_profile.cpp +++ b/tests/integration/hardcoded_query/match_profile.cpp @@ -17,7 +17,8 @@ class CPUPlan : public PlanInterface<Stream> { public: bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { - stream.write_field("p"); + std::vector<std::string> headers{std::string("p")}; + stream.Header(headers); for (auto vertex : db_accessor.vertices()) { if (vertex.has_label(db_accessor.label("profile"))) { TypedValue prop = vertex.PropsAt(db_accessor.property("profile_id")); @@ -31,10 +32,12 @@ class CPUPlan : public PlanInterface<Stream> { auto cmp2 = prop2 == args.At(1); if (cmp2.type() != TypedValue::Type::Bool) continue; if (cmp2.Value<bool>() != true) continue; - stream.write_vertex_record(vertex); + std::vector<TypedValue> result{TypedValue(vertex)}; + stream.Result(result); } } - stream.write_meta("r"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_profile_garment_score.cpp b/tests/integration/hardcoded_query/match_profile_garment_score.cpp index 3fb69f97e..2ccc0cc45 100644 --- a/tests/integration/hardcoded_query/match_profile_garment_score.cpp +++ b/tests/integration/hardcoded_query/match_profile_garment_score.cpp @@ -20,7 +20,8 @@ class CPUPlan : public PlanInterface<Stream> { public: bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { - stream.write_field("s"); + std::vector<std::string> headers{std::string("s")}; + stream.Header(headers); auto profile = [&db_accessor, &args](const VertexAccessor &v) -> bool { TypedValue prop = v.PropsAt(db_accessor.property("profile_id")); if (prop.type() == TypedValue::Type::Null) return false; @@ -46,10 +47,12 @@ class CPUPlan : public PlanInterface<Stream> { auto to = edge.to(); if (edge.edge_type() != db_accessor.edge_type("score")) continue; if ((profile(from) && garment(to)) || (profile(to) && garment(from))) { - stream.write_edge_record(edge); + std::vector<TypedValue> result{TypedValue(edge)}; + stream.Result(result); } } - stream.write_meta("r"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_profile_garment_set_score.cpp b/tests/integration/hardcoded_query/match_profile_garment_set_score.cpp index c267ccb4f..fdc15ade4 100644 --- a/tests/integration/hardcoded_query/match_profile_garment_set_score.cpp +++ b/tests/integration/hardcoded_query/match_profile_garment_set_score.cpp @@ -18,7 +18,8 @@ class CPUPlan : public PlanInterface<Stream> { public: bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { - stream.write_field("r"); + std::vector<std::string> headers{std::string("r")}; + stream.Header(headers); std::vector<VertexAccessor> g1_set, g2_set; for (auto g1 : db_accessor.vertices()) { if (g1.has_label(db_accessor.label("profile"))) { @@ -51,9 +52,11 @@ class CPUPlan : public PlanInterface<Stream> { EdgeAccessor e = db_accessor.insert_edge(g1, g2, db_accessor.edge_type("score")); e.PropsSet(db_accessor.property("score"), args.At(3)); - stream.write_edge_record(e); + std::vector<TypedValue> result{TypedValue(e)}; + stream.Result(result); } - stream.write_meta("rw"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/match_profile_garment_update_score.cpp b/tests/integration/hardcoded_query/match_profile_garment_update_score.cpp index 7f5a64fd2..7d75973bb 100644 --- a/tests/integration/hardcoded_query/match_profile_garment_update_score.cpp +++ b/tests/integration/hardcoded_query/match_profile_garment_update_score.cpp @@ -19,7 +19,8 @@ class CPUPlan : public PlanInterface<Stream> { public: bool run(GraphDbAccessor &db_accessor, const Parameters &args, Stream &stream) { - stream.write_field("s"); + std::vector<std::string> headers{std::string("s")}; + stream.Header(headers); auto profile = [&db_accessor, &args](const VertexAccessor &v) -> bool { TypedValue prop = v.PropsAt(db_accessor.property("profile_id")); if (prop.type() == TypedValue::Type::Null) return false; @@ -46,10 +47,12 @@ class CPUPlan : public PlanInterface<Stream> { if (edge.edge_type() != db_accessor.edge_type("score")) continue; if ((profile(from) && garment(to)) || (profile(to) && garment(from))) { edge.PropsSet(db_accessor.property("score"), args.At(3)); - stream.write_edge_record(edge); + std::vector<TypedValue> result{TypedValue(edge)}; + stream.Result(result); } } - stream.write_meta("rw"); + std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))}; + stream.Summary(meta); db_accessor.commit(); return true; } diff --git a/tests/integration/hardcoded_query/using.hpp b/tests/integration/hardcoded_query/using.hpp index a7335f4c4..f42ed18c9 100644 --- a/tests/integration/hardcoded_query/using.hpp +++ b/tests/integration/hardcoded_query/using.hpp @@ -3,9 +3,9 @@ // the flag is only used in hardcoded queries compilation // see usage in plan_compiler.hpp #ifndef HARDCODED_OUTPUT_STREAM -#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" #include "io/network/socket.hpp" -using Stream = bolt::RecordStream<io::network::Socket>; +using Stream = communication::bolt::ResultStream<io::network::Socket>; #else #include "../stream/print_record_stream.hpp" using Stream = PrintRecordStream; diff --git a/tests/integration/stream/print_record_stream.hpp b/tests/integration/stream/print_record_stream.hpp index 42d57e91e..878732eee 100644 --- a/tests/integration/stream/print_record_stream.hpp +++ b/tests/integration/stream/print_record_stream.hpp @@ -44,40 +44,16 @@ class PrintRecordStream { public: PrintRecordStream(std::ostream &stream) : stream(stream) {} - void write_success() { stream << "SUCCESS\n"; } - - void write_success_empty() { stream << "SUCCESS EMPTY\n"; } - - void write_ignored() { stream << "IGNORED\n"; } - - void write_empty_fields() { stream << "EMPTY FIELDS\n"; } - - void write_fields(const std::vector<std::string> &fields) { - stream << "FIELDS:"; - for (auto &field : fields) { - stream << " " << field; - } - stream << '\n'; + // TODO: all these functions should pretty print their data + void Header(const std::vector<std::string> &fields) { + stream << "Header\n"; } - void write_field(const std::string &field) { - stream << "Field: " << field << '\n'; + void Result(std::vector<TypedValue> &values) { + stream << "Result\n"; } - void write(const TypedValue &value) { stream << value << " "; } - void write_list_header(size_t size) { stream << "List: " << size << '\n'; } - - void write_record() { stream << "Record\n"; } - - void write_vertex_record(const VertexAccessor &vertex) { stream << vertex; } - void write_edge_record(const EdgeAccessor &edge) { stream << edge; } - - void write_meta(const std::string &type) { - stream << "Meta: " << type << std::endl; + void Summary(const std::map<std::string, TypedValue> &summary) { + stream << "Summary\n"; } - - void write_failure(const std::map<std::string, std::string> &data) {} - - void write_count(const size_t count) {} - void chunk() { stream << "CHUNK\n"; } }; diff --git a/tests/unit/bolt_session.cpp b/tests/unit/bolt_session.cpp index e07e138cd..772b5fa5f 100644 --- a/tests/unit/bolt_session.cpp +++ b/tests/unit/bolt_session.cpp @@ -1,9 +1,12 @@ #include "bolt_common.hpp" -#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "communication/bolt/v1/encoder/result_stream.hpp" #include "communication/bolt/v1/session.hpp" #include "query/engine.hpp" +using result_stream_t = communication::bolt::ResultStream<TestSocket>; +using session_t = communication::bolt::Session<TestSocket>; + const uint8_t handshake_req[] = "\x60\x60\xb0\x17\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" @@ -24,19 +27,19 @@ const uint8_t run_req[] = TEST(Bolt, Session) { Dbms dbms; TestSocket socket(10); - QueryEngine<bolt::RecordStream<TestSocket>> query_engine; - bolt::Session<TestSocket> session(std::move(socket), dbms, query_engine); + QueryEngine<result_stream_t> query_engine; + session_t session(std::move(socket), dbms, query_engine); std::vector<uint8_t>& output = session.socket.output; // execute handshake session.execute(handshake_req, 20); - ASSERT_EQ(session.state, bolt::INIT); + ASSERT_EQ(session.state, communication::bolt::INIT); print_output(output); check_output(output, handshake_resp, 4); // execute init session.execute(init_req, 67); - ASSERT_EQ(session.state, bolt::EXECUTOR); + ASSERT_EQ(session.state, communication::bolt::EXECUTOR); print_output(output); check_output(output, init_resp, 7); diff --git a/tests/unit/chunked_decoder.cpp b/tests/unit/chunked_decoder.cpp index 314645864..3bb66a666 100644 --- a/tests/unit/chunked_decoder.cpp +++ b/tests/unit/chunked_decoder.cpp @@ -21,7 +21,7 @@ struct DummyStream { std::vector<byte> data; }; -using Decoder = bolt::ChunkedDecoder<DummyStream>; +using Decoder = communication::bolt::ChunkedDecoder<DummyStream>; std::vector<byte> chunks[] = { {0x00, 0x08, 'A', ' ', 'q', 'u', 'i', 'c', 'k', ' ', 0x00, 0x06, 'b', 'r', diff --git a/tests/unit/chunked_encoder.cpp b/tests/unit/chunked_encoder.cpp deleted file mode 100644 index b7ed389bb..000000000 --- a/tests/unit/chunked_encoder.cpp +++ /dev/null @@ -1,111 +0,0 @@ -#include <cassert> -#include <deque> -#include <iostream> -#include <vector> - -#include "gtest/gtest.h" - -#include "communication/bolt/v1/transport/chunked_encoder.hpp" -#include "logging/default.hpp" -#include "logging/streams/stdout.hpp" - -using byte = unsigned char; - -void print_hex(byte x) { printf("%02X ", static_cast<byte>(x)); } - -class DummyStream { - public: - void write(const byte *values, size_t n) { - num_calls++; - data.insert(data.end(), values, values + n); - } - - byte pop() { - auto c = data.front(); - data.pop_front(); - return c; - } - - size_t pop_size() { return ((size_t)pop() << 8) | pop(); } - - void print() { - for (size_t i = 0; i < data.size(); ++i) print_hex(data[i]); - } - - std::deque<byte> data; - size_t num_calls{0}; -}; - -using Encoder = bolt::ChunkedEncoder<DummyStream>; - -void write_ff(Encoder &encoder, size_t n) { - std::vector<byte> v; - - for (size_t i = 0; i < n; ++i) v.push_back('\xFF'); - - encoder.write(v.data(), v.size()); -} - -void check_ff(DummyStream &stream, size_t n) { - for (size_t i = 0; i < n; ++i) ASSERT_EQ(stream.pop(), byte('\xFF')); - - (void)stream; -} - -using encoder_t = bolt::ChunkedEncoder<DummyStream>; - -TEST(ChunkedEncoderTest, Encode) { - DummyStream stream; - encoder_t encoder(stream); - size_t chunk_size = encoder_t::chunk_size; - - write_ff(encoder, 10); - write_ff(encoder, 10); - encoder.write_chunk(); - - write_ff(encoder, 10); - write_ff(encoder, 10); - encoder.write_chunk(); - - // this should be two chunks, one of size 65533 and the other of size 1467 - write_ff(encoder, 67000); - encoder.write_chunk(); - - for (int i = 0; i < 10000; ++i) write_ff(encoder, 1500); - encoder.write_chunk(); - - ASSERT_EQ(stream.pop_size(), 20); - check_ff(stream, 20); - ASSERT_EQ(stream.pop_size(), 0); - - ASSERT_EQ(stream.pop_size(), 20); - check_ff(stream, 20); - ASSERT_EQ(stream.pop_size(), 0); - - ASSERT_EQ(stream.pop_size(), chunk_size); - check_ff(stream, chunk_size); - ASSERT_EQ(stream.pop_size(), 0); - - ASSERT_EQ(stream.pop_size(), 1467); - check_ff(stream, 1467); - ASSERT_EQ(stream.pop_size(), 0); - - size_t k = 10000 * 1500; - - while (k > 0) { - auto size = k > chunk_size ? chunk_size : k; - ASSERT_EQ(stream.pop_size(), size); - check_ff(stream, size); - ASSERT_EQ(stream.pop_size(), 0); - k -= size; - } - ASSERT_EQ(stream.pop_size(), 0); -} - -int main(int argc, char **argv) { - logging::init_sync(); - logging::log->pipe(std::make_unique<Stdout>()); - - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}