Serialization, record stream refactor in progress. UNSTABLE STATE.

Summary: Bolt record stream refactoring. BOLT STACK IN UNSTABLE STATE. RecordStreamMocker also refactored

Reviewers: buda, dgleich, mferencevic

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D88
This commit is contained in:
florijan 2017-03-03 12:57:14 +01:00
parent 2aeef25881
commit 38c3c513fa
4 changed files with 201 additions and 231 deletions

View File

@ -1,154 +0,0 @@
#pragma once
#include <deque>
#include <map>
#include <stack>
#include "support/Any.h"
#include "logging/default.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex_accessor.hpp"
namespace bolt {
/**
* this class should be used to mock RecordStream.
*/
class RecordStreamMocker {
public:
RecordStreamMocker() { logger_ = logging::log->logger("Record Stream"); }
~RecordStreamMocker() = default;
// TODO: create apstract methods that are not bolt specific ---------------
void write_success() { logger_.trace("write_success"); }
void write_success_empty() { logger_.trace("write_success_empty"); }
void write_ignored() { logger_.trace("write_ignored"); }
void write_empty_fields() {}
void write_fields(const std::vector<std::string> &fields) {
messages_.push_back(std::make_pair("fields", Message(fields)));
}
void write_field(const std::string &field) {
messages_.push_back(std::make_pair("fields", Message({field})));
}
void write_list_header(size_t size) {
messages_.back().second.write_list(size);
}
void write_record() {}
// writes metadata at the end of the message
// TODO: write whole implementation (currently, only type is supported)
// { "stats": { "nodes created": 1, "properties set": 1},
// "type": "r" | "rw" | ...
void write_meta(const std::string &type) {
messages_.push_back(std::make_pair("meta", Message({type})));
}
void write_failure(const std::map<std::string, std::string> &data) {}
void write_count(const size_t count) {
messages_.back().second.write(antlrcpp::Any(count));
}
void write(const VertexAccessor &vertex) {
messages_.back().second.write(antlrcpp::Any(vertex));
}
void write(const EdgeAccessor &edge) {
messages_.back().second.write(antlrcpp::Any(edge));
}
void write(const PropertyValue &value) {
messages_.back().second.write(antlrcpp::Any(value));
}
void write_edge_record(const EdgeAccessor &ea) { write(ea); }
void write_vertex_record(const VertexAccessor &va) { write(va); }
// Returns number of columns associated with 'message_name', 0 if no such
// message exists.
size_t count_message_columns(const std::string &message_name) {
for (auto x : messages_)
if (x.first == message_name) return x.second.count_columns();
return 0;
}
// Returns a vector of items inside column 'index' inside Message
// 'message_name'. Empty vector is returned if no such Message exists.
std::vector<antlrcpp::Any> get_message_column(const std::string &message_name,
int index) {
for (auto x : messages_)
if (x.first == message_name) return x.second.get_column(index);
return {};
}
// Returns a vector of column names (headers) for message 'message_name'.
// Empty vector is returned if no such message exists.
std::vector<std::string> get_message_headers(
const std::string &message_name) {
for (auto x : messages_)
if (x.first == message_name) return x.second.get_headers();
return {};
}
protected:
Logger logger_;
private:
class Message {
public:
Message(const std::vector<std::string> &header_labels)
: header_labels_(header_labels) {}
std::vector<std::string> get_headers() const { return header_labels_; }
// Returns vector of items belonging to column 'column'.
std::vector<antlrcpp::Any> get_column(int column) {
size_t index = 0;
std::vector<antlrcpp::Any> ret;
for (int i = 0;; ++i) {
if (index == values_.size()) break;
auto item = read(index);
if (i % header_labels_.size() == column) ret.push_back(item);
}
return ret;
}
size_t count_columns() const { return header_labels_.size(); }
enum TYPE { LIST, MAP, SINGLETON };
antlrcpp::Any read(size_t &index) {
if (index == values_.size()) return false;
std::tuple<enum TYPE, size_t, antlrcpp::Any> curr = values_[index++];
if (std::get<0>(curr) == MAP) {
throw "Not implemented.";
}
if (std::get<0>(curr) == LIST) {
std::vector<antlrcpp::Any> ret;
for (int i = 0; i < std::get<1>(curr); ++i) ret.push_back(read(index));
return ret;
}
if (std::get<0>(curr) == SINGLETON) {
return std::get<2>(curr);
}
return false;
}
void write_list(size_t size) {
values_.push_back(std::make_tuple(
TYPE::LIST, size, antlrcpp::Any(std::vector<antlrcpp::Any>())));
}
void write(const antlrcpp::Any &value) {
values_.push_back(std::make_tuple(TYPE::SINGLETON, (size_t)1, value));
}
std::vector<std::string> header_labels_;
std::vector<antlrcpp::Any> values_;
};
std::vector<std::pair<std::string, Message>> messages_;
};
}

