Modified hardcoded queries to use new encoder.

Summary:
Removed old encoder.

Changed namespace from bolt to communication::bolt.

Removed old include from new encoder.

Added an empty message success to encoder.

Changed order in communication::Server.

Changed bolt session to use new encoder.

Merge remote-tracking branch 'origin/dev' into mg_hardcoded_queries

Fixed PrintRecordStream.

Reviewers: buda, dgleich

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D158
This commit is contained in:
Matej Ferencevic 2017-03-22 16:36:48 +01:00
parent 3bf0bd40a7
commit 5a5ffface3
47 changed files with 188 additions and 923 deletions

View File

@ -1,14 +0,0 @@
#pragma once
#include <cstddef>
namespace bolt {
namespace config {
/** chunk size */
static constexpr size_t N = 65535;
/** end mark */
static constexpr size_t C = N + 2;
}
}

View File

@ -5,7 +5,6 @@
#include <vector>
#include <algorithm>
#include "communication/bolt/v1/config.hpp"
#include "logging/default.hpp"
#include "utils/types/byte.hpp"
#include "utils/bswap.hpp"

View File

@ -64,6 +64,16 @@ class Encoder {
buffer_.Flush();
}
/**
* Sends a Success message.
*
* This function sends a success message without additional metadata.
*/
void MessageSuccess() {
std::map<std::string, TypedValue> metadata;
MessageSuccess(metadata);
}
/**
* Sends a Failure message.
*

View File

@ -3,7 +3,7 @@
#include "utils/types/byte.hpp"
#include "utils/underlying_cast.hpp"
namespace bolt {
namespace communication::bolt {
enum class MessageCode : byte {
Init = 0x01,

View File

@ -2,9 +2,7 @@
#include <cstdint>
namespace bolt {
namespace pack {
namespace communication::bolt::pack {
enum Code : uint8_t {
TinyString = 0x80,
@ -55,4 +53,3 @@ enum Code : uint8_t {
enum Rule : uint8_t { MaxInitStructSize = 0x02 };
}
}

View File

@ -1,6 +1,6 @@
#pragma once
namespace bolt {
namespace communication::bolt {
enum class PackType {
/** denotes absence of a value */

View File

@ -1,136 +0,0 @@
#pragma once
#include "communication/bolt/v1/packing/codes.hpp"
#include "communication/bolt/v1/transport/bolt_encoder.hpp"
#include "communication/bolt/v1/transport/chunked_buffer.hpp"
#include "communication/bolt/v1/transport/chunked_encoder.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "storage/property_value_store.hpp"
namespace bolt {
template <class Stream>
class BoltSerializer {
public:
BoltSerializer(Stream &stream) : encoder(stream) {}
/** Serializes the vertex accessor into the packstream format
*
* struct[size = 3] Vertex [signature = 0x4E] {
* Integer node_id;
* List<String> labels;
* Map<String, Value> properties;
* }
*
*/
void write(const VertexAccessor &vertex) {
// write signatures for the node struct and node data type
encoder.write_struct_header(3);
encoder.write(underlying_cast(pack::Code::Node));
// IMPORTANT: here we write a hardcoded 0 because we don't
// use internal IDs, but need to give something to Bolt
// note that OpenCypher has no id(x) function, so the client
// should not be able to do anything with this value anyway
encoder.write_integer(0); // uID
// write the list of labels
auto labels = vertex.labels();
encoder.write_list_header(labels.size());
for (auto label : labels)
encoder.write_string(vertex.db_accessor().label_name(label));
// write the properties
const PropertyValueStore<GraphDb::Property> &props = vertex.Properties();
encoder.write_map_header(props.size());
props.Accept([this, &vertex](const GraphDb::Property prop,
const PropertyValue &value) {
this->encoder.write(vertex.db_accessor().property_name(prop));
this->write(value);
});
}
/** Serializes the edge accessor into the packstream format
*
* struct[size = 5] Edge [signature = 0x52] {
* Integer edge_id; // IMPORTANT: always 0 since we
* don't do IDs
* Integer start_node_id; // IMPORTANT: always 0 since we
* don't do IDs
* Integer end_node_id; // IMPORTANT: always 0 since we
* don't do IDs
* String type;
* Map<String, Value> properties;
* }
*
*/
void write(const EdgeAccessor &edge) {
// write signatures for the edge struct and edge data type
encoder.write_struct_header(5);
encoder.write(underlying_cast(pack::Code::Relationship));
// IMPORTANT: here we write a hardcoded 0 because we don't
// use internal IDs, but need to give something to Bolt
// note that OpenCypher has no id(x) function, so the client
// should not be able to do anything with this value anyway
encoder.write_integer(0);
encoder.write_integer(0);
encoder.write_integer(0);
// write the type of the edge
encoder.write(edge.db_accessor().edge_type_name(edge.edge_type()));
// write the property map
const PropertyValueStore<GraphDb::Property> &props = edge.Properties();
encoder.write_map_header(props.size());
props.Accept(
[this, &edge](GraphDb::Property prop, const PropertyValue &value) {
this->encoder.write(edge.db_accessor().property_name(prop));
this->write(value);
});
}
// TODO document
void write_failure(const std::map<std::string, std::string> &data) {
encoder.message_failure();
encoder.write_map_header(data.size());
for (auto const &kv : data) {
write(kv.first);
write(kv.second);
}
}
/**
* Writes a PropertyValue (typically a property value in the edge or vertex).
*
* @param value The value to write.
*/
void write(const PropertyValue &value) {
switch (value.type()) {
case PropertyValue::Type::Null:
encoder.write_null();
return;
case PropertyValue::Type::Bool:
encoder.write_bool(value.Value<bool>());
return;
case PropertyValue::Type::String:
encoder.write_string(value.Value<std::string>());
return;
case PropertyValue::Type::Int:
encoder.write_integer(value.Value<int64_t>());
return;
case PropertyValue::Type::Double:
encoder.write_double(value.Value<double>());
return;
case PropertyValue::Type::List:
// Not implemented
assert(false);
}
}
protected:
Stream &encoder;
};
}

