From 35d8f6d7ab1a1e0d1ff13c19dbc755299dac61f8 Mon Sep 17 00:00:00 2001
From: Marko Budiselic <mbudiselicbuda@gmail.com>
Date: Wed, 10 Aug 2016 19:17:38 +0100
Subject: [PATCH] Bolt integration: RecordStream -> BoltSerializer ->
 BoltEncoder -> ChunkedEncoder -> ChunkedBuffer -> SockerStream

---
 include/bolt/v1/config.hpp                    |  14 +++
 include/bolt/v1/messaging/messages.hpp        |   8 --
 .../bolt/v1/serialization/bolt_serializer.hpp |   4 +-
 .../bolt/v1/serialization/record_stream.hpp   | 109 +++++++++++++++++-
 .../v1/serialization/socket_serializer.hpp    |  22 ----
 include/bolt/v1/session.hpp                   |  24 ++--
 include/bolt/v1/transport/bolt_encoder.hpp    |  10 +-
 include/bolt/v1/transport/buffer.hpp          |   4 +-
 include/bolt/v1/transport/chunked_buffer.hpp  |  74 ++++++++++++
 include/bolt/v1/transport/chunked_decoder.hpp |  27 ++---
 include/bolt/v1/transport/chunked_encoder.hpp |  18 ++-
 src/bolt/v1/states/error.cpp                  |  15 +--
 src/bolt/v1/states/executor.cpp               |  46 ++------
 src/bolt/v1/states/init.cpp                   |   4 +-
 src/examples/bolt_py_client/initial_test.py   |   7 +-
 15 files changed, 269 insertions(+), 117 deletions(-)
 create mode 100644 include/bolt/v1/config.hpp
 delete mode 100644 include/bolt/v1/messaging/messages.hpp
 delete mode 100644 include/bolt/v1/serialization/socket_serializer.hpp
 create mode 100644 include/bolt/v1/transport/chunked_buffer.hpp

diff --git a/include/bolt/v1/config.hpp b/include/bolt/v1/config.hpp
new file mode 100644
index 000000000..afd953f91
--- /dev/null
+++ b/include/bolt/v1/config.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <cstddef>
+
+namespace bolt
+{
+
+namespace config
+{
+    static constexpr size_t N = 65535; /* chunk size */
+    static constexpr size_t C = N + 2; /* end mark */
+}
+
+}
diff --git a/include/bolt/v1/messaging/messages.hpp b/include/bolt/v1/messaging/messages.hpp
deleted file mode 100644
index f89c48054..000000000
--- a/include/bolt/v1/messaging/messages.hpp
+++ /dev/null
@@ -1,8 +0,0 @@
-#pragma once
-
-namespace bolt
-{
-
-// TODO: what should be here? (Question for Dominik)
-
-}
diff --git a/include/bolt/v1/serialization/bolt_serializer.hpp b/include/bolt/v1/serialization/bolt_serializer.hpp
index bafca1ed6..a405af22e 100644
--- a/include/bolt/v1/serialization/bolt_serializer.hpp
+++ b/include/bolt/v1/serialization/bolt_serializer.hpp
@@ -18,7 +18,7 @@ class BoltSerializer
     friend class Property;
 
 public:
