Implement record stream mocker.

Summary:
Begin record stream mocker implementation.

Record stream mocker alongside tests created.

Test Plan: Run record_stream_mocker in tests.

Reviewers: mislav.bradac, florijan, buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D54
This commit is contained in:
Dominik Gleich 2017-02-21 12:40:16 +01:00
parent 30f4e9d47a
commit d72383c0af
2 changed files with 231 additions and 0 deletions

View File

@ -0,0 +1,154 @@
#pragma once
#include <deque>
#include <map>
#include <stack>
#include "support/Any.h"
#include "logging/default.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex_accessor.hpp"
namespace bolt {
/**
* this class should be used to mock RecordStream.
*/
class RecordStreamMocker {
public:
RecordStreamMocker() { logger_ = logging::log->logger("Record Stream"); }
~RecordStreamMocker() = default;
// TODO: create apstract methods that are not bolt specific ---------------
void write_success() { logger_.trace("write_success"); }
void write_success_empty() { logger_.trace("write_success_empty"); }
void write_ignored() { logger_.trace("write_ignored"); }
void write_empty_fields() {}
void write_fields(const std::vector<std::string> &fields) {
messages_.push_back(std::make_pair("fields", Message(fields)));
}
void write_field(const std::string &field) {
messages_.push_back(std::make_pair("fields", Message({field})));
}
void write_list_header(size_t size) {
messages_.back().second.write_list(size);
}
void write_record() {}
// writes metadata at the end of the message
// TODO: write whole implementation (currently, only type is supported)
// { "stats": { "nodes created": 1, "properties set": 1},
// "type": "r" | "rw" | ...
void write_meta(const std::string &type) {
messages_.push_back(std::make_pair("meta", Message({type})));
}
void write_failure(const std::map<std::string, std::string> &data) {}
void write_count(const size_t count) {
messages_.back().second.write(antlrcpp::Any(count));
}
void write(const VertexAccessor &vertex) {
messages_.back().second.write(antlrcpp::Any(vertex));
}
void write(const EdgeAccessor &edge) {
messages_.back().second.write(antlrcpp::Any(edge));
}
void write(const TypedValue &value) {
messages_.back().second.write(antlrcpp::Any(value));
}
void write_edge_record(const EdgeAccessor &ea) { write(ea); }
void write_vertex_record(const VertexAccessor &va) { write(va); }
// Returns number of columns associated with 'message_name', 0 if no such
// message exists.
size_t count_message_columns(const std::string &message_name) {
for (auto x : messages_)
if (x.first == message_name) return x.second.count_columns();
return 0;
}
// Returns a vector of items inside column 'index' inside Message
// 'message_name'. Empty vector is returned if no such Message exists.
std::vector<antlrcpp::Any> get_message_column(const std::string &message_name,
int index) {
for (auto x : messages_)
if (x.first == message_name) return x.second.get_column(index);
return {};
}
// Returns a vector of column names (headers) for message 'message_name'.
// Empty vector is returned if no such message exists.
std::vector<std::string> get_message_headers(
const std::string &message_name) {
for (auto x : messages_)
if (x.first == message_name) return x.second.get_headers();
return {};
}
protected:
Logger logger_;
private:
class Message {
public:
Message(const std::vector<std::string> &header_labels)
: header_labels_(header_labels) {}
std::vector<std::string> get_headers() const { return header_labels_; }
// Returns vector of items belonging to column 'column'.
std::vector<antlrcpp::Any> get_column(int column) {
size_t index = 0;
std::vector<antlrcpp::Any> ret;
for (int i = 0;; ++i) {
if (index == values_.size()) break;
auto item = read(index);
if (i % header_labels_.size() == column) ret.push_back(item);
}
return ret;
}
size_t count_columns() const { return header_labels_.size(); }
enum TYPE { LIST, MAP, SINGLETON };
antlrcpp::Any read(size_t &index) {
if (index == values_.size()) return false;
std::tuple<enum TYPE, size_t, antlrcpp::Any> curr = values_[index++];
if (std::get<0>(curr) == MAP) {
throw "Not implemented.";
}
if (std::get<0>(curr) == LIST) {
std::vector<antlrcpp::Any> ret;
for (int i = 0; i < std::get<1>(curr); ++i) ret.push_back(read(index));
return ret;
}
if (std::get<0>(curr) == SINGLETON) {
return std::get<2>(curr);
}
return false;
}
void write_list(size_t size) {
values_.push_back(std::make_tuple(
TYPE::LIST, size, antlrcpp::Any(std::vector<antlrcpp::Any>())));
}
void write(const antlrcpp::Any &value) {
values_.push_back(std::make_tuple(TYPE::SINGLETON, (size_t)1, value));
}
std::vector<std::string> header_labels_;
std::vector<antlrcpp::Any> values_;
};
std::vector<std::pair<std::string, Message>> messages_;
};
}

