Bolt integration: RecordStream -> BoltSerializer -> BoltEncoder -> ChunkedEncoder -> ChunkedBuffer -> SockerStream

This commit is contained in:
Marko Budiselic 2016-08-10 19:17:38 +01:00
parent 869da8dcda
commit 35d8f6d7ab
15 changed files with 269 additions and 117 deletions

View File

@ -0,0 +1,14 @@
#pragma once
#include <cstddef>
namespace bolt
{
namespace config
{
static constexpr size_t N = 65535; /* chunk size */
static constexpr size_t C = N + 2; /* end mark */
}
}

View File

@ -1,8 +0,0 @@
#pragma once
namespace bolt
{
// TODO: what should be here? (Question for Dominik)
}

View File

@ -18,7 +18,7 @@ class BoltSerializer
friend class Property;
public:
BoltSerializer() {}
BoltSerializer(Stream& stream) : encoder(stream) {}
/* Serializes the vertex accessor into the packstream format
*
@ -132,7 +132,7 @@ public:
}
protected:
BoltEncoder<Stream> encoder;
Stream& encoder;
template <class T>
void handle(const T& prop)

View File

@ -1,15 +1,120 @@
#pragma once
#include "bolt/v1/serialization/bolt_serializer.hpp"
#include "bolt/v1/transport/chunked_buffer.hpp"
#include "bolt/v1/transport/chunked_encoder.hpp"
#include "bolt/v1/transport/socket_stream.hpp"
#include "logging/default.hpp"
namespace bolt
{
class RecordStream : BoltSerializer
// compiled queries have to use this class in order to return results
// query code should not know about bolt protocol
template <class Socket>
class RecordStream
{
public:
RecordStream(Socket &socket) : socket(socket)
{
logger = logging::log->logger("Record Stream");
}
// TODO: create apstract methods that are not bolt specific ---------------
void write_success()
{
logger.trace("write_success");
bolt_encoder.message_success();
}
void write_success_empty()
{
logger.trace("write_success_empty");
bolt_encoder.message_success_empty();
}
void write_ignored()
{
logger.trace("write_ignored");
bolt_encoder.message_ignored();
}
void write_fields(const std::vector<std::string> &fields)
{
// TODO: that should be one level below?
bolt_encoder.message_success();
bolt_encoder.write_map_header(1);
bolt_encoder.write_string("fields");
write_list_header(fields.size());
for (auto &name : fields) {
bolt_encoder.write_string(name);
}
flush();
}
void write_list_header(size_t size)
{
bolt_encoder.write_list_header(size);
}
void write_record()
{
bolt_encoder.message_record();
}
// -- BOLT SPECIFIC METHODS -----------------------------------------------
void write(const Vertex::Accessor &vertex) { serializer.write(vertex); }
void write(const Edge::Accessor &edge) { serializer.write(edge); }
void write(const Property &prop) { serializer.write(prop); }
void write(const Bool& prop) { serializer.write(prop); }
void write(const Float& prop) { serializer.write(prop); }
void write(const Int32& prop) { serializer.write(prop); }
void write(const Int64& prop) { serializer.write(prop); }
void write(const Double& prop) { serializer.write(prop); }
void write(const String& prop) { serializer.write(prop); }
void flush()
{
chunked_encoder.flush();
chunked_buffer.flush();
}
void _write_test()
{
logger.trace("write_test");
write_fields({{"name"}});
write_record();
write_list_header(1);
write(String("max"));
write_record();
write_list_header(1);
write(String("paul"));
write_success_empty();
}
protected:
Logger logger;
private:
using buffer_t = ChunkedBuffer<SocketStream>;
using chunked_encoder_t = ChunkedEncoder<buffer_t>;
using bolt_encoder_t = BoltEncoder<chunked_encoder_t>;
using bolt_serializer_t = BoltSerializer<bolt_encoder_t>;
SocketStream socket;
buffer_t chunked_buffer{socket};
chunked_encoder_t chunked_encoder{chunked_buffer};
bolt_encoder_t bolt_encoder{chunked_encoder};
bolt_serializer_t serializer{bolt_encoder};
};
}

View File

@ -1,22 +0,0 @@
#pragma once
#include "bolt/v1/transport/chunked_encoder.hpp"
#include "bolt/v1/transport/socket_stream.hpp"
#include "bolt/v1/transport/bolt_encoder.hpp"
namespace bolt
{
template <class Socket>
class SocketSerializer : public BoltEncoder<ChunkedEncoder<SocketStream>>
{
public:
SocketSerializer(Socket& socket) : BoltEncoder(encoder), stream(socket) {}
private:
SocketStream stream;
ChunkedEncoder<SocketStream> encoder {stream};
};
}

View File

@ -1,13 +1,13 @@
#pragma once
#include "io/network/tcp/stream.hpp"
#include "io/network/socket.hpp"
#include "io/network/tcp/stream.hpp"
#include "bolt/v1/bolt.hpp"
#include "bolt/v1/serialization/record_stream.hpp"
#include "bolt/v1/states/state.hpp"
#include "bolt/v1/transport/bolt_decoder.hpp"
#include "bolt/v1/transport/bolt_encoder.hpp"
#include "bolt/v1/serialization/socket_serializer.hpp"
#include "bolt/v1/bolt.hpp"
#include "logging/default.hpp"
@ -18,27 +18,25 @@ class Session : public io::tcp::Stream<io::Socket>
{
public:
using Decoder = BoltDecoder;
using Encoder = SocketSerializer<io::Socket>;
using OutputStream = RecordStream<io::Socket>;
Session(io::Socket&& socket, Bolt& bolt);
Session(io::Socket &&socket, Bolt &bolt);
bool alive() const;
void execute(const byte* data, size_t len);
void execute(const byte *data, size_t len);
void close();
Bolt& bolt;
Db& active_db();
Bolt &bolt;
Db &active_db();
Decoder decoder;
Encoder encoder {socket};
OutputStream output_stream{socket};
bool connected {false};
State* state;
bool connected{false};
State *state;
protected:
Logger logger;
};
}

View File

@ -6,6 +6,7 @@
#include "bolt/v1/messaging/codes.hpp"
#include "utils/types/byte.hpp"
#include "utils/bswap.hpp"
#include "logging/default.hpp"
namespace bolt
{
@ -22,7 +23,10 @@ class BoltEncoder
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
public:
BoltEncoder(Stream& stream) : stream(stream) {}
BoltEncoder(Stream& stream) : stream(stream)
{
logger = logging::log->logger("Bolt Encoder");
}
void flush()
{
@ -36,6 +40,7 @@ public:
void write_byte(byte value)
{
logger.trace("write byte: {}", value);
stream.write(value);
}
@ -259,6 +264,9 @@ public:
write_empty_map();
}
protected:
Logger logger;
private:
Stream& stream;
};

View File

@ -4,14 +4,14 @@
#include <cstdlib>
#include <vector>
#include "utils/types/byte.hpp"
namespace bolt
{
class Buffer
{
public:
using byte = uint8_t;
void write(const byte* data, size_t len);
void clear();

View File

@ -0,0 +1,74 @@
#pragma once
#include <memory>
#include <vector>
#include <cstring>
#include "bolt/v1/config.hpp"
#include "utils/types/byte.hpp"
#include "logging/default.hpp"
namespace bolt
{
template <class Stream>
class ChunkedBuffer
{
static constexpr size_t C = bolt::config::C; /* chunk size */
public:
ChunkedBuffer(Stream &stream) : stream(stream)
{
logger = logging::log->logger("Chunked Buffer");
}
void write(const byte *values, size_t n)
{
// TODO: think about shared pointer
// TODO: this is naive implementation, it can be implemented much better
logger.trace("write {} bytes", n);
byte *chunk = chunk = (byte *)std::malloc(n * sizeof(byte));
last_size = n;
std::memcpy(chunk, values, n);
buffer.push_back(chunk);
}
void flush()
{
logger.trace("Flush");
for (size_t i = 0; i < buffer.size(); ++i) {
if (i == buffer.size() - 1)
stream.get().write(buffer[i], last_size);
else
stream.get().write(buffer[i], C);
}
destroy();
}
~ChunkedBuffer()
{
destroy();
}
private:
Logger logger;
std::reference_wrapper<Stream> stream;
std::vector<byte *> buffer;
size_t last_size {0}; // last chunk size (it is going to be less than C)
void destroy()
{
for (size_t i = 0; i < buffer.size(); ++i) {
std::free(buffer[i]);
}
buffer.clear();
}
};
}