-    BoltSerializer() {}
+    BoltSerializer(Stream& stream) : encoder(stream) {}
 
     /* Serializes the vertex accessor into the packstream format
      *
@@ -132,7 +132,7 @@ public:
     }
 
 protected:
-    BoltEncoder<Stream> encoder;
+    Stream& encoder;
 
     template <class T>
     void handle(const T& prop)
diff --git a/include/bolt/v1/serialization/record_stream.hpp b/include/bolt/v1/serialization/record_stream.hpp
index bb058d4fb..5e914e25e 100644
--- a/include/bolt/v1/serialization/record_stream.hpp
+++ b/include/bolt/v1/serialization/record_stream.hpp
@@ -1,15 +1,120 @@
 #pragma once
 
 #include "bolt/v1/serialization/bolt_serializer.hpp"
+#include "bolt/v1/transport/chunked_buffer.hpp"
+#include "bolt/v1/transport/chunked_encoder.hpp"
+#include "bolt/v1/transport/socket_stream.hpp"
+
+#include "logging/default.hpp"
 
 namespace bolt
 {
 
-class RecordStream : BoltSerializer
+// compiled queries have to use this class in order to return results
+// query code should not know about bolt protocol
+
+template <class Socket>
+class RecordStream
 {
 public:
+    RecordStream(Socket &socket) : socket(socket)
+    {
+        logger = logging::log->logger("Record Stream");
+    }
 
+    // TODO: create apstract methods that are not bolt specific ---------------
+    void write_success()
+    {
+        logger.trace("write_success");
+        bolt_encoder.message_success();
+    }
 
+    void write_success_empty()
+    {
+        logger.trace("write_success_empty");
+        bolt_encoder.message_success_empty();
+    }
+
+    void write_ignored()
+    {
+        logger.trace("write_ignored");
+        bolt_encoder.message_ignored();
+    }
+
+    void write_fields(const std::vector<std::string> &fields)
+    {
+        // TODO: that should be one level below?
+        bolt_encoder.message_success();
+
+        bolt_encoder.write_map_header(1);
+        bolt_encoder.write_string("fields");
+        write_list_header(fields.size());
+
+        for (auto &name : fields) {
+            bolt_encoder.write_string(name);
+        }
+
+        flush();
+    }
+
+    void write_list_header(size_t size)
+    {
+        bolt_encoder.write_list_header(size);
+    }
+
+    void write_record()
+    {
+        bolt_encoder.message_record();
+    }
+    // -- BOLT SPECIFIC METHODS -----------------------------------------------
+
+    void write(const Vertex::Accessor &vertex) { serializer.write(vertex); }
+    void write(const Edge::Accessor &edge) { serializer.write(edge); }
+
+    void write(const Property &prop) { serializer.write(prop); }
+    void write(const Bool& prop) { serializer.write(prop); }
+    void write(const Float& prop) { serializer.write(prop); }
+    void write(const Int32& prop) { serializer.write(prop); }
+    void write(const Int64& prop) { serializer.write(prop); }
+    void write(const Double& prop) { serializer.write(prop); }
+    void write(const String& prop) { serializer.write(prop); }
+
+    void flush()
+    {
+        chunked_encoder.flush();
+        chunked_buffer.flush();
+    }
+
+    void _write_test()
+    {
+        logger.trace("write_test");
+
+        write_fields({{"name"}});
+
+        write_record();
+        write_list_header(1);
+        write(String("max"));
+
+        write_record();
+        write_list_header(1);
+        write(String("paul"));
+
+        write_success_empty();
+    }
+
+protected:
+    Logger logger;
+
+private:
+    using buffer_t = ChunkedBuffer<SocketStream>;
+    using chunked_encoder_t = ChunkedEncoder<buffer_t>;
+    using bolt_encoder_t = BoltEncoder<chunked_encoder_t>;
+    using bolt_serializer_t = BoltSerializer<bolt_encoder_t>;
+
+    SocketStream socket;
+    buffer_t chunked_buffer{socket};
+    chunked_encoder_t chunked_encoder{chunked_buffer};
+    bolt_encoder_t bolt_encoder{chunked_encoder};
+    bolt_serializer_t serializer{bolt_encoder};
 };
-
 }
diff --git a/include/bolt/v1/serialization/socket_serializer.hpp b/include/bolt/v1/serialization/socket_serializer.hpp
deleted file mode 100644
index 7d16f6666..000000000
--- a/include/bolt/v1/serialization/socket_serializer.hpp
+++ /dev/null
@@ -1,22 +0,0 @@
-#pragma once
-
-#include "bolt/v1/transport/chunked_encoder.hpp"
-#include "bolt/v1/transport/socket_stream.hpp"
-
-#include "bolt/v1/transport/bolt_encoder.hpp"
-
-namespace bolt
-{
-
-template <class Socket>
-class SocketSerializer : public BoltEncoder<ChunkedEncoder<SocketStream>>
-{
-public:
-    SocketSerializer(Socket& socket) : BoltEncoder(encoder), stream(socket) {}
-
-private:
-    SocketStream stream;
-    ChunkedEncoder<SocketStream> encoder {stream};
-};
-
-}
diff --git a/include/bolt/v1/session.hpp b/include/bolt/v1/session.hpp
index dd8b772ec..b5fe46c0a 100644
--- a/include/bolt/v1/session.hpp
+++ b/include/bolt/v1/session.hpp
@@ -1,13 +1,13 @@
 #pragma once
 
-#include "io/network/tcp/stream.hpp"
 #include "io/network/socket.hpp"
+#include "io/network/tcp/stream.hpp"
 
+#include "bolt/v1/bolt.hpp"
+#include "bolt/v1/serialization/record_stream.hpp"
 #include "bolt/v1/states/state.hpp"
 #include "bolt/v1/transport/bolt_decoder.hpp"
 #include "bolt/v1/transport/bolt_encoder.hpp"
-#include "bolt/v1/serialization/socket_serializer.hpp"
-#include "bolt/v1/bolt.hpp"
 
 #include "logging/default.hpp"
 
@@ -18,27 +18,25 @@ class Session : public io::tcp::Stream<io::Socket>
 {
 public:
     using Decoder = BoltDecoder;
-    using Encoder = SocketSerializer<io::Socket>;
+    using OutputStream = RecordStream<io::Socket>;
 
-    Session(io::Socket&& socket, Bolt& bolt);
+    Session(io::Socket &&socket, Bolt &bolt);
 
     bool alive() const;
 
-    void execute(const byte* data, size_t len);
+    void execute(const byte *data, size_t len);
     void close();
 
-    Bolt& bolt;
-
-    Db& active_db();
+    Bolt &bolt;
+    Db &active_db();
 
     Decoder decoder;
-    Encoder encoder {socket};
+    OutputStream output_stream{socket};
 
-    bool connected {false};
-    State* state;
+    bool connected{false};
+    State *state;
 
 protected:
     Logger logger;
 };
-
 }
diff --git a/include/bolt/v1/transport/bolt_encoder.hpp b/include/bolt/v1/transport/bolt_encoder.hpp
index fe26349b2..e8758bab4 100644
--- a/include/bolt/v1/transport/bolt_encoder.hpp
+++ b/include/bolt/v1/transport/bolt_encoder.hpp
@@ -6,6 +6,7 @@
 #include "bolt/v1/messaging/codes.hpp"
 #include "utils/types/byte.hpp"
 #include "utils/bswap.hpp"
+#include "logging/default.hpp"
 
 namespace bolt
 {
@@ -22,7 +23,10 @@ class BoltEncoder
     static constexpr int64_t minus_2_to_the_31 = -2147483648L;
 
 public:
-    BoltEncoder(Stream& stream) : stream(stream) {}
+    BoltEncoder(Stream& stream) : stream(stream)
+    {
+        logger = logging::log->logger("Bolt Encoder");
+    }
 
     void flush()
     {
@@ -36,6 +40,7 @@ public:
 
     void write_byte(byte value)
     {
+        logger.trace("write byte: {}", value);
         stream.write(value);
     }
 
@@ -259,6 +264,9 @@ public:
         write_empty_map();
     }
 
+protected:
+    Logger logger;
+
 private:
     Stream& stream;
 };
diff --git a/include/bolt/v1/transport/buffer.hpp b/include/bolt/v1/transport/buffer.hpp
index ae44c7f19..51dc89ce6 100644
--- a/include/bolt/v1/transport/buffer.hpp
+++ b/include/bolt/v1/transport/buffer.hpp
@@ -4,14 +4,14 @@
 #include <cstdlib>
 #include <vector>
 
+#include "utils/types/byte.hpp"
+
 namespace bolt
 {
 
 class Buffer
 {
 public:
-    using byte = uint8_t;
-
     void write(const byte* data, size_t len);
 
     void clear();
diff --git a/include/bolt/v1/transport/chunked_buffer.hpp b/include/bolt/v1/transport/chunked_buffer.hpp
new file mode 100644
index 000000000..3a0846712
--- /dev/null
+++ b/include/bolt/v1/transport/chunked_buffer.hpp
@@ -0,0 +1,74 @@
+#pragma once
+
+#include <memory>
+#include <vector>
+#include <cstring>
+
+#include "bolt/v1/config.hpp"
+#include "utils/types/byte.hpp"
+#include "logging/default.hpp"
+
+namespace bolt
+{
+
+template <class Stream>
+class ChunkedBuffer
+{
+    static constexpr size_t C = bolt::config::C; /* chunk size */
+
+public:
+    ChunkedBuffer(Stream &stream) : stream(stream) 
+    {
+        logger = logging::log->logger("Chunked Buffer");
+    }
+
+    void write(const byte *values, size_t n)
+    {
+        // TODO: think about shared pointer
+        // TODO: this is naive implementation, it can be implemented much better
+
+        logger.trace("write {} bytes", n);
+       
+        byte *chunk = chunk = (byte *)std::malloc(n * sizeof(byte));
+        last_size = n;
+
+        std::memcpy(chunk, values, n);
+
+        buffer.push_back(chunk);
+    }
+
+    void flush()
+    {
+        logger.trace("Flush");
+
+        for (size_t i = 0; i < buffer.size(); ++i) {
+            if (i == buffer.size() - 1)
+                stream.get().write(buffer[i], last_size);
+            else
+                stream.get().write(buffer[i], C);
+        }
+
+        destroy();
+    }
+
+    ~ChunkedBuffer()
+    {
+        destroy(); 
+    }
+
+private:
+    Logger logger;
+    std::reference_wrapper<Stream> stream;
+    std::vector<byte *> buffer;
+    size_t last_size {0}; // last chunk size (it is going to be less than C)
+
+    void destroy()
+    {
+        for (size_t i = 0; i < buffer.size(); ++i) {
+            std::free(buffer[i]);
+        }
+        buffer.clear();
+    }
+};
+
+}
diff --git a/include/bolt/v1/transport/chunked_decoder.hpp b/include/bolt/v1/transport/chunked_decoder.hpp
index 6f1feb166..b7252b59f 100644
--- a/include/bolt/v1/transport/chunked_decoder.hpp
+++ b/include/bolt/v1/transport/chunked_decoder.hpp
@@ -1,13 +1,13 @@
 #pragma once
 