View File

@ -0,0 +1,77 @@
#include "gtest/gtest.h"
#include "support/Any.h"
#include "communication/bolt/v1/serialization/record_stream_mocker.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "utils/command_line/arguments.hpp"
TEST(RecordStreamMocker, Headers) {
bolt::RecordStreamMocker rs;
std::vector<std::string> expected_output = {"a", "b", "c", "0"};
rs.write_fields(expected_output);
std::vector<std::string> headers = rs.get_message_headers("fields");
ASSERT_EQ(headers.size(), expected_output.size());
for (int i = 0; i < expected_output.size(); ++i)
ASSERT_EQ(headers[i], expected_output[i]);
}
TEST(RecordStreamMocker, OneValue) {
bolt::RecordStreamMocker rs;
rs.write_field("n");
rs.write(TypedValue(5));
ASSERT_EQ(rs.count_message_columns("fields"), 1);
std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0);
ASSERT_EQ(output.size(), 1);
auto val = output[0].as<TypedValue>().Value<int>();
ASSERT_EQ(val, 5);
}
TEST(RecordStreamMocker, OneListOfInts) {
bolt::RecordStreamMocker rs;
rs.write_field("n");
std::vector<int> expected_output = {5, 4, 6, 7};
rs.write_list_header(expected_output.size());
for (auto x : expected_output) rs.write(TypedValue(x));
ASSERT_EQ(rs.count_message_columns("fields"), 1);
std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0);
ASSERT_EQ(output.size(), 1);
std::vector<antlrcpp::Any> list = output[0].as<std::vector<antlrcpp::Any>>();
ASSERT_EQ(list.size(), expected_output.size());
for (int i = 0; i < list.size(); ++i) {
auto val = list[i].as<TypedValue>().Value<int>();
ASSERT_EQ(val, expected_output[i]);
}
}
TEST(RecordStreamMocker, OneListOfIntAndList) {
bolt::RecordStreamMocker rs;
rs.write_field("n");
int expected_value = 42;
std::vector<int> expected_output = {5, 4, 6, 7};
rs.write_list_header(2);
rs.write(expected_value);
rs.write_list_header(4);
for (auto x : expected_output) rs.write(TypedValue(x));
ASSERT_EQ(rs.count_message_columns("fields"), 1);
std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0);
ASSERT_EQ(output.size(), 1);
std::vector<antlrcpp::Any> list = output[0].as<std::vector<antlrcpp::Any>>();
ASSERT_EQ(list.size(), 2);
ASSERT_EQ(list[0].as<TypedValue>().Value<int>(), expected_value);
auto list_inside = list[1].as<std::vector<antlrcpp::Any>>();
for (int i = 0; i < list_inside.size(); ++i) {
auto val = list_inside[i].as<TypedValue>().Value<int>();
ASSERT_EQ(val, expected_output[i]);
}
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
return RUN_ALL_TESTS();
}