View File

@ -1,13 +1,13 @@
#pragma once
#include <cassert>
#include <cstring>
#include <functional>
#include <cassert>
#include "utils/exceptions/basic_exception.hpp"
#include "utils/likely.hpp"
#include "logging/default.hpp"
#include "utils/exceptions/basic_exception.hpp"
#include "utils/likely.hpp"
#include "utils/types/byte.hpp"
namespace bolt
{
@ -22,8 +22,6 @@ public:
using BasicException::BasicException;
};
using byte = unsigned char;
ChunkedDecoder(Stream& stream) : stream(stream) {}
/* Decode chunked data
@ -33,14 +31,14 @@ public:
* |Header| Data ||Header| Data || ... || End |
* | 2B | size bytes || 2B | size bytes || ... ||00 00|
*/
bool decode(const byte*& chunk, size_t n)
bool decode(const byte *&chunk, size_t n)
{
while(n > 0)
while (n > 0)
{
// get size from first two bytes in the chunk
auto size = get_size(chunk);
if(UNLIKELY(size + 2 > n))
if (UNLIKELY(size + 2 > n))
throw DecoderError("Chunk size larger than available data.");
// advance chunk to pass those two bytes
@ -48,8 +46,7 @@ public:
n -= 2;
// if chunk size is 0, we're done!
if(size == 0)
return true;
if (size == 0) return true;
stream.get().write(chunk, size);
@ -60,18 +57,14 @@ public:
return false;
}
bool operator()(const byte*& chunk, size_t n)
{
return decode(chunk, n);
}
bool operator()(const byte *&chunk, size_t n) { return decode(chunk, n); }
private:
std::reference_wrapper<Stream> stream;
size_t get_size(const byte* chunk)
size_t get_size(const byte *chunk)
{
return size_t(chunk[0]) << 8 | chunk[1];
}
};
}