View File

@ -0,0 +1,137 @@
#pragma once
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
#include "query/backend/cpp/typed_value.hpp"
#include "logging/default.hpp"
namespace bolt {
/**
* A high level API for streaming a Bolt response. Exposes
* functionalities used by the compiler and query plans (which
* should not use any lower level API).
*
* @tparam TChunkedEncoder Type of chunked encoder used.
*/
// TODO templatisation on TChunkedEncoder might not be desired
// but makes the code a bit easer to understand because we know
// that this class uses a BoltEncoder (and not some arbitrary template)
// it helps the programmer, the compiler and the IDE
template <typename TChunkedEncoder>
class RecordStream {
public:
// TODO add logging to this class
RecordStream(BoltEncoder<TChunkedEncoder> &bolt_encoder) :
bolt_encoder_(bolt_encoder), serializer_(bolt_encoder) {}
void Header(const std::vector<std::string> &fields) {
bolt_encoder_.message_success();
bolt_encoder_.write_map_header(1);
bolt_encoder_.write_string("fields");
bolt_encoder_.write_list_header(fields.size());
for (auto &name : fields) {
bolt_encoder_.write_string(name);
}
Chunk();
Send();
}
void Result(std::vector<TypedValue> &values) {
bolt_encoder_.message_record();
Write(values);
Chunk();
Send();
}
/**
* Writes a summary. Typically a summary is something like:
* {
* "type" : "r" | "rw" | ...,
* "stats": {
* "nodes_created": 12,
* "nodes_deleted": 0
* }
* }
*
* @param value
*/
void Summary(const std::map<std::string, TypedValue> &summary) {
bolt_encoder_.message_success();
Write(summary);
Chunk();
}
private:
BoltEncoder<TChunkedEncoder> bolt_encoder_;
BoltSerializer<BoltEncoder<TChunkedEncoder>> serializer_;
/**
* Writes a TypedValue. Resolves it's type and uses
* encoder primitives to write exactly typed values.
*/
void Write(const TypedValue &value) {
switch (value.type()) {
case TypedValue::Type::Null:
bolt_encoder_.write_null();
break;
case TypedValue::Type::Bool:
bolt_encoder_.write(value.Value<bool>());
break;
case TypedValue::Type::Int:
bolt_encoder_.write(value.Value<int>());
break;
case TypedValue::Type::Double:
bolt_encoder_.write(value.Value<double>());
break;
case TypedValue::Type::String:
bolt_encoder_.write(value.Value<std::string>());
break;
case TypedValue::Type::List:
Write(value.Value<std::vector<TypedValue>>());
break;
case TypedValue::Type::Map:
Write(value.Value<std::map<std::string, TypedValue>>());
break;
case TypedValue::Type::Vertex:
serializer_.write(value.Value<VertexAccessor>());
break;
case TypedValue::Type::Edge:
serializer_.write(value.Value<EdgeAccessor>());
break;
default:
throw std::runtime_error("Serialization not implemented for given type");
}
}
void Write(const std::vector<TypedValue> &values) {
bolt_encoder_.write_list_header(values.size());
for (const auto &value : values)
Write(value);
}
void Write(const std::map<std::string, TypedValue> &values) {
bolt_encoder_.write_map_header(values.size());
for (const auto &kv : values) {
bolt_encoder_.write(kv.first);
Write(kv.second);
}
}
void Send() {
// TODO expose these low level functions in the encoder
// be careful! ChunkedEncoder seems to have a 'flush()' function
// but that is different from it's underlying ChunkedBuffer's
// 'flush()' method
bolt_encoder_.flush();
}
void Chunk() {
// TODO expose these low level functions in the encoder
bolt_encoder_.write_chunk();
}
};
}

View File

