diff --git a/include/storage/model/properties/properties.hpp b/include/storage/model/properties/properties.hpp index 029265bd3..b2c023af2 100644 --- a/include/storage/model/properties/properties.hpp +++ b/include/storage/model/properties/properties.hpp @@ -9,6 +9,17 @@ class Properties public: using sptr = std::shared_ptr<Properties>; + auto begin() const { return props.begin(); } + auto cbegin() const { return props.cbegin(); } + + auto end() const { return props.end(); } + auto cend() const { return props.cend(); } + + size_t size() const + { + return props.size(); + } + const Property& at(const std::string& key) const; template <class T, class... Args> diff --git a/include/storage/model/properties/property.hpp b/include/storage/model/properties/property.hpp index 28b8423ee..43c818239 100644 --- a/include/storage/model/properties/property.hpp +++ b/include/storage/model/properties/property.hpp @@ -91,22 +91,6 @@ public: friend std::ostream& operator<<(std::ostream& stream, const Property& prop); -// template <class Handler> -// void accept(Handler& h) -// { -// switch(flags) -// { -// case Flags::True: return h.handle(static_cast<Bool&>(*this)); -// case Flags::False: return h.handle(static_cast<Bool&>(*this)); -// case Flags::String: return h.handle(static_cast<String&>(*this)); -// case Flags::Int32: return h.handle(static_cast<Int32&>(*this)); -// case Flags::Int64: return h.handle(static_cast<Int64&>(*this)); -// case Flags::Float: return h.handle(static_cast<Float&>(*this)); -// case Flags::Double: return h.handle(static_cast<Double&>(*this)); -// default: return; -// } -// } - const Flags flags; }; diff --git a/src/bolt/v1/packing/codes.hpp b/src/bolt/v1/packing/codes.hpp index f963efb8c..274cbb757 100644 --- a/src/bolt/v1/packing/codes.hpp +++ b/src/bolt/v1/packing/codes.hpp @@ -10,43 +10,47 @@ namespace pack enum Code : uint8_t { - TinyString = 0x80, - TinyList = 0x90, - TinyMap = 0xA0, - TinyStruct = 0xB0, + TinyString = 0x80, + TinyList = 0x90, + TinyMap = 0xA0, + TinyStruct = 0xB0, - Null = 0xC0, + Null = 0xC0, - Float64 = 0xC1, + Float64 = 0xC1, - False = 0xC2, - True = 0xC3, + False = 0xC2, + True = 0xC3, - Int8 = 0xC8, - Int16 = 0xC9, - Int32 = 0xCA, - Int64 = 0xCB, + Int8 = 0xC8, + Int16 = 0xC9, + Int32 = 0xCA, + Int64 = 0xCB, - Bytes8 = 0xCC, - Bytes16 = 0xCD, - Bytes32 = 0xCE, + Bytes8 = 0xCC, + Bytes16 = 0xCD, + Bytes32 = 0xCE, - String8 = 0xD0, - String16 = 0xD1, - String32 = 0xD2, + String8 = 0xD0, + String16 = 0xD1, + String32 = 0xD2, - List8 = 0xD4, - List16 = 0xD5, - List32 = 0xD6, + List8 = 0xD4, + List16 = 0xD5, + List32 = 0xD6, - Map8 = 0xD8, - Map16 = 0xD9, - Map32 = 0xDA, - MapStream = 0xDB, + Map8 = 0xD8, + Map16 = 0xD9, + Map32 = 0xDA, + MapStream = 0xDB, - Struct8 = 0xDC, - Struct16 = 0xDD, - EndOfStream = 0xDF, + Node = 0x4E, + Relationship = 0x52, + Path = 0x50, + + Struct8 = 0xDC, + Struct16 = 0xDD, + EndOfStream = 0xDF, }; } diff --git a/src/bolt/v1/serialization/bolt_serializer.hpp b/src/bolt/v1/serialization/bolt_serializer.hpp new file mode 100644 index 000000000..be791817b --- /dev/null +++ b/src/bolt/v1/serialization/bolt_serializer.hpp @@ -0,0 +1,144 @@ +#pragma once + +#include "bolt/v1/transport/bolt_encoder.hpp" +#include "bolt/v1/packing/codes.hpp" + +#include "include/storage/vertex_accessor.hpp" +#include "storage/edge_accessor.hpp" + +#include "storage/model/properties/properties.hpp" +#include "storage/model/properties/all.hpp" + +namespace bolt +{ + +template <class Stream> +class BoltSerializer +{ + friend class Property; + +public: + BoltSerializer() {} + + /* Serializes the vertex accessor into the packstream format + * + * struct[size = 3] Vertex [signature = 0x4E] { + * Integer node_id; + * List<String> labels; + * Map<String, Value> properties; + * } + * + */ + void write(const Vertex::Accessor& vertex) + { + // write signatures for the node struct and node data type + encoder.write_struct_header(3); + encoder.write(underlying_cast(pack::Node)); + + // write the identifier for the node + encoder.write_integer(vertex.id()); + + // write the list of labels + auto labels = vertex.labels(); + + encoder.write_list_header(labels.size()); + + for(auto& label : labels) + encoder.write_string(label.get()); + + // write the property map + auto props = vertex.properties(); + + encoder.write_map_header(props.size()); + + for(auto& prop : props) + write(prop); + } + + /* Serializes the vertex accessor into the packstream format + * + * struct[size = 5] Edge [signature = 0x52] { + * Integer edge_id; + * Integer start_node_id; + * Integer end_node_id; + * String type; + * Map<String, Value> properties; + * } + * + */ + void write(const Edge::Accessor& edge) + { + // write signatures for the edge struct and edge data type + encoder.write_struct_header(5); + encoder.write(underlying_cast(pack::Relationship)); + + // write the identifier for the node + encoder.write_integer(edge.id()); + + // TODO refactor when from() and to() start returning Accessors + encoder.write_integer(edge.from()->id); + encoder.write_integer(edge.to()->id); + + // write the type of the edge + encoder.write_string(edge.edge_type()); + + // write the property map + auto props = edge.properties(); + + encoder.write_map_header(props.size()); + + for(auto& prop : props) + write(prop); + } + + void write(const Property& prop) + { + accept(prop, *this); + } + + void write_null() + { + encoder.write_null(); + } + + void write(const Bool& prop) + { + encoder.write_bool(prop.value()); + } + + void write(const Float& prop) + { + encoder.write_double(prop.value); + } + + void write(const Double& prop) + { + encoder.write_double(prop.value); + } + + void write(const Int32& prop) + { + encoder.write_integer(prop.value); + } + + void write(const Int64& prop) + { + encoder.write_integer(prop.value); + } + + void write(const String& prop) + { + encoder.write_string(prop.value); + } + +protected: + BoltEncoder<Stream> encoder; + + template <class T> + void handle(const T& prop) + { + write(prop); + } +}; + +} diff --git a/src/bolt/v1/serialization/record_stream.hpp b/src/bolt/v1/serialization/record_stream.hpp new file mode 100644 index 000000000..ffd14f6e4 --- /dev/null +++ b/src/bolt/v1/serialization/record_stream.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "bolt_serializer.hpp" + +namespace bolt +{ + +class RecordStream : BoltSerializer +{ +public: + + +}; + +} diff --git a/src/bolt/v1/serialization/socket_serializer.hpp b/src/bolt/v1/serialization/socket_serializer.hpp new file mode 100644 index 000000000..7d16f6666 --- /dev/null +++ b/src/bolt/v1/serialization/socket_serializer.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "bolt/v1/transport/chunked_encoder.hpp" +#include "bolt/v1/transport/socket_stream.hpp" + +#include "bolt/v1/transport/bolt_encoder.hpp" + +namespace bolt +{ + +template <class Socket> +class SocketSerializer : public BoltEncoder<ChunkedEncoder<SocketStream>> +{ +public: + SocketSerializer(Socket& socket) : BoltEncoder(encoder), stream(socket) {} + +private: + SocketStream stream; + ChunkedEncoder<SocketStream> encoder {stream}; +}; + +} diff --git a/src/bolt/v1/session.hpp b/src/bolt/v1/session.hpp index 9bfa48886..af5c1a708 100644 --- a/src/bolt/v1/session.hpp +++ b/src/bolt/v1/session.hpp @@ -8,6 +8,8 @@ #include "bolt/v1/transport/bolt_decoder.hpp" #include "bolt/v1/transport/bolt_encoder.hpp" +#include "bolt/v1/serialization/socket_serializer.hpp" + #include "bolt.hpp" #include "logging/default.hpp" @@ -18,7 +20,7 @@ class Session : public io::tcp::Stream<io::Socket> { public: using Decoder = BoltDecoder; - using Encoder = BoltEncoder<io::Socket>; + using Encoder = SocketSerializer<io::Socket>; Session(io::Socket&& socket, Bolt& bolt); diff --git a/src/bolt/v1/states.cpp b/src/bolt/v1/states.cpp index a7e0a9974..f7bdea07b 100644 --- a/src/bolt/v1/states.cpp +++ b/src/bolt/v1/states.cpp @@ -2,6 +2,8 @@ #include "states/handshake.hpp" #include "states/init.hpp" +#include "states/error.hpp" + #include "states/executor.hpp" namespace bolt @@ -12,6 +14,7 @@ States::States() handshake = std::make_unique<Handshake>(); init = std::make_unique<Init>(); executor = std::make_unique<Executor>(); + error = std::make_unique<Error>(); } } diff --git a/src/bolt/v1/states.hpp b/src/bolt/v1/states.hpp index c323214d4..be4cbc344 100644 --- a/src/bolt/v1/states.hpp +++ b/src/bolt/v1/states.hpp @@ -14,6 +14,7 @@ public: State::uptr handshake; State::uptr init; State::uptr executor; + State::uptr error; }; } diff --git a/src/bolt/v1/states/error.cpp b/src/bolt/v1/states/error.cpp index c482a1d8f..c51eda182 100644 --- a/src/bolt/v1/states/error.cpp +++ b/src/bolt/v1/states/error.cpp @@ -13,6 +13,9 @@ State* Error::run(Session& session) { // todo reset current statement? is it even necessary? + session.encoder.message_success_empty(); + session.encoder.flush(); + return session.bolt.states.executor.get(); } else if(message_type == MessageCode::Reset) @@ -20,6 +23,9 @@ State* Error::run(Session& session) // todo rollback current transaction // discard all records waiting to be sent + session.encoder.message_success_empty(); + session.encoder.flush(); + return session.bolt.states.executor.get(); } diff --git a/src/bolt/v1/transport/bolt_encoder.cpp b/src/bolt/v1/transport/bolt_encoder.cpp deleted file mode 100644 index 472a42447..000000000 --- a/src/bolt/v1/transport/bolt_encoder.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "bolt_encoder.hpp" diff --git a/src/bolt/v1/transport/bolt_encoder.hpp b/src/bolt/v1/transport/bolt_encoder.hpp index 1fa8321e8..fe26349b2 100644 --- a/src/bolt/v1/transport/bolt_encoder.hpp +++ b/src/bolt/v1/transport/bolt_encoder.hpp @@ -1,19 +1,16 @@ #pragma once -#include "chunked_encoder.hpp" -#include "socket_stream.hpp" +#include <string> #include "bolt/v1/packing/codes.hpp" #include "bolt/v1/messaging/codes.hpp" - #include "utils/types/byte.hpp" - #include "utils/bswap.hpp" namespace bolt { -template <class Socket> +template <class Stream> class BoltEncoder { static constexpr int64_t plus_2_to_the_31 = 2147483648L; @@ -25,48 +22,58 @@ class BoltEncoder static constexpr int64_t minus_2_to_the_31 = -2147483648L; public: - BoltEncoder(Socket& socket) : stream(socket) {} + BoltEncoder(Stream& stream) : stream(stream) {} void flush() { - encoder.flush(); + stream.flush(); } void write(byte value) { - encoder.write(value); + write_byte(value); + } + + void write_byte(byte value) + { + stream.write(value); } void write(const byte* values, size_t n) { - encoder.write(values, n); + stream.write(values, n); } void write_null() { - encoder.write(pack::Null); + stream.write(pack::Null); } void write(bool value) + { + write_bool(value); + } + + void write_bool(bool value) { if(value) write_true(); else write_false(); } void write_true() { - encoder.write(pack::True); + stream.write(pack::True); } void write_false() { - encoder.write(pack::False); + stream.write(pack::False); } template <class T> void write_value(T value) { value = bswap(value); - encoder.write(reinterpret_cast<const byte*>(&value), sizeof(value)); + stream.write(reinterpret_cast<const byte*>(&value), sizeof(value)); } void write_integer(int64_t value) @@ -98,6 +105,11 @@ public: } void write(double value) + { + write_double(value); + } + + void write_double(double value) { write(pack::Float64); write_value(*reinterpret_cast<const int64_t*>(&value)); @@ -235,9 +247,20 @@ public: write_empty_list(); } + void message_ignored() + { + write_struct_header(1); + write(underlying_cast(MessageCode::Ignored)); + } + + void message_ignored_empty() + { + message_ignored(); + write_empty_map(); + } + private: - SocketStream stream; - ChunkedEncoder<SocketStream> encoder {stream}; + Stream& stream; }; } diff --git a/src/examples/bolt b/src/examples/bolt new file mode 100755 index 000000000..4cafc918d Binary files /dev/null and b/src/examples/bolt differ diff --git a/src/examples/compile-bolt.sh b/src/examples/compile-bolt.sh index 349e20e46..855539e59 100644 --- a/src/examples/compile-bolt.sh +++ b/src/examples/compile-bolt.sh @@ -1,3 +1,3 @@ #!/bin/bash -clang++ -g -rdynamic ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto +clang++ -g -rdynamic ../bolt/v1/states/error.cpp ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto diff --git a/src/storage/edge_accessor.hpp b/src/storage/edge_accessor.hpp index b1a8c4ac0..acb02d31b 100644 --- a/src/storage/edge_accessor.hpp +++ b/src/storage/edge_accessor.hpp @@ -36,7 +36,7 @@ public: this->record->data.to = vertex_record; } - auto from() { return this->record->data.from; } + auto from() const { return this->record->data.from; } - auto to() { return this->record->data.to; } + auto to() const { return this->record->data.to; } }; diff --git a/src/storage/vertex_accessor.cpp b/src/storage/vertex_accessor.cpp index f6237c216..380f30495 100644 --- a/src/storage/vertex_accessor.cpp +++ b/src/storage/vertex_accessor.cpp @@ -3,16 +3,16 @@ #include "storage/vertices.hpp" size_t Vertex::Accessor::out_degree() const -{ - return this->record->data.out.degree(); +{ + return this->record->data.out.degree(); } -size_t Vertex::Accessor::in_degree() const +size_t Vertex::Accessor::in_degree() const { return this->record->data.in.degree(); } -size_t Vertex::Accessor::degree() const +size_t Vertex::Accessor::degree() const { return in_degree() + out_degree(); }