View File

@ -5,6 +5,8 @@
#include <functional>
#include "utils/likely.hpp"
#include "bolt/v1/config.hpp"
#include "logging/default.hpp"
namespace bolt
{
@ -12,13 +14,16 @@ namespace bolt
template <class Stream>
class ChunkedEncoder
{
static constexpr size_t N = 65535;
static constexpr size_t C = N + 2 /* end mark */;
static constexpr size_t N = bolt::config::N;
static constexpr size_t C = bolt::config::C;
public:
using byte = unsigned char;
ChunkedEncoder(Stream& stream) : stream(stream) {}
ChunkedEncoder(Stream& stream) : stream(stream)
{
logger = logging::log->logger("Chunked Encoder");
}
static constexpr size_t chunk_size = N - 2;
@ -32,6 +37,8 @@ public:
void write(const byte* values, size_t n)
{
logger.trace("write {} bytes", n);
while(n > 0)
{
auto size = n < N - pos ? n : N - pos;
@ -58,6 +65,7 @@ public:
}
private:
Logger logger;
std::reference_wrapper<Stream> stream;
std::array<byte, C> chunk;
@ -65,7 +73,9 @@ private:
void end_chunk()
{
write_chunk_header();
// TODO: this call is unnecessary bacause the same method is called
// inside the flush method
// write_chunk_header();
flush();
}

View File

@ -10,9 +10,9 @@ State* Error::run(Session& session)
if(message_type == MessageCode::AckFailure)
{
// TODO reset current statement? is it even necessary?
session.encoder.message_success_empty();
session.encoder.flush();
session.output_stream.write_success_empty();
session.output_stream.flush();
return session.bolt.states.executor.get();
}
@ -21,14 +21,15 @@ State* Error::run(Session& session)
// TODO rollback current transaction
// discard all records waiting to be sent
session.encoder.message_success_empty();
session.encoder.flush();
session.output_stream.write_success_empty();
session.output_stream.flush();
return session.bolt.states.executor.get();
}
session.encoder.message_ignored();
session.encoder.flush();
session.output_stream.write_ignored();
session.output_stream.flush();
return this;
}

View File

@ -12,6 +12,8 @@ State* Executor::run(Session& session)
// information contained in this byte
session.decoder.read_byte();
logger.debug("Run");
auto message_type = session.decoder.read_byte();
if(message_type == MessageCode::Run)
@ -52,52 +54,28 @@ void Executor::run(Session& session, Query& query)
{
logger.trace("[Run] '{}'", query.statement);
auto &db = session.active_db();
logger.info("[ActiveDB] '{}'", db.name());
query_engine.execute(query.statement, db);
session.encoder.message_success();
session.encoder.write_map_header(1);
session.encoder.write_string("fields");
session.encoder.write_list_header(1);
session.encoder.write_string("name");
session.encoder.flush();
// auto &db = session.active_db();
// logger.info("[ActiveDB] '{}'", db.name());
// query_engine.execute(query.statement, db);
session.output_stream._write_test();
}
void Executor::pull_all(Session& session)
{
logger.trace("[PullAll]");
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string(session.active_db().name());
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string("buda");
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string("domko");
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string("max");
session.encoder.message_success_empty();
session.encoder.flush();
session.output_stream.flush();
}
void Executor::discard_all(Session& session)
{
logger.trace("[DiscardAll]");
session.encoder.message_success();
session.encoder.flush();
// TODO: discard state
session.output_stream.write_success();
session.output_stream.flush();
}
}

View File

@ -45,8 +45,8 @@ State* Init::execute(Session& session, Message& message)
{
logger.debug("Client connected '{}'", message.client_name);
session.encoder.message_success_empty();
session.encoder.flush();
session.output_stream.write_success_empty();
session.output_stream.flush();
return session.bolt.states.executor.get();
}

View File

@ -5,8 +5,9 @@ driver = GraphDatabase.driver("bolt://localhost",
encrypted=0)
session = driver.session()
session.run("CREATE (a:Person {age:25})")
# result = session.run("MATCH (a:Person) RETURN a.age AS age")
# session.run("CREATE (a:Person {age:25})")
result = session.run("MATCH (a:Person) RETURN a.name AS name")
for record in result:
print(record["age"])
print(record["name"])
session.close()