This commit is contained in:
Marko Budiselic 2016-08-08 09:32:45 +01:00
commit e4d9258df9
16 changed files with 282 additions and 68 deletions

View File

@ -9,6 +9,17 @@ class Properties
public:
using sptr = std::shared_ptr<Properties>;
auto begin() const { return props.begin(); }
auto cbegin() const { return props.cbegin(); }
auto end() const { return props.end(); }
auto cend() const { return props.cend(); }
size_t size() const
{
return props.size();
}
const Property& at(const std::string& key) const;
template <class T, class... Args>

View File

@ -91,22 +91,6 @@ public:
friend std::ostream& operator<<(std::ostream& stream, const Property& prop);
// template <class Handler>
// void accept(Handler& h)
// {
// switch(flags)
// {
// case Flags::True: return h.handle(static_cast<Bool&>(*this));
// case Flags::False: return h.handle(static_cast<Bool&>(*this));
// case Flags::String: return h.handle(static_cast<String&>(*this));
// case Flags::Int32: return h.handle(static_cast<Int32&>(*this));
// case Flags::Int64: return h.handle(static_cast<Int64&>(*this));
// case Flags::Float: return h.handle(static_cast<Float&>(*this));
// case Flags::Double: return h.handle(static_cast<Double&>(*this));
// default: return;
// }
// }
const Flags flags;
};

View File

@ -10,43 +10,47 @@ namespace pack
enum Code : uint8_t
{
TinyString = 0x80,
TinyList = 0x90,
TinyMap = 0xA0,
TinyStruct = 0xB0,
TinyString = 0x80,
TinyList = 0x90,
TinyMap = 0xA0,
TinyStruct = 0xB0,
Null = 0xC0,
Null = 0xC0,
Float64 = 0xC1,
Float64 = 0xC1,
False = 0xC2,
True = 0xC3,
False = 0xC2,
True = 0xC3,
Int8 = 0xC8,
Int16 = 0xC9,
Int32 = 0xCA,
Int64 = 0xCB,
Int8 = 0xC8,
Int16 = 0xC9,
Int32 = 0xCA,
Int64 = 0xCB,
Bytes8 = 0xCC,
Bytes16 = 0xCD,
Bytes32 = 0xCE,
Bytes8 = 0xCC,
Bytes16 = 0xCD,
Bytes32 = 0xCE,
String8 = 0xD0,
String16 = 0xD1,
String32 = 0xD2,
String8 = 0xD0,
String16 = 0xD1,
String32 = 0xD2,
List8 = 0xD4,
List16 = 0xD5,
List32 = 0xD6,
List8 = 0xD4,
List16 = 0xD5,
List32 = 0xD6,
Map8 = 0xD8,
Map16 = 0xD9,
Map32 = 0xDA,
MapStream = 0xDB,
Map8 = 0xD8,
Map16 = 0xD9,
Map32 = 0xDA,
MapStream = 0xDB,
Struct8 = 0xDC,
Struct16 = 0xDD,
EndOfStream = 0xDF,
Node = 0x4E,
Relationship = 0x52,
Path = 0x50,
Struct8 = 0xDC,
Struct16 = 0xDD,
EndOfStream = 0xDF,
};
}

View File