+#include <cassert>
 #include <cstring>
 #include <functional>
-#include <cassert>
-
-#include "utils/exceptions/basic_exception.hpp"
-#include "utils/likely.hpp"
 
 #include "logging/default.hpp"
+#include "utils/exceptions/basic_exception.hpp"
+#include "utils/likely.hpp"
+#include "utils/types/byte.hpp"
 
 namespace bolt
 {
@@ -22,8 +22,6 @@ public:
         using BasicException::BasicException;
     };
 
-    using byte = unsigned char;
-
     ChunkedDecoder(Stream& stream) : stream(stream) {}
 
     /* Decode chunked data
@@ -33,14 +31,14 @@ public:
      * |Header|     Data     ||Header|    Data      || ... || End |
      * |  2B  |  size bytes  ||  2B  |  size bytes  || ... ||00 00|
      */
-    bool decode(const byte*& chunk, size_t n)
+    bool decode(const byte *&chunk, size_t n)
     {
-        while(n > 0)
+        while (n > 0)
         {
             // get size from first two bytes in the chunk
             auto size = get_size(chunk);
 
-            if(UNLIKELY(size + 2 > n))
+            if (UNLIKELY(size + 2 > n))
                 throw DecoderError("Chunk size larger than available data.");
 
             // advance chunk to pass those two bytes
@@ -48,8 +46,7 @@ public:
             n -= 2;
 
             // if chunk size is 0, we're done!
-            if(size == 0)
-                return true;
+            if (size == 0) return true;
 
             stream.get().write(chunk, size);
 
@@ -60,18 +57,14 @@ public:
         return false;
     }
 
-    bool operator()(const byte*& chunk, size_t n)
-    {
-        return decode(chunk, n);
-    }
+    bool operator()(const byte *&chunk, size_t n) { return decode(chunk, n); }
 
 private:
     std::reference_wrapper<Stream> stream;
 
-    size_t get_size(const byte* chunk)
+    size_t get_size(const byte *chunk)
     {
         return size_t(chunk[0]) << 8 | chunk[1];
     }
 };