View File

@ -1,158 +0,0 @@
#pragma once
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
#include "communication/bolt/v1/transport/chunked_buffer.hpp"
#include "communication/bolt/v1/transport/chunked_encoder.hpp"
#include "logging/default.hpp"
namespace bolt {
/**
* compiled queries have to use this class in order to return results
* query code should not know about bolt protocol
*/
template <class Socket>
class RecordStream {
public:
RecordStream(Socket &socket) : socket(socket) {
logger = logging::log->logger("Record Stream");
}
~RecordStream() = default;
// TODO: create apstract methods that are not bolt specific ---------------
void write_success() {
logger.trace("write_success");
bolt_encoder.message_success();
}
void write_success_empty() {
logger.trace("write_success_empty");
bolt_encoder.message_success_empty();
}
void write_ignored() {
logger.trace("write_ignored");
bolt_encoder.message_ignored();
}
void write_empty_fields() {
bolt_encoder.message_success();
bolt_encoder.write_map_header(1);
bolt_encoder.write_string("fields");
write_list_header(0);
chunk();
}
void write_fields(const std::vector<std::string> &fields) {
// TODO: that should be one level below?
bolt_encoder.message_success();
bolt_encoder.write_map_header(1);
bolt_encoder.write_string("fields");
write_list_header(fields.size());
for (auto &name : fields) {
bolt_encoder.write_string(name);
}
chunk();
send();
}
void write_field(const std::string &field) {
bolt_encoder.message_success();
bolt_encoder.write_map_header(1);
bolt_encoder.write_string("fields");
write_list_header(1);
bolt_encoder.write_string(field);
chunk();
send();
}
void write_list_header(size_t size) { bolt_encoder.write_list_header(size); }
void write_record() { bolt_encoder.message_record(); }
// writes metadata at the end of the message
// TODO: write whole implementation (currently, only type is supported)
// { "stats": { "nodes created": 1, "properties set": 1},
// "type": "r" | "rw" | ...
void write_meta(const std::string &type) {
bolt_encoder.message_success();
bolt_encoder.write_map_header(1);
bolt_encoder.write_string("type");
bolt_encoder.write_string(type);
chunk();
}
void write_failure(const std::map<std::string, std::string> &data) {
serializer.write_failure(data);
chunk();
}
void write_count(const size_t count) {
write_record();
write_list_header(1);
write(count);
chunk();
}
void write(const VertexAccessor &vertex) { serializer.write(vertex); }
void write_vertex_record(const VertexAccessor &va) {
write_record();
write_list_header(1);
write(va);
chunk();
}
void write(const EdgeAccessor &edge) { serializer.write(edge); }
void write_edge_record(const EdgeAccessor &ea) {
write_record();
write_list_header(1);
write(ea);
chunk();
}
void write(const PropertyValue &value) { serializer.write(value); }
void send() { chunked_buffer.flush(); }
void chunk() { chunked_encoder.write_chunk(); }
// TODO WTF is this test doing here?
void _write_test() {
logger.trace("write_test");
write_fields({{"name"}});
write_record();
write_list_header(1);
bolt_encoder.write("max");
write_record();
write_list_header(1);
bolt_encoder.write("paul");
write_success_empty();
}
protected:
Logger logger;
private:
using buffer_t = ChunkedBuffer<Socket>;
using chunked_encoder_t = ChunkedEncoder<buffer_t>;
using bolt_encoder_t = BoltEncoder<chunked_encoder_t>;
using bolt_serializer_t = BoltSerializer<bolt_encoder_t>;
Socket &socket;
buffer_t chunked_buffer{socket};
chunked_encoder_t chunked_encoder{chunked_buffer};
bolt_encoder_t bolt_encoder{chunked_encoder};
bolt_serializer_t serializer{bolt_encoder};
};
}

View File