@ -0,0 +1,144 @@
#pragma once
#include "bolt/v1/transport/bolt_encoder.hpp"
#include "bolt/v1/packing/codes.hpp"
#include "include/storage/vertex_accessor.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/model/properties/properties.hpp"
#include "storage/model/properties/all.hpp"
namespace bolt
{
template <class Stream>
class BoltSerializer
{
friend class Property;
public:
BoltSerializer() {}
/* Serializes the vertex accessor into the packstream format
*
* struct[size = 3] Vertex [signature = 0x4E] {
* Integer node_id;
* List<String> labels;
* Map<String, Value> properties;
* }
*
*/
void write(const Vertex::Accessor& vertex)
{
// write signatures for the node struct and node data type
encoder.write_struct_header(3);
encoder.write(underlying_cast(pack::Node));
// write the identifier for the node
encoder.write_integer(vertex.id());
// write the list of labels
auto labels = vertex.labels();
encoder.write_list_header(labels.size());
for(auto& label : labels)
encoder.write_string(label.get());
// write the property map
auto props = vertex.properties();
encoder.write_map_header(props.size());
for(auto& prop : props)
write(prop);
}
/* Serializes the vertex accessor into the packstream format
*
* struct[size = 5] Edge [signature = 0x52] {
* Integer edge_id;
* Integer start_node_id;
* Integer end_node_id;
* String type;
* Map<String, Value> properties;
* }
*
*/
void write(const Edge::Accessor& edge)
{
// write signatures for the edge struct and edge data type
encoder.write_struct_header(5);
encoder.write(underlying_cast(pack::Relationship));
// write the identifier for the node
encoder.write_integer(edge.id());
// TODO refactor when from() and to() start returning Accessors
encoder.write_integer(edge.from()->id);
encoder.write_integer(edge.to()->id);
// write the type of the edge
encoder.write_string(edge.edge_type());
// write the property map
auto props = edge.properties();
encoder.write_map_header(props.size());
for(auto& prop : props)
write(prop);
}
void write(const Property& prop)
{
accept(prop, *this);
}
void write_null()
{
encoder.write_null();
}
void write(const Bool& prop)
{
encoder.write_bool(prop.value());
}
void write(const Float& prop)
{
encoder.write_double(prop.value);
}
void write(const Double& prop)
{
encoder.write_double(prop.value);
}
void write(const Int32& prop)
{
encoder.write_integer(prop.value);
}
void write(const Int64& prop)
{
encoder.write_integer(prop.value);
}
void write(const String& prop)
{
encoder.write_string(prop.value);
}
protected:
BoltEncoder<Stream> encoder;
template <class T>
void handle(const T& prop)
{
write(prop);
}
};
}

View File

@ -0,0 +1,15 @@
#pragma once
#include "bolt_serializer.hpp"
namespace bolt
{
class RecordStream : BoltSerializer
{
public:
};
}

View File

@ -0,0 +1,22 @@
#pragma once
#include "bolt/v1/transport/chunked_encoder.hpp"
#include "bolt/v1/transport/socket_stream.hpp"
#include "bolt/v1/transport/bolt_encoder.hpp"
namespace bolt
{
template <class Socket>
class SocketSerializer : public BoltEncoder<ChunkedEncoder<SocketStream>>
{
public:
SocketSerializer(Socket& socket) : BoltEncoder(encoder), stream(socket) {}
private:
SocketStream stream;
ChunkedEncoder<SocketStream> encoder {stream};
};
}

View File

@ -8,6 +8,8 @@
#include "bolt/v1/transport/bolt_decoder.hpp"
#include "bolt/v1/transport/bolt_encoder.hpp"
#include "bolt/v1/serialization/socket_serializer.hpp"
#include "bolt.hpp"
#include "logging/default.hpp"
@ -18,7 +20,7 @@ class Session : public io::tcp::Stream<io::Socket>
{
public:
using Decoder = BoltDecoder;
using Encoder = BoltEncoder<io::Socket>;
using Encoder = SocketSerializer<io::Socket>;
Session(io::Socket&& socket, Bolt& bolt);

View File

@ -2,6 +2,8 @@
#include "states/handshake.hpp"
#include "states/init.hpp"
#include "states/error.hpp"
#include "states/executor.hpp"
namespace bolt
@ -12,6 +14,7 @@ States::States()
handshake = std::make_unique<Handshake>();
init = std::make_unique<Init>();
executor = std::make_unique<Executor>();
error = std::make_unique<Error>();
}
}

View File