-
 }
diff --git a/include/bolt/v1/transport/chunked_encoder.hpp b/include/bolt/v1/transport/chunked_encoder.hpp
index 9ded82c9a..295cc03c2 100644
--- a/include/bolt/v1/transport/chunked_encoder.hpp
+++ b/include/bolt/v1/transport/chunked_encoder.hpp
@@ -5,6 +5,8 @@
 #include <functional>
 
 #include "utils/likely.hpp"
+#include "bolt/v1/config.hpp"
+#include "logging/default.hpp"
 
 namespace bolt
 {
@@ -12,13 +14,16 @@ namespace bolt
 template <class Stream>
 class ChunkedEncoder
 {
-    static constexpr size_t N = 65535;
-    static constexpr size_t C = N + 2 /* end mark */;
+    static constexpr size_t N = bolt::config::N;
+    static constexpr size_t C = bolt::config::C;
 
 public:
     using byte = unsigned char;
 
-    ChunkedEncoder(Stream& stream) : stream(stream) {}
+    ChunkedEncoder(Stream& stream) : stream(stream)
+    {
+        logger = logging::log->logger("Chunked Encoder");
+    }
 
     static constexpr size_t chunk_size = N - 2;
 
@@ -32,6 +37,8 @@ public:
 
     void write(const byte* values, size_t n)
     {
+        logger.trace("write {} bytes", n);
+
         while(n > 0)
         {
             auto size = n < N - pos ? n : N - pos;
@@ -58,6 +65,7 @@ public:
     }
 
 private:
+    Logger logger;
     std::reference_wrapper<Stream> stream;
 
     std::array<byte, C> chunk;
@@ -65,7 +73,9 @@ private:
 
     void end_chunk()
     {
-        write_chunk_header();
+        // TODO: this call is unnecessary bacause the same method is called
+        // inside the flush method
+        // write_chunk_header();
         flush();
     }
 
diff --git a/src/bolt/v1/states/error.cpp b/src/bolt/v1/states/error.cpp
index 98807d12d..315fbf3e0 100644
--- a/src/bolt/v1/states/error.cpp
+++ b/src/bolt/v1/states/error.cpp
@@ -10,9 +10,9 @@ State* Error::run(Session& session)
     if(message_type == MessageCode::AckFailure)
     {
         // TODO reset current statement? is it even necessary?
-
-        session.encoder.message_success_empty();
-        session.encoder.flush();
+        
+        session.output_stream.write_success_empty();
+        session.output_stream.flush();
 
         return session.bolt.states.executor.get();
     }
@@ -21,14 +21,15 @@ State* Error::run(Session& session)
         // TODO rollback current transaction
         // discard all records waiting to be sent
 
-        session.encoder.message_success_empty();
-        session.encoder.flush();
+        session.output_stream.write_success_empty();
+        session.output_stream.flush();
+
 
         return session.bolt.states.executor.get();
     }
 
-    session.encoder.message_ignored();
-    session.encoder.flush();
+    session.output_stream.write_ignored();
+    session.output_stream.flush();
 
     return this;
 }
diff --git a/src/bolt/v1/states/executor.cpp b/src/bolt/v1/states/executor.cpp
index 08a370241..db95fa65d 100644
--- a/src/bolt/v1/states/executor.cpp
+++ b/src/bolt/v1/states/executor.cpp
@@ -12,6 +12,8 @@ State* Executor::run(Session& session)
     // information contained in this byte
     session.decoder.read_byte();
 
+    logger.debug("Run");
+
     auto message_type = session.decoder.read_byte();
 
     if(message_type == MessageCode::Run)
@@ -52,52 +54,28 @@ void Executor::run(Session& session, Query& query)
 {
     logger.trace("[Run] '{}'", query.statement);
 
-    auto &db = session.active_db();
-    logger.info("[ActiveDB] '{}'", db.name());
-
-    query_engine.execute(query.statement, db);
-
-    session.encoder.message_success();
-    session.encoder.write_map_header(1);
-
-    session.encoder.write_string("fields");
-    session.encoder.write_list_header(1);
-    session.encoder.write_string("name");
-
-    session.encoder.flush();
+    // auto &db = session.active_db();
+    // logger.info("[ActiveDB] '{}'", db.name());
+    // query_engine.execute(query.statement, db);
+    
+    session.output_stream._write_test();
 }
 
 void Executor::pull_all(Session& session)
 {
     logger.trace("[PullAll]");
 
-    session.encoder.message_record();
-    session.encoder.write_list_header(1);
-    session.encoder.write_string(session.active_db().name());
-
-    session.encoder.message_record();
-    session.encoder.write_list_header(1);
-    session.encoder.write_string("buda");
-
-    session.encoder.message_record();
-    session.encoder.write_list_header(1);
-    session.encoder.write_string("domko");
-
-    session.encoder.message_record();
-    session.encoder.write_list_header(1);
-    session.encoder.write_string("max");
-
-    session.encoder.message_success_empty();
-
-    session.encoder.flush();
+    session.output_stream.flush();
 }
 
 void Executor::discard_all(Session& session)
 {
     logger.trace("[DiscardAll]");
 
-    session.encoder.message_success();
-    session.encoder.flush();
+    // TODO: discard state
+
+    session.output_stream.write_success();
+    session.output_stream.flush();
 }
 
 }