@ -12,23 +12,24 @@
#include "communication/bolt/v1/states/executor.hpp"
#include "communication/bolt/v1/states/error.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "communication/bolt/v1/transport/bolt_decoder.hpp"
#include "communication/bolt/v1/transport/bolt_encoder.hpp"
#include "logging/default.hpp"
namespace bolt {
namespace communication::bolt {
template<typename Socket>
class Session {
public:
using Decoder = BoltDecoder;
using OutputStream = RecordStream<Socket>;
using OutputStream = ResultStream<Socket>;
Session(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine)
: socket(std::move(socket)),
dbms(dbms), query_engine(query_engine),
encoder(this->socket), output_stream(encoder),
logger(logging::log->logger("Session")) {
event.data.ptr = this;
// start with a handshake state
@ -63,15 +64,18 @@ class Session {
break;
case INIT:
logger.debug("Current state: INIT");
state = state_init_run<Socket>(output_stream, decoder);
// TODO: swap around parameters so that inputs are first and outputs are last!
state = state_init_run<Socket>(encoder, decoder);
break;
case EXECUTOR:
logger.debug("Current state: EXECUTOR");
state = state_executor_run<Socket>(output_stream, decoder, dbms, query_engine);
// TODO: swap around parameters so that inputs are first and outputs are last!
state = state_executor_run<Socket>(output_stream, encoder, decoder, dbms, query_engine);
break;
case ERROR:
logger.debug("Current state: ERROR");
state = state_error_run<Socket>(output_stream, decoder);
// TODO: swap around parameters so that inputs are first and outputs are last!
state = state_error_run<Socket>(output_stream, encoder, decoder);
break;
case NULLSTATE:
break;
@ -86,6 +90,8 @@ class Session {
this->socket.Close();
}
// TODO: these members should be private
Socket socket;
io::network::Epoll::Event event;
@ -95,7 +101,8 @@ class Session {
GraphDbAccessor active_db() { return dbms.active(); }
Decoder decoder;
OutputStream output_stream{socket};
Encoder<ChunkedBuffer<Socket>, Socket> encoder;
OutputStream output_stream;
bool connected{false};
State state;

View File

@ -1,6 +1,6 @@
#pragma once
namespace bolt {
namespace communication::bolt {
enum State {
HANDSHAKE,

View File

@ -2,14 +2,14 @@
#include "communication/bolt/v1/state.hpp"
#include "communication/bolt/v1/transport/bolt_decoder.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "logging/default.hpp"
namespace bolt {
namespace communication::bolt {
template<typename Socket>
State state_error_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder) {
State state_error_run(ResultStream<Socket> &output_stream, Encoder<ChunkedBuffer<Socket>, Socket>& encoder, BoltDecoder &decoder) {
Logger logger = logging::log->logger("State ERROR");
logger.trace("Run");
@ -19,34 +19,29 @@ State state_error_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder)
logger.trace("Message type byte is: {:02X}", message_type);
if (message_type == MessageCode::PullAll) {
output_stream.write_ignored();
output_stream.chunk();
output_stream.send();
// TODO: write_ignored, chunk, send
encoder.MessageIgnored();
return ERROR;
} else if (message_type == MessageCode::AckFailure) {
// TODO reset current statement? is it even necessary?
logger.trace("AckFailure received");
output_stream.write_success_empty();
output_stream.chunk();
output_stream.send();
// TODO: write_success, chunk, send
encoder.MessageSuccess();
return EXECUTOR;
} else if (message_type == MessageCode::Reset) {
// TODO rollback current transaction
// discard all records waiting to be sent
output_stream.write_success_empty();
output_stream.chunk();
output_stream.send();
// TODO: write_success, chunk, send
encoder.MessageSuccess();
return EXECUTOR;
}
// TODO: write this as single call
output_stream.write_ignored();
output_stream.chunk();
output_stream.send();
// TODO: write_ignored, chunk, send
encoder.MessageIgnored();
return ERROR;
}

View File

@ -10,14 +10,14 @@
#include "logging/default.hpp"
namespace bolt {
namespace communication::bolt {
struct Query {
std::string statement;
};
template<typename Socket>
State state_executor_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder, Dbms &dmbs, QueryEngine<RecordStream<Socket>> &query_engine){
State state_executor_run(ResultStream<Socket> &output_stream, Encoder<ChunkedBuffer<Socket>, Socket>& encoder, BoltDecoder &decoder, Dbms &dmbs, QueryEngine<ResultStream<Socket>> &query_engine){
Logger logger = logging::log->logger("State EXECUTOR");
// just read one byte that represents the struct type, we can skip the
// information contained in this byte
@ -43,23 +43,23 @@ State state_executor_run(RecordStream<Socket> &output_stream, BoltDecoder &decod
query_engine.Run(query.statement, db_accessor, output_stream);
if (!is_successfully_executed) {
output_stream.write_failure(
// TODO: write_failure, send
encoder.MessageFailure(
{{"code", "Memgraph.QueryExecutionFail"},
{"message",
"Query execution has failed (probably there is no "
"element or there are some problems with concurrent "
"access -> client has to resolve problems with "
"concurrent access)"}});
output_stream.send();
return ERROR;
}
return EXECUTOR;
// TODO: RETURN success MAYBE
} catch (const frontend::opencypher::SyntaxException &e) {
output_stream.write_failure(
// TODO: write_failure, send
encoder.MessageFailure(
{{"code", "Memgraph.SyntaxException"}, {"message", "Syntax error"}});
output_stream.send();
return ERROR;
// } catch (const backend::cpp::GeneratorException &e) {
// output_stream.write_failure(
@ -68,34 +68,33 @@ State state_executor_run(RecordStream<Socket> &output_stream, BoltDecoder &decod
// output_stream.send();
// return ERROR;
} catch (const QueryEngineException &e) {
output_stream.write_failure(
// TODO: write_failure, send
encoder.MessageFailure(
{{"code", "Memgraph.QueryEngineException"},
{"message", "Query engine was unable to execute the query"}});
output_stream.send();
return ERROR;
} catch (const StacktraceException &e) {
output_stream.write_failure(
// TODO: write_failure, send
encoder.MessageFailure(
{{"code", "Memgraph.StacktraceException"},
{"message", "Unknow exception"}});
output_stream.send();
{"message", "Unknown exception"}});
return ERROR;
} catch (std::exception &e) {
output_stream.write_failure(
{{"code", "Memgraph.Exception"}, {"message", "unknow exception"}});
output_stream.send();
// TODO: write_failure, send
encoder.MessageFailure(
{{"code", "Memgraph.Exception"}, {"message", "Unknown exception"}});
return ERROR;
}
} else if (message_type == MessageCode::PullAll) {
logger.trace("[PullAll]");
output_stream.send();
// TODO: all query output should not be immediately flushed from the buffer, it should wait the PullAll command to start flushing!!
//output_stream.send();
} else if (message_type == MessageCode::DiscardAll) {
logger.trace("[DiscardAll]");
// TODO: discard state
output_stream.write_success();
output_stream.chunk();
output_stream.send();
// TODO: write_success, send
encoder.MessageSuccess();
} else if (message_type == MessageCode::Reset) {
// TODO: rollback current transaction
// discard all records waiting to be sent

View File

@ -6,7 +6,7 @@
#include "logging/default.hpp"
namespace bolt {
namespace communication::bolt {
static constexpr uint32_t preamble = 0x6060B017;

View File

@ -1,17 +1,18 @@
#pragma once
#include "communication/bolt/v1/packing/codes.hpp"
#include "communication/bolt/v1/state.hpp"
#include "communication/bolt/v1/transport/bolt_decoder.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "communication/bolt/v1/messaging/codes.hpp"
#include "logging/default.hpp"
#include "utils/likely.hpp"
namespace bolt {
namespace communication::bolt {
template<typename Socket>
State state_init_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder) {
State state_init_run(Encoder<ChunkedBuffer<Socket>, Socket> &encoder, BoltDecoder &decoder) {
Logger logger = logging::log->logger("State INIT");
logger.debug("Parsing message");
@ -45,9 +46,8 @@ State state_init_run(RecordStream<Socket> &output_stream, BoltDecoder &decoder)
logger.debug("Executing state");
logger.debug("Client connected '{}'", client_name);
output_stream.write_success_empty();
output_stream.chunk();
output_stream.send();
// TODO: write_success, chunk, send
encoder.MessageSuccess();
return EXECUTOR;
}

View File

@ -4,7 +4,7 @@
#include "logging/default.hpp"
#include "utils/bswap.hpp"
namespace bolt {
namespace communication::bolt {
void BoltDecoder::handshake(const byte *&data, size_t len) {
buffer.write(data, len);

View File

@ -4,7 +4,7 @@
#include "communication/bolt/v1/transport/chunked_decoder.hpp"
#include "utils/types/byte.hpp"
namespace bolt {
namespace communication::bolt {
class BoltDecoder {
public:

View File

@ -1,200 +0,0 @@
#pragma once
#include <string>
#include "communication/bolt/v1/messaging/codes.hpp"
#include "communication/bolt/v1/packing/codes.hpp"
#include "logging/default.hpp"
#include "utils/bswap.hpp"
#include "utils/types/byte.hpp"
namespace bolt {
template <class Stream>
class BoltEncoder {
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
static constexpr int64_t plus_2_to_the_15 = 32768L;
static constexpr int64_t plus_2_to_the_7 = 128L;
static constexpr int64_t minus_2_to_the_4 = -16L;
static constexpr int64_t minus_2_to_the_7 = -128L;
static constexpr int64_t minus_2_to_the_15 = -32768L;
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
public:
BoltEncoder(Stream &stream) : stream(stream) {
logger = logging::log->logger("Bolt Encoder");
}
void write(byte value) { write_byte(value); }
void write(pack::Code value) { write_byte(static_cast<byte>(value)); }
void write_byte(byte value) {
logger.trace("write byte: {}", value);
stream.write(value);
}
void write(const byte *values, size_t n) { stream.write(values, n); }
void write_null() { stream.write(static_cast<byte>(pack::Code::Null)); }
void write(bool value) { write_bool(value); }
void write_bool(bool value) {
if (value)
write_true();
else
write_false();
}
void write_true() { stream.write(pack::Code::True); }
void write_false() { stream.write(pack::Code::False); }
template <class T>
void write_value(T value) {
value = bswap(value);
stream.write(reinterpret_cast<const byte *>(&value), sizeof(value));
}
void write_integer(int64_t value) {
if (value >= minus_2_to_the_4 && value < plus_2_to_the_7) {
write(static_cast<byte>(value));
} else if (value >= minus_2_to_the_7 && value < minus_2_to_the_4) {
write(pack::Code::Int8);
write(static_cast<byte>(value));
} else if (value >= minus_2_to_the_15 && value < plus_2_to_the_15) {
write(pack::Code::Int16);
write_value(static_cast<int16_t>(value));
} else if (value >= minus_2_to_the_31 && value < plus_2_to_the_31) {
write(pack::Code::Int32);
write_value(static_cast<int32_t>(value));
} else {
write(pack::Code::Int64);
write_value(value);
}
}
void write(double value) { write_double(value); }
void write_double(double value) {
write(pack::Code::Float64);
write_value(*reinterpret_cast<const int64_t *>(&value));
}
void write_map_header(size_t size) {
if (size < 0x10) {
write(static_cast<byte>(static_cast<size_t>(pack::Code::TinyMap) | size));
} else if (size <= 0xFF) {
write(pack::Code::Map8);
write(static_cast<byte>(size));
} else if (size <= 0xFFFF) {
write(pack::Code::Map16);
write_value<uint16_t>(size);
} else {
write(pack::Code::Map32);
write_value<uint32_t>(size);
}
}
void write_empty_map() { write(pack::Code::TinyMap); }
void write_list_header(size_t size) {
if (size < 0x10) {
write(
static_cast<byte>(static_cast<size_t>(pack::Code::TinyList) | size));
} else if (size <= 0xFF) {
write(pack::Code::List8);
write(static_cast<byte>(size));
} else if (size <= 0xFFFF) {
write(pack::Code::List16);
write_value<uint16_t>(size);
} else {
write(pack::Code::List32);
write_value<uint32_t>(size);
}
}
void write_empty_list() { write(pack::Code::TinyList); }
void write_string_header(size_t size) {
if (size < 0x10) {
write(
static_cast<byte>(static_cast<byte>(pack::Code::TinyString) | size));
} else if (size <= 0xFF) {
write(pack::Code::String8);
write(static_cast<byte>(size));
} else if (size <= 0xFFFF) {
write(pack::Code::String16);
write_value<uint16_t>(size);
} else {
write(pack::Code::String32);
write_value<uint32_t>(size);
}
}
void write(const std::string &str) { write_string(str); }
void write_string(const std::string &str) {
write_string(str.c_str(), str.size());
}
void write_string(const char *str, size_t len) {
write_string_header(len);
write(reinterpret_cast<const byte *>(str), len);
}
void write_struct_header(size_t size) {
if (size < 0x10) {
write(static_cast<byte>(static_cast<size_t>(pack::Code::TinyStruct) |
size));
} else if (size <= 0xFF) {
write(pack::Code::Struct8);
write(static_cast<byte>(size));
} else {
write(pack::Code::Struct16);
write_value<uint16_t>(size);
}
}
void message_success() {
write_struct_header(1);
write(underlying_cast(MessageCode::Success));
}
void message_success_empty() {
message_success();
write_empty_map();
}
void message_record() {
write_struct_header(1);
write(underlying_cast(MessageCode::Record));
}
void message_record_empty() {
message_record();
write_empty_list();
}
void message_ignored() {
write_struct_header(0);
write(underlying_cast(MessageCode::Ignored));
}
void message_failure() {
write_struct_header(1);
write(underlying_cast(MessageCode::Failure));
}
void message_ignored_empty() {
message_ignored();
write_empty_map();
}
protected:
Logger logger;
private:
Stream &stream;
};
}

View File

@ -1,6 +1,6 @@
#include "communication/bolt/v1/transport/buffer.hpp"
namespace bolt {
namespace communication::bolt {
void Buffer::write(const byte* data, size_t len) {
buffer.insert(buffer.end(), data, data + len);

View File

@ -6,7 +6,7 @@
#include "utils/types/byte.hpp"
namespace bolt {
namespace communication::bolt {
class Buffer {
public:

View File

@ -1,58 +0,0 @@
#pragma once
#include <cstring>
#include <memory>
#include <vector>
#include "communication/bolt/v1/config.hpp"
#include "logging/default.hpp"
#include "utils/types/byte.hpp"
namespace bolt {
template <class Stream>
class ChunkedBuffer {
static constexpr size_t C = bolt::config::C; /* chunk size */
public:
ChunkedBuffer(Stream &stream) : stream(stream) {
logger = logging::log->logger("Chunked Buffer");
}
void write(const byte *values, size_t n) {
logger.trace("Write {} bytes", n);
// total size of the buffer is now bigger for n
size += n;
// reserve enough spece for the new data
buffer.reserve(size);
// copy new data
std::copy(values, values + n, std::back_inserter(buffer));
}
void flush() {
// TODO: check for success
stream.get().Write(&buffer.front(), size);
logger.trace("Flushed {} bytes", size);
// GC
// TODO: impelement a better strategy
buffer.clear();
// reset size
size = 0;
}
~ChunkedBuffer() {}
private:
Logger logger;
// every new stream.write creates new TCP package
std::reference_wrapper<Stream> stream;
std::vector<byte> buffer;
size_t size{0};
};
}

View File

@ -8,7 +8,7 @@
#include "utils/likely.hpp"
#include "utils/types/byte.hpp"
namespace bolt {
namespace communication::bolt {
template <class Stream>
class ChunkedDecoder {

View File

@ -1,83 +0,0 @@
#pragma once
#include <array>
#include <cstring>
#include <functional>
#include "communication/bolt/v1/config.hpp"
#include "logging/default.hpp"
#include "utils/likely.hpp"
namespace bolt {
template <class Stream>
class ChunkedEncoder {
static constexpr size_t N = bolt::config::N;
static constexpr size_t C = bolt::config::C;
public:
using byte = unsigned char;
ChunkedEncoder(Stream &stream)
: logger(logging::log->logger("Chunked Encoder")), stream(stream) {}
static constexpr size_t chunk_size = N - 2;
void write(byte value) {
if (UNLIKELY(pos == N)) write_chunk();
chunk[pos++] = value;
}
void write(const byte *values, size_t n) {
logger.trace("write {} bytes", n);
while (n > 0) {
auto size = n < N - pos ? n : N - pos;
std::memcpy(chunk.data() + pos, values, size);
pos += size;
n -= size;
// TODO: see how bolt splits message over more TCP packets,
// test for more TCP packets
if (pos == N) write_chunk();
}
}
void write_chunk() {
write_chunk_header();
// write two zeros to signal message end
chunk[pos++] = 0x00;
chunk[pos++] = 0x00;
flush();
}
private:
Logger logger;
std::reference_wrapper<Stream> stream;
std::array<byte, C> chunk;
size_t pos{2};
void write_chunk_header() {
// write the size of the chunk
uint16_t size = pos - 2;
// write the higher byte
chunk[0] = size >> 8;
// write the lower byte
chunk[1] = size & 0xFF;
}
void flush() {
// write chunk to the stream
stream.get().write(chunk.data(), pos);
pos = 2;
}
};
}

View File

@ -2,7 +2,7 @@
#include "utils/exceptions/stacktrace_exception.hpp"
namespace bolt {
namespace communication::bolt {
class StreamError : StacktraceException {
public:

View File

@ -7,7 +7,7 @@
#include "utils/bswap.hpp"
#include "utils/types/byte.hpp"
namespace bolt {
namespace communication::bolt {
// BoltDecoder for streams. Meant for use in SnapshotDecoder.
// This should be recoded to recieve the current caller so that decoder can

View File

@ -104,10 +104,10 @@ class Server
std::atomic<bool> alive_{true};
int idx_{0};
Socket socket_;
Dbms &dbms_;
QueryEngine<OutputStream> &query_engine_;
Event event_;
Socket socket_;
Logger logger_;
};
}

View File

@ -21,9 +21,9 @@
using endpoint_t = io::network::NetworkEndpoint;
using socket_t = io::network::Socket;
using bolt_server_t =
communication::Server<bolt::Session<socket_t>, bolt::RecordStream<socket_t>,
socket_t>;
using session_t = communication::bolt::Session<socket_t>;
using result_stream_t = communication::bolt::ResultStream<socket_t>;
using bolt_server_t = communication::Server<session_t, result_stream_t, socket_t>;
static bolt_server_t *serverptr;
@ -98,7 +98,7 @@ int main(int argc, char **argv) {
logger.info("Listening on {} at {}", interface, port);
Dbms dbms;
QueryEngine<bolt::RecordStream<socket_t>> query_engine;
QueryEngine<result_stream_t> query_engine;
// initialize server
bolt_server_t server(std::move(socket), dbms, query_engine);

View File

@ -2,7 +2,7 @@
#include <string>
#include "data_structures/bitset/static_bitset.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "io/network/socket.hpp"
#include "query/backend/cpp/typed_value.hpp"
#include "query/plan_interface.hpp"

View File

@ -30,12 +30,10 @@ enum CliqueQuery { SCORE_AND_LIMIT, FIND_ALL };
bool run_general_query(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream, enum CliqueQuery query_type) {
if (query_type == CliqueQuery::FIND_ALL)
stream.write_fields(
{"a.garment_id", "b.garment_id", "c.garment_id", "d.garment_id"});
else
stream.write_fields({"a.garment_id", "b.garment_id", "c.garment_id",
"d.garment_id", "score"});
std::vector<std::string> headers{std::string("a.garment_id"), std::string("b.garment_id"), std::string("c.garment_id"), std::string("d.garment_id")};
if (query_type != CliqueQuery::FIND_ALL)
headers.push_back(std::string("score"));
stream.Header(headers);
// TODO dgleich: this code is very inefficient as it first makes a copy
// of all the vertices/edges, and filters aftwarwards. I warned about this
// happening in code review!!!
@ -206,18 +204,18 @@ bool run_general_query(GraphDbAccessor &db_accessor, const Parameters &args,
? args.At((int)args.Size() - 1).Value<int64_t>()
: (int)results.size();
for (int i = 0; i < std::min(limit, (int)results.size()); ++i) {
stream.write_record();
stream.write_list_header(query_type == CliqueQuery::SCORE_AND_LIMIT ? 5
: 4);
std::vector<TypedValue> result;
for (auto x : results[i]) {
stream.write(vertices_indexed[x]
result.push_back(vertices_indexed[x]
->PropsAt(db_accessor.property("garment_id"))
.Value<int64_t>());
}
if (query_type == CliqueQuery::SCORE_AND_LIMIT)
stream.write(calc_score(results[i]));
result.push_back(calc_score(results[i]));
stream.Result(result);
}
stream.write_meta("r");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> {
v.PropsSet(db_accessor.property("partner_id"), args.At(1));
v.PropsSet(db_accessor.property("conceals"), args.At(2));
v.add_label(db_accessor.label("profile"));
stream.write_field("p");
stream.write_vertex_record(v);
stream.write_meta("rw");
std::vector<std::string> headers{std::string("p")};
stream.Header(headers);
std::vector<TypedValue> result{TypedValue(v)};
stream.Result(result);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -20,9 +20,12 @@ class CPUPlan : public PlanInterface<Stream> {
v.PropsSet(db_accessor.property("profile_id"), args.At(0));
v.PropsSet(db_accessor.property("partner_id"), args.At(1));
v.add_label(db_accessor.label("profile"));
stream.write_field("p");
stream.write_vertex_record(v);
stream.write_meta("rw");
std::vector<std::string> headers{std::string("p")};
stream.Header(headers);
std::vector<TypedValue> result{TypedValue(v)};
stream.Result(result);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> {
v.PropsSet(db_accessor.property("partner_id"), args.At(1));
v.PropsSet(db_accessor.property("reveals"), args.At(2));
v.add_label(db_accessor.label("profile"));
stream.write_field("p");
stream.write_vertex_record(v);
stream.write_meta("rw");
std::vector<std::string> headers{std::string("p")};
stream.Header(headers);
std::vector<TypedValue> result{TypedValue(v)};
stream.Result(result);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -20,9 +20,12 @@ class CPUPlan : public PlanInterface<Stream> {
v.add_label(db_accessor.label("garment"));
v.PropsSet(db_accessor.property("garment_id"), args.At(0));
v.PropsSet(db_accessor.property("garment_category_id"), args.At(1));
stream.write_field("g");
stream.write_vertex_record(v);
stream.write_meta("rw");
std::vector<std::string> headers{std::string("g")};
stream.Header(headers);
std::vector<TypedValue> result{TypedValue(v)};
stream.Result(result);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> {
v.PropsSet(db_accessor.property("garment_id"), args.At(0));
v.PropsSet(db_accessor.property("garment_category_id"), args.At(1));
v.PropsSet(db_accessor.property("conceals"), args.At(2));
stream.write_field("g");
stream.write_vertex_record(v);
stream.write_meta("rw");
std::vector<std::string> headers{std::string("g")};
stream.Header(headers);
std::vector<TypedValue> result{TypedValue(v)};
stream.Result(result);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -22,9 +22,12 @@ class CPUPlan : public PlanInterface<Stream> {
v.PropsSet(db_accessor.property("garment_id"), args.At(0));
v.PropsSet(db_accessor.property("garment_category_id"), args.At(1));
v.PropsSet(db_accessor.property("reveals"), args.At(2));
stream.write_field("g");
stream.write_vertex_record(v);
stream.write_meta("rw");
std::vector<std::string> headers{std::string("g")};
stream.Header(headers);
std::vector<TypedValue> result{TypedValue(v)};
stream.Result(result);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -17,8 +17,10 @@ class CPUPlan : public PlanInterface<Stream> {
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
for (auto v : db_accessor.vertices()) db_accessor.detach_remove_vertex(v);
stream.write_empty_fields();
stream.write_meta("rw");
std::vector<std::string> headers;
stream.Header(headers);
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -17,7 +17,8 @@ class CPUPlan : public PlanInterface<Stream> {
public:
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
stream.write_field("g");
std::vector<std::string> headers{std::string("g")};
stream.Header(headers);
for (auto vertex : db_accessor.vertices()) {
if (vertex.has_label(db_accessor.label("garment"))) {
TypedValue prop = vertex.PropsAt(db_accessor.property("garment_id"));
@ -25,10 +26,12 @@ class CPUPlan : public PlanInterface<Stream> {
auto cmp = prop == args.At(0);
if (cmp.type() != TypedValue::Type::Bool) continue;
if (cmp.Value<bool>() != true) continue;
stream.write_vertex_record(vertex);
std::vector<TypedValue> result{TypedValue(vertex)};
stream.Result(result);
}
}
stream.write_meta("r");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -18,7 +18,8 @@ class CPUPlan : public PlanInterface<Stream> {
public:
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
stream.write_field("r");
std::vector<std::string> headers{std::string("r")};
stream.Header(headers);
std::vector<VertexAccessor> g1_set, g2_set;
for (auto g1 : db_accessor.vertices()) {
if (g1.has_label(db_accessor.label("garment"))) {
@ -44,9 +45,11 @@ class CPUPlan : public PlanInterface<Stream> {
for (auto g2 : g2_set) {
EdgeAccessor e = db_accessor.insert_edge(
g1, g2, db_accessor.edge_type("default_outfit"));
stream.write_edge_record(e);
std::vector<TypedValue> result{TypedValue(e)};
stream.Result(result);
}
stream.write_meta("rw");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -17,7 +17,8 @@ using std::endl;
bool run_general_query(GraphDbAccessor &db_accessor,
const Parameters &args, Stream &stream,
const std::string &general_label) {
stream.write_field("g");
std::vector<std::string> headers{std::string("g")};
stream.Header(headers);
for (auto vertex : db_accessor.vertices()) {
if (vertex.has_label(db_accessor.label("garment"))) {
TypedValue prop = vertex.PropsAt(db_accessor.property("garment_id"));
@ -26,10 +27,12 @@ bool run_general_query(GraphDbAccessor &db_accessor,
if (cmp.type() != TypedValue::Type::Bool) continue;
if (cmp.Value<bool>() != true) continue;
vertex.add_label(db_accessor.label(general_label));
stream.write_vertex_record(vertex);
std::vector<TypedValue> result{TypedValue(vertex)};
stream.Result(result);
}
}
stream.write_meta("rw");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -17,7 +17,8 @@ class CPUPlan : public PlanInterface<Stream> {
public:
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
stream.write_field("p");
std::vector<std::string> headers{std::string("p")};
stream.Header(headers);
for (auto vertex : db_accessor.vertices()) {
if (vertex.has_label(db_accessor.label("profile"))) {
TypedValue prop = vertex.PropsAt(db_accessor.property("profile_id"));
@ -31,10 +32,12 @@ class CPUPlan : public PlanInterface<Stream> {
auto cmp2 = prop2 == args.At(1);
if (cmp2.type() != TypedValue::Type::Bool) continue;
if (cmp2.Value<bool>() != true) continue;
stream.write_vertex_record(vertex);
std::vector<TypedValue> result{TypedValue(vertex)};
stream.Result(result);
}
}
stream.write_meta("r");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -20,7 +20,8 @@ class CPUPlan : public PlanInterface<Stream> {
public:
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
stream.write_field("s");
std::vector<std::string> headers{std::string("s")};
stream.Header(headers);
auto profile = [&db_accessor, &args](const VertexAccessor &v) -> bool {
TypedValue prop = v.PropsAt(db_accessor.property("profile_id"));
if (prop.type() == TypedValue::Type::Null) return false;
@ -46,10 +47,12 @@ class CPUPlan : public PlanInterface<Stream> {
auto to = edge.to();
if (edge.edge_type() != db_accessor.edge_type("score")) continue;
if ((profile(from) && garment(to)) || (profile(to) && garment(from))) {
stream.write_edge_record(edge);
std::vector<TypedValue> result{TypedValue(edge)};
stream.Result(result);
}
}
stream.write_meta("r");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("r")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -18,7 +18,8 @@ class CPUPlan : public PlanInterface<Stream> {
public:
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
stream.write_field("r");
std::vector<std::string> headers{std::string("r")};
stream.Header(headers);
std::vector<VertexAccessor> g1_set, g2_set;
for (auto g1 : db_accessor.vertices()) {
if (g1.has_label(db_accessor.label("profile"))) {
@ -51,9 +52,11 @@ class CPUPlan : public PlanInterface<Stream> {
EdgeAccessor e =
db_accessor.insert_edge(g1, g2, db_accessor.edge_type("score"));
e.PropsSet(db_accessor.property("score"), args.At(3));
stream.write_edge_record(e);
std::vector<TypedValue> result{TypedValue(e)};
stream.Result(result);
}
stream.write_meta("rw");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -19,7 +19,8 @@ class CPUPlan : public PlanInterface<Stream> {
public:
bool run(GraphDbAccessor &db_accessor, const Parameters &args,
Stream &stream) {
stream.write_field("s");
std::vector<std::string> headers{std::string("s")};
stream.Header(headers);
auto profile = [&db_accessor, &args](const VertexAccessor &v) -> bool {
TypedValue prop = v.PropsAt(db_accessor.property("profile_id"));
if (prop.type() == TypedValue::Type::Null) return false;
@ -46,10 +47,12 @@ class CPUPlan : public PlanInterface<Stream> {
if (edge.edge_type() != db_accessor.edge_type("score")) continue;
if ((profile(from) && garment(to)) || (profile(to) && garment(from))) {
edge.PropsSet(db_accessor.property("score"), args.At(3));
stream.write_edge_record(edge);
std::vector<TypedValue> result{TypedValue(edge)};
stream.Result(result);
}
}
stream.write_meta("rw");
std::map<std::string, TypedValue> meta{std::make_pair(std::string("type"), TypedValue(std::string("rw")))};
stream.Summary(meta);
db_accessor.commit();
return true;
}

View File

@ -3,9 +3,9 @@
// the flag is only used in hardcoded queries compilation
// see usage in plan_compiler.hpp
#ifndef HARDCODED_OUTPUT_STREAM
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "io/network/socket.hpp"
using Stream = bolt::RecordStream<io::network::Socket>;
using Stream = communication::bolt::ResultStream<io::network::Socket>;
#else
#include "../stream/print_record_stream.hpp"
using Stream = PrintRecordStream;

View File

@ -44,40 +44,16 @@ class PrintRecordStream {
public:
PrintRecordStream(std::ostream &stream) : stream(stream) {}
void write_success() { stream << "SUCCESS\n"; }
void write_success_empty() { stream << "SUCCESS EMPTY\n"; }
void write_ignored() { stream << "IGNORED\n"; }
void write_empty_fields() { stream << "EMPTY FIELDS\n"; }
void write_fields(const std::vector<std::string> &fields) {
stream << "FIELDS:";
for (auto &field : fields) {
stream << " " << field;
}
stream << '\n';
// TODO: all these functions should pretty print their data
void Header(const std::vector<std::string> &fields) {
stream << "Header\n";
}
void write_field(const std::string &field) {
stream << "Field: " << field << '\n';
void Result(std::vector<TypedValue> &values) {
stream << "Result\n";
}
void write(const TypedValue &value) { stream << value << " "; }
void write_list_header(size_t size) { stream << "List: " << size << '\n'; }
void write_record() { stream << "Record\n"; }
void write_vertex_record(const VertexAccessor &vertex) { stream << vertex; }
void write_edge_record(const EdgeAccessor &edge) { stream << edge; }
void write_meta(const std::string &type) {
stream << "Meta: " << type << std::endl;
void Summary(const std::map<std::string, TypedValue> &summary) {
stream << "Summary\n";
}
void write_failure(const std::map<std::string, std::string> &data) {}
void write_count(const size_t count) {}
void chunk() { stream << "CHUNK\n"; }
};

View File

@ -1,9 +1,12 @@
#include "bolt_common.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "communication/bolt/v1/encoder/result_stream.hpp"
#include "communication/bolt/v1/session.hpp"
#include "query/engine.hpp"
using result_stream_t = communication::bolt::ResultStream<TestSocket>;
using session_t = communication::bolt::Session<TestSocket>;
const uint8_t handshake_req[] =
"\x60\x60\xb0\x17\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
@ -24,19 +27,19 @@ const uint8_t run_req[] =
TEST(Bolt, Session) {
Dbms dbms;
TestSocket socket(10);
QueryEngine<bolt::RecordStream<TestSocket>> query_engine;
bolt::Session<TestSocket> session(std::move(socket), dbms, query_engine);
QueryEngine<result_stream_t> query_engine;
session_t session(std::move(socket), dbms, query_engine);
std::vector<uint8_t>& output = session.socket.output;
// execute handshake
session.execute(handshake_req, 20);
ASSERT_EQ(session.state, bolt::INIT);
ASSERT_EQ(session.state, communication::bolt::INIT);
print_output(output);
check_output(output, handshake_resp, 4);
// execute init
session.execute(init_req, 67);
ASSERT_EQ(session.state, bolt::EXECUTOR);
ASSERT_EQ(session.state, communication::bolt::EXECUTOR);
print_output(output);
check_output(output, init_resp, 7);

View File

@ -21,7 +21,7 @@ struct DummyStream {
std::vector<byte> data;
};
using Decoder = bolt::ChunkedDecoder<DummyStream>;
using Decoder = communication::bolt::ChunkedDecoder<DummyStream>;
std::vector<byte> chunks[] = {
{0x00, 0x08, 'A', ' ', 'q', 'u', 'i', 'c', 'k', ' ', 0x00, 0x06, 'b', 'r',

View File

@ -1,111 +0,0 @@
#include <cassert>
#include <deque>
#include <iostream>
#include <vector>
#include "gtest/gtest.h"
#include "communication/bolt/v1/transport/chunked_encoder.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
using byte = unsigned char;
void print_hex(byte x) { printf("%02X ", static_cast<byte>(x)); }
class DummyStream {
public:
void write(const byte *values, size_t n) {
num_calls++;
data.insert(data.end(), values, values + n);
}
byte pop() {
auto c = data.front();
data.pop_front();
return c;
}
size_t pop_size() { return ((size_t)pop() << 8) | pop(); }
void print() {
for (size_t i = 0; i < data.size(); ++i) print_hex(data[i]);
}
std::deque<byte> data;
size_t num_calls{0};
};
using Encoder = bolt::ChunkedEncoder<DummyStream>;
void write_ff(Encoder &encoder, size_t n) {
std::vector<byte> v;
for (size_t i = 0; i < n; ++i) v.push_back('\xFF');
encoder.write(v.data(), v.size());
}
void check_ff(DummyStream &stream, size_t n) {
for (size_t i = 0; i < n; ++i) ASSERT_EQ(stream.pop(), byte('\xFF'));
(void)stream;
}
using encoder_t = bolt::ChunkedEncoder<DummyStream>;
TEST(ChunkedEncoderTest, Encode) {
DummyStream stream;
encoder_t encoder(stream);
size_t chunk_size = encoder_t::chunk_size;
write_ff(encoder, 10);
write_ff(encoder, 10);
encoder.write_chunk();
write_ff(encoder, 10);
write_ff(encoder, 10);
encoder.write_chunk();
// this should be two chunks, one of size 65533 and the other of size 1467
write_ff(encoder, 67000);
encoder.write_chunk();
for (int i = 0; i < 10000; ++i) write_ff(encoder, 1500);
encoder.write_chunk();
ASSERT_EQ(stream.pop_size(), 20);
check_ff(stream, 20);
ASSERT_EQ(stream.pop_size(), 0);
ASSERT_EQ(stream.pop_size(), 20);
check_ff(stream, 20);
ASSERT_EQ(stream.pop_size(), 0);
ASSERT_EQ(stream.pop_size(), chunk_size);
check_ff(stream, chunk_size);
ASSERT_EQ(stream.pop_size(), 0);
ASSERT_EQ(stream.pop_size(), 1467);
check_ff(stream, 1467);
ASSERT_EQ(stream.pop_size(), 0);
size_t k = 10000 * 1500;
while (k > 0) {
auto size = k > chunk_size ? chunk_size : k;
ASSERT_EQ(stream.pop_size(), size);
check_ff(stream, size);
ASSERT_EQ(stream.pop_size(), 0);
k -= size;
}
ASSERT_EQ(stream.pop_size(), 0);
}
int main(int argc, char **argv) {
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}