@ -0,0 +1,64 @@
#pragma once
#include "logging/default.hpp"
#include "query/backend/cpp/typed_value.hpp"
#include "utils/assert.hpp"
/**
* A mocker for the data output record stream.
* This implementation checks that messages are
* sent to it in an acceptable order, and tracks
* the content of those messages.
*/
class ResultStreamFaker {
public:
void Header(std::vector<std::string> &&fields) {
debug_assert(current_state_ == State::Start, "Headers can only be written in the beginning");
header_ = std::forward(fields);
current_state_ = State::WritingResults;
}
void Result(std::vector<TypedValue> &&values) {
debug_assert(current_state_ == State::WritingResults, "Can't accept results before header nor after summary");
results_.push_back(std::forward(values));
}
void Summary(std::map<std::string, TypedValue> &&summary) {
debug_assert(current_state_ != State::Done, "Can only send a summary once");
summary_ = std::forward(summary);
current_state_ = State::Done;
}
const auto &GetHeader() {
debug_assert(current_state_ != State::Start, "Header not written");
return header_;
}
const auto &GetResults() { return results_; }
const auto &GetSummary() {
debug_assert(current_state_ == State::Done, "Summary not written");
return summary_;
}
private:
/**
* Possible states of the Mocker. Used for checking if calls to
* the Mocker as in acceptable order.
*/
enum class State {
Start,
WritingResults,
Done
};
// the current state
State current_state_ = State::Start;
// the data that the record stream can accept
std::vector<std::string> header_;
std::vector<std::vector<TypedValue>> results_;
std::map<std::string, TypedValue> summary_;
};

View File

@ -1,77 +0,0 @@
#include "gtest/gtest.h"
#include "support/Any.h"
#include "communication/bolt/v1/serialization/record_stream_mocker.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "utils/command_line/arguments.hpp"
TEST(RecordStreamMocker, Headers) {
bolt::RecordStreamMocker rs;
std::vector<std::string> expected_output = {"a", "b", "c", "0"};
rs.write_fields(expected_output);
std::vector<std::string> headers = rs.get_message_headers("fields");
ASSERT_EQ(headers.size(), expected_output.size());
for (int i = 0; i < expected_output.size(); ++i)
ASSERT_EQ(headers[i], expected_output[i]);
}
TEST(RecordStreamMocker, OneValue) {
bolt::RecordStreamMocker rs;
rs.write_field("n");
rs.write(PropertyValue(5));
ASSERT_EQ(rs.count_message_columns("fields"), 1);
std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0);
ASSERT_EQ(output.size(), 1);
auto val = output[0].as<PropertyValue>().Value<int>();
ASSERT_EQ(val, 5);
}
TEST(RecordStreamMocker, OneListOfInts) {
bolt::RecordStreamMocker rs;
rs.write_field("n");
std::vector<int> expected_output = {5, 4, 6, 7};
rs.write_list_header(expected_output.size());
for (auto x : expected_output) rs.write(PropertyValue(x));
ASSERT_EQ(rs.count_message_columns("fields"), 1);
std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0);
ASSERT_EQ(output.size(), 1);
std::vector<antlrcpp::Any> list = output[0].as<std::vector<antlrcpp::Any>>();
ASSERT_EQ(list.size(), expected_output.size());
for (int i = 0; i < list.size(); ++i) {
auto val = list[i].as<PropertyValue>().Value<int>();
ASSERT_EQ(val, expected_output[i]);
}
}
TEST(RecordStreamMocker, OneListOfIntAndList) {
bolt::RecordStreamMocker rs;
rs.write_field("n");
int expected_value = 42;
std::vector<int> expected_output = {5, 4, 6, 7};
rs.write_list_header(2);
rs.write(expected_value);
rs.write_list_header(4);
for (auto x : expected_output) rs.write(PropertyValue(x));
ASSERT_EQ(rs.count_message_columns("fields"), 1);
std::vector<antlrcpp::Any> output = rs.get_message_column("fields", 0);
ASSERT_EQ(output.size(), 1);
std::vector<antlrcpp::Any> list = output[0].as<std::vector<antlrcpp::Any>>();
ASSERT_EQ(list.size(), 2);
ASSERT_EQ(list[0].as<PropertyValue>().Value<int>(), expected_value);
auto list_inside = list[1].as<std::vector<antlrcpp::Any>>();
for (int i = 0; i < list_inside.size(); ++i) {
auto val = list_inside[i].as<PropertyValue>().Value<int>();
ASSERT_EQ(val, expected_output[i]);
}
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
return RUN_ALL_TESTS();
}