diff --git a/src/bolt/v1/states/init.cpp b/src/bolt/v1/states/init.cpp
index 2efe1fabe..33568f783 100644
--- a/src/bolt/v1/states/init.cpp
+++ b/src/bolt/v1/states/init.cpp
@@ -45,8 +45,8 @@ State* Init::execute(Session& session, Message& message)
 {
     logger.debug("Client connected '{}'", message.client_name);
 
-    session.encoder.message_success_empty();
-    session.encoder.flush();
+    session.output_stream.write_success_empty();
+    session.output_stream.flush();
 
     return session.bolt.states.executor.get();
 }
diff --git a/src/examples/bolt_py_client/initial_test.py b/src/examples/bolt_py_client/initial_test.py
index 300143edd..1cd436e86 100644
--- a/src/examples/bolt_py_client/initial_test.py
+++ b/src/examples/bolt_py_client/initial_test.py
@@ -5,8 +5,9 @@ driver = GraphDatabase.driver("bolt://localhost",
                               encrypted=0)
 
 session = driver.session()
-session.run("CREATE (a:Person {age:25})")
-# result = session.run("MATCH (a:Person) RETURN a.age AS age")
+# session.run("CREATE (a:Person {age:25})")
+result = session.run("MATCH (a:Person) RETURN a.name AS name")
+
 for record in result:
-    print(record["age"])
+    print(record["name"])
     session.close()