@ -14,6 +14,7 @@ public:
State::uptr handshake;
State::uptr init;
State::uptr executor;
State::uptr error;
};
}

View File

@ -13,6 +13,9 @@ State* Error::run(Session& session)
{
// todo reset current statement? is it even necessary?
session.encoder.message_success_empty();
session.encoder.flush();
return session.bolt.states.executor.get();
}
else if(message_type == MessageCode::Reset)
@ -20,6 +23,9 @@ State* Error::run(Session& session)
// todo rollback current transaction
// discard all records waiting to be sent
session.encoder.message_success_empty();
session.encoder.flush();
return session.bolt.states.executor.get();
}

View File

@ -1 +0,0 @@
#include "bolt_encoder.hpp"

View File

@ -1,19 +1,16 @@
#pragma once
#include "chunked_encoder.hpp"
#include "socket_stream.hpp"
#include <string>
#include "bolt/v1/packing/codes.hpp"
#include "bolt/v1/messaging/codes.hpp"
#include "utils/types/byte.hpp"
#include "utils/bswap.hpp"
namespace bolt
{
template <class Socket>
template <class Stream>
class BoltEncoder
{
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
@ -25,48 +22,58 @@ class BoltEncoder
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
public:
BoltEncoder(Socket& socket) : stream(socket) {}
BoltEncoder(Stream& stream) : stream(stream) {}
void flush()
{
encoder.flush();
stream.flush();
}
void write(byte value)
{
encoder.write(value);
write_byte(value);
}
void write_byte(byte value)
{
stream.write(value);
}
void write(const byte* values, size_t n)
{
encoder.write(values, n);
stream.write(values, n);
}
void write_null()
{
encoder.write(pack::Null);
stream.write(pack::Null);
}
void write(bool value)
{
write_bool(value);
}
void write_bool(bool value)
{
if(value) write_true(); else write_false();
}
void write_true()
{
encoder.write(pack::True);
stream.write(pack::True);
}
void write_false()
{
encoder.write(pack::False);
stream.write(pack::False);
}
template <class T>
void write_value(T value)
{
value = bswap(value);
encoder.write(reinterpret_cast<const byte*>(&value), sizeof(value));
stream.write(reinterpret_cast<const byte*>(&value), sizeof(value));
}
void write_integer(int64_t value)
@ -98,6 +105,11 @@ public:
}
void write(double value)
{
write_double(value);
}
void write_double(double value)
{
write(pack::Float64);
write_value(*reinterpret_cast<const int64_t*>(&value));
@ -235,9 +247,20 @@ public:
write_empty_list();
}
void message_ignored()
{
write_struct_header(1);
write(underlying_cast(MessageCode::Ignored));
}
void message_ignored_empty()
{
message_ignored();
write_empty_map();
}
private:
SocketStream stream;
ChunkedEncoder<SocketStream> encoder {stream};
Stream& stream;
};
}

BIN
src/examples/bolt Executable file

Binary file not shown.

View File

@ -1,3 +1,3 @@
#!/bin/bash
clang++ -g -rdynamic ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto
clang++ -g -rdynamic ../bolt/v1/states/error.cpp ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto

View File

@ -36,7 +36,7 @@ public:
this->record->data.to = vertex_record;
}
auto from() { return this->record->data.from; }
auto from() const { return this->record->data.from; }
auto to() { return this->record->data.to; }
auto to() const { return this->record->data.to; }
};

View File

@ -3,16 +3,16 @@
#include "storage/vertices.hpp"
size_t Vertex::Accessor::out_degree() const
{
return this->record->data.out.degree();
{
return this->record->data.out.degree();
}
size_t Vertex::Accessor::in_degree() const
size_t Vertex::Accessor::in_degree() const
{
return this->record->data.in.degree();
}
size_t Vertex::Accessor::degree() const
size_t Vertex::Accessor::degree() const
{
return in_degree() + out_degree();
}