diff --git a/src/communication/bolt/v1/serialization/record_stream_mocker.hpp b/src/communication/bolt/v1/serialization/record_stream_mocker.hpp new file mode 100644 index 000000000..4caa10eea --- /dev/null +++ b/src/communication/bolt/v1/serialization/record_stream_mocker.hpp @@ -0,0 +1,154 @@ +#pragma once + +#include <deque> +#include <map> +#include <stack> + +#include "support/Any.h" + +#include "logging/default.hpp" +#include "storage/edge_accessor.hpp" +#include "storage/vertex_accessor.hpp" + +namespace bolt { + +/** + * this class should be used to mock RecordStream. + */ +class RecordStreamMocker { + public: + RecordStreamMocker() { logger_ = logging::log->logger("Record Stream"); } + + ~RecordStreamMocker() = default; + + // TODO: create apstract methods that are not bolt specific --------------- + void write_success() { logger_.trace("write_success"); } + void write_success_empty() { logger_.trace("write_success_empty"); } + void write_ignored() { logger_.trace("write_ignored"); } + + void write_empty_fields() {} + + void write_fields(const std::vector<std::string> &fields) { + messages_.push_back(std::make_pair("fields", Message(fields))); + } + + void write_field(const std::string &field) { + messages_.push_back(std::make_pair("fields", Message({field}))); + } + + void write_list_header(size_t size) { + messages_.back().second.write_list(size); + } + + void write_record() {} + + // writes metadata at the end of the message + // TODO: write whole implementation (currently, only type is supported) + // { "stats": { "nodes created": 1, "properties set": 1}, + // "type": "r" | "rw" | ... + void write_meta(const std::string &type) { + messages_.push_back(std::make_pair("meta", Message({type}))); + } + + void write_failure(const std::map<std::string, std::string> &data) {} + + void write_count(const size_t count) { + messages_.back().second.write(antlrcpp::Any(count)); + } + void write(const VertexAccessor &vertex) { + messages_.back().second.write(antlrcpp::Any(vertex)); + } + void write(const EdgeAccessor &edge) { + messages_.back().second.write(antlrcpp::Any(edge)); + } + void write(const TypedValue &value) { + messages_.back().second.write(antlrcpp::Any(value)); + } + + void write_edge_record(const EdgeAccessor &ea) { write(ea); } + void write_vertex_record(const VertexAccessor &va) { write(va); } + + // Returns number of columns associated with 'message_name', 0 if no such + // message exists. + size_t count_message_columns(const std::string &message_name) { + for (auto x : messages_) + if (x.first == message_name) return x.second.count_columns(); + return 0; + } + + // Returns a vector of items inside column 'index' inside Message + // 'message_name'. Empty vector is returned if no such Message exists. + std::vector<antlrcpp::Any> get_message_column(const std::string &message_name, + int index) { + for (auto x : messages_) + if (x.first == message_name) return x.second.get_column(index); + return {}; + } + + // Returns a vector of column names (headers) for message 'message_name'. + // Empty vector is returned if no such message exists. + std::vector<std::string> get_message_headers( + const std::string &message_name) { + for (auto x : messages_) + if (x.first == message_name) return x.second.get_headers(); + return {}; + } + + protected: + Logger logger_; + + private: + class Message { + public: + Message(const std::vector<std::string> &header_labels) + : header_labels_(header_labels) {} + std::vector<std::string> get_headers() const { return header_labels_; } + + // Returns vector of items belonging to column 'column'. + std::vector<antlrcpp::Any> get_column(int column) { + size_t index = 0; + std::vector<antlrcpp::Any> ret; + for (int i = 0;; ++i) { + if (index == values_.size()) break; + auto item = read(index); + if (i % header_labels_.size() == column) ret.push_back(item); + } + return ret; + } + + size_t count_columns() const { return header_labels_.size(); } + + enum TYPE { LIST, MAP, SINGLETON }; + antlrcpp::Any read(size_t &index) { + if (index == values_.size()) return false; + std::tuple<enum TYPE, size_t, antlrcpp::Any> curr = values_[index++]; + if (std::get<0>(curr) == MAP) { + throw "Not implemented."; + } + if (std::get<0>(curr) == LIST) { + std::vector<antlrcpp::Any> ret; + for (int i = 0; i < std::get<1>(curr); ++i) ret.push_back(read(index)); + return ret; + } + if (std::get<0>(curr) == SINGLETON) { + return std::get<2>(curr); + } + return false; + } + + void write_list(size_t size) { + values_.push_back(std::make_tuple( + TYPE::LIST, size, antlrcpp::Any(std::vector<antlrcpp::Any>()))); + } + + void write(const antlrcpp::Any &value) { + values_.push_back(std::make_tuple(TYPE::SINGLETON, (size_t)1, value)); + } + + std::vector<std::string> header_labels_; + std::vector<antlrcpp::Any> values_; + }; + + std::vector<std::pair<std::string, Message>> messages_; +}; +} diff --git a/tests/unit/record_stream_mocker.cpp b/tests/unit/record_stream_mocker.cpp new file mode 100644 index 000000000..9e8b9b855 --- /dev/null +++ b/tests/unit/record_stream_mocker.cpp @@ -0,0 +1,77 @@ +#include "gtest/gtest.h" + +#include "support/Any.h" + +#include "communication/bolt/v1/serialization/record_stream_mocker.hpp" +#include "logging/default.hpp" +#include "logging/streams/stdout.hpp" +#include "utils/command_line/arguments.hpp" + +TEST(RecordStreamMocker, Headers) { + bolt::RecordStreamMocker rs; + std::vector<std::string> expected_output = {"a", "b", "c", "0"}; + rs.write_fields(expected_output); + std::vector<std::string> headers = rs.get_message_headers("fields"); + ASSERT_EQ(headers.size(), expected_output.size()); + for (int i = 0; i < expected_output.size(); ++i) + ASSERT_EQ(headers[i], expected_output[i]); +} + +TEST(RecordStreamMocker, OneValue) { + bolt::RecordStreamMocker rs; + rs.write_field("n"); + rs.write(TypedValue(5)); + ASSERT_EQ(rs.count_message_columns("fields"), 1); + std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0); + ASSERT_EQ(output.size(), 1); + auto val = output[0].as<TypedValue>().Value<int>(); + ASSERT_EQ(val, 5); +} + +TEST(RecordStreamMocker, OneListOfInts) { + bolt::RecordStreamMocker rs; + rs.write_field("n"); + std::vector<int> expected_output = {5, 4, 6, 7}; + rs.write_list_header(expected_output.size()); + for (auto x : expected_output) rs.write(TypedValue(x)); + ASSERT_EQ(rs.count_message_columns("fields"), 1); + std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0); + ASSERT_EQ(output.size(), 1); + std::vector<antlrcpp::Any> list = output[0].as<std::vector<antlrcpp::Any>>(); + ASSERT_EQ(list.size(), expected_output.size()); + for (int i = 0; i < list.size(); ++i) { + auto val = list[i].as<TypedValue>().Value<int>(); + ASSERT_EQ(val, expected_output[i]); + } +} + +TEST(RecordStreamMocker, OneListOfIntAndList) { + bolt::RecordStreamMocker rs; + rs.write_field("n"); + int expected_value = 42; + std::vector<int> expected_output = {5, 4, 6, 7}; + rs.write_list_header(2); + rs.write(expected_value); + rs.write_list_header(4); + for (auto x : expected_output) rs.write(TypedValue(x)); + + ASSERT_EQ(rs.count_message_columns("fields"), 1); + std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0); + ASSERT_EQ(output.size(), 1); + std::vector<antlrcpp::Any> list = output[0].as<std::vector<antlrcpp::Any>>(); + ASSERT_EQ(list.size(), 2); + ASSERT_EQ(list[0].as<TypedValue>().Value<int>(), expected_value); + + auto list_inside = list[1].as<std::vector<antlrcpp::Any>>(); + for (int i = 0; i < list_inside.size(); ++i) { + auto val = list_inside[i].as<TypedValue>().Value<int>(); + ASSERT_EQ(val, expected_output[i]); + } +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + logging::init_sync(); + logging::log->pipe(std::make_unique<Stdout>()); + return RUN_ALL_TESTS(); +}