From d04cf101b46eb050aef4f7d3ef28da96f09028aa Mon Sep 17 00:00:00 2001
From: Marko Budiselic <mbudiselicbuda@gmail.com>
Date: Sun, 31 Jul 2016 18:58:12 +0100
Subject: [PATCH 1/2] Cypher CRUD support is becomes bigger and bigger.
 Implemented: create relationship, match relationship and update relationship

---
 CMakeLists.txt                                | 48 +++++++++---
 src/memgraph_bolt.cpp                         | 17 ++++
 src/{memgraph.cpp => memgraph_http.cpp}       |  0
 .../code_generator/handlers/create.hpp        | 77 +++++++++++++++++--
 .../code_generator/handlers/match.hpp         |  4 +-
 .../code_generator/handlers/set.hpp           | 15 ++--
 .../code_generator/query_action_data.hpp      |  2 +-
 .../code_generator/structures.hpp             |  2 +-
 src/query_engine/exceptions/errors.hpp        |  7 ++
 src/query_engine/hardcode/queries.hpp         | 33 ++++++++
 src/query_engine/traverser/code.hpp           | 28 ++++---
 src/query_engine/traverser/cpp_traverser.hpp  | 50 +++++++-----
 12 files changed, 222 insertions(+), 61 deletions(-)
 create mode 100644 src/memgraph_bolt.cpp
 rename src/{memgraph.cpp => memgraph_http.cpp} (100%)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index faf0da662..1337a7772 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -9,8 +9,11 @@ project(${ProjectId})
 
 find_package(Threads REQUIRED)
 
+# flags
 # c++14
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y")
+# glibcxx debug
+# set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_GLIBCXX_DEBUG")
 
 # functions
 
@@ -175,14 +178,13 @@ EXECUTE_PROCESS(
 )
 
 # # memgraph executable
-# add_executable(memgraph src/memgraph.cpp)
-# add_dependencies(memgraph cypher_lib)
-# # memgraph link libraries
-# target_link_libraries(memgraph Threads::Threads)
-# target_link_libraries(memgraph pcre)
-# target_link_libraries(memgraph ${libuv_static_lib})
-# target_link_libraries(memgraph ${r3_static_lib})
-# target_link_libraries(memgraph ${http_parser_static_lib})
+# add_executable(memgraph_http src/memgraph.cpp)
+# add_dependencies(memgraph_http cypher_lib)
+# target_link_libraries(memgraph_http Threads::Threads)
+# target_link_libraries(memgraph_http pcre)
+# target_link_libraries(memgraph_http ${libuv_static_lib})
+# target_link_libraries(memgraph_http ${r3_static_lib})
+# target_link_libraries(memgraph_http ${http_parser_static_lib})
 
 # # query_engine executable
 # add_executable(query_engine src/query_engine/main_query_engine.cpp)
@@ -214,11 +216,33 @@ set(memgraph_src_files
     ${src_dir}/transactions/transaction.cpp
     ${src_dir}/template_engine/engine.cpp
 )
-
-# hard coded implementation of queries
-# add_executable(queries src/query_engine/main_queries.cpp ${memgraph_src_files})
-# target_link_libraries(queries ${fmt_static_lib})
+add_library(libmemgraph ${memgraph_src_files})
 
 # tests
 enable_testing()
 add_subdirectory(tests)
+
+# memgraph build name
+execute_process(
+    OUTPUT_VARIABLE COMMIT_NO 
+    COMMAND git rev-list --count HEAD
+)
+execute_process(
+    OUTPUT_VARIABLE COMMIT_HASH
+    COMMAND git rev-parse --short HEAD
+)
+string(STRIP ${COMMIT_HASH} COMMIT_HASH)
+string(STRIP ${COMMIT_NO} COMMIT_NO)
+set(MEMGRAPH_BUILD_NAME "memgraph_${COMMIT_HASH}_${COMMIT_NO}")
+
+# DEBUG BUILD
+add_executable(${MEMGRAPH_BUILD_NAME}_debug ${src_dir}/memgraph_bolt.cpp)
+
+# TEST BUILD
+# TODO
+
+# O-OPTIMIZED BUILD
+# TODO
+
+# RELEASE BUILD
+# TODO
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
new file mode 100644
index 000000000..fb59219de
--- /dev/null
+++ b/src/memgraph_bolt.cpp
@@ -0,0 +1,17 @@
+#include "utils/terminate_handler.hpp"
+
+int main(int argc, char *argv[])
+{
+    if (argc < 2) {
+        std::cout << "Port not defined" << std::endl;
+        std::exit(0);
+    }
+
+    auto port = std::stoi(argv[1]);
+
+    std::cout << "Port is: " << port << std::endl;
+
+    std::set_terminate(&terminate_handler);
+
+    return 0;
+}
diff --git a/src/memgraph.cpp b/src/memgraph_http.cpp
similarity index 100%
rename from src/memgraph.cpp
rename to src/memgraph_http.cpp
diff --git a/src/query_engine/code_generator/handlers/create.hpp b/src/query_engine/code_generator/handlers/create.hpp
index 29bd1e614..6863aa500 100644
--- a/src/query_engine/code_generator/handlers/create.hpp
+++ b/src/query_engine/code_generator/handlers/create.hpp
@@ -2,6 +2,23 @@
 
 #include "query_engine/code_generator/handlers/includes.hpp"
 
+using Direction = RelationshipData::Direction;
+
+auto update_properties(const QueryActionData &action_data,
+                       const std::string &name)
+{
+    std::string code = "";
+
+    auto entity_data = action_data.get_entity_property(name);
+    for (auto &property : entity_data.properties) {
+        auto index =
+            action_data.parameter_index.at(ParameterIndexKey(name, property));
+        code += LINE(fmt::format(code::set_property, name, property, index));
+    }
+
+    return code;
+}
+
 auto create_query_action =
     [](CypherStateData &cypher_data,
        const QueryActionData &action_data) -> std::string {
@@ -11,26 +28,72 @@ auto create_query_action =
     for (auto const &kv : action_data.actions) {
 
         if (kv.second == ClauseAction::CreateNode) {
-            // 1. create node 2. update labels 3. update properties
+            // create node
             auto &name = kv.first;
             code += LINE(fmt::format(code::create_vertex, name));
+
+            // update properties
+            code += update_properties(action_data, name);
+
+            // update labels
             auto entity_data = action_data.get_entity_property(name);
-            for (auto &property : entity_data.properties) {
-                auto index = action_data.parameter_index.at(
-                    ParameterIndexKey(name, property));
-                code += LINE(fmt::format(code::set_vertex_property, name,
-                                         property, index));
-            }
             for (auto &label : entity_data.tags) {
                 code += LINE(fmt::format(code::create_label, label));
                 code += LINE(fmt::format(code::add_label, name, label));
             }
+
+            // mark node as created
             cypher_data.node_created(name);
         }
 
         if (kv.second == ClauseAction::CreateRelationship) {
+            // create relationship
             auto name = kv.first;
             code += LINE(fmt::format(code::create_edge, name));
+
+            // update properties
+            code += update_properties(action_data, name);
+
+            // update tag
+            auto entity_data = action_data.get_entity_property(name);
+            for (auto &tag : entity_data.tags) {
+                code += LINE(fmt::format(code::find_type, tag));
+                code += LINE(fmt::format(code::set_type, name, tag));
+            }
+
+            // find start and end node
+            auto &relationships_data = action_data.relationship_data;
+            if (relationships_data.find(name) == relationships_data.end())
+                throw CodeGenerationError("Unable to find data for: " + name);
+            auto &relationship_data = relationships_data.at(name);
+            auto left_node = relationship_data.nodes.first;
+            auto right_node = relationship_data.nodes.second;
+
+            // TODO: If node isn't already matched or created it has to be
+            // created here. It is not possible for now.
+            if (cypher_data.status(left_node) != EntityStatus::Matched) {
+                throw SemanticError("Create Relationship: node " + left_node +
+                                    " can't be found");
+            }
+            if (cypher_data.status(right_node) != EntityStatus::Matched) {
+                throw SemanticError("Create Relationship: node " + right_node +
+                                    " can't be found");
+            }
+
+            // define direction
+            if (relationship_data.direction == Direction::Right) {
+                code += LINE(fmt::format(code::node_out, left_node, name));
+                code += LINE(fmt::format(code::node_in, right_node, name));
+                code += LINE(fmt::format(code::edge_from, name, left_node));
+                code += LINE(fmt::format(code::edge_to, name, right_node));
+            } else if (relationship_data.direction == Direction::Left) {
+                code += LINE(fmt::format(code::node_out, right_node, name));
+                code += LINE(fmt::format(code::node_in, left_node, name));
+                code += LINE(fmt::format(code::edge_from, name, right_node));
+                code += LINE(fmt::format(code::edge_to, name, left_node));
+            }
+
+            // mark relationship as created
             cypher_data.relationship_created(name);
         }
     }
diff --git a/src/query_engine/code_generator/handlers/match.hpp b/src/query_engine/code_generator/handlers/match.hpp
index 8b9e16603..47fa263e5 100644
--- a/src/query_engine/code_generator/handlers/match.hpp
+++ b/src/query_engine/code_generator/handlers/match.hpp
@@ -31,12 +31,12 @@ auto match_query_action =
         if (kv.second == ClauseAction::MatchNode) {
             auto name = kv.first;
             if (already_matched(cypher_data, name, EntityType::Node)) continue;
+            cypher_data.node_matched(name);
             auto place = action_data.csm.min(kv.first);
             if (place == entity_search::search_internal_id) {
                 auto index = fetch_internal_index(action_data, name);
                 code +=
                     LINE(fmt::format(code::match_vertex_by_id, name, index));
-                cypher_data.node_matched(name);
             }
         }
 
@@ -45,11 +45,11 @@ auto match_query_action =
             auto name = kv.first;
             if (already_matched(cypher_data, name, EntityType::Relationship))
                 continue;
+            cypher_data.relationship_matched(name);
             auto place = action_data.csm.min(kv.first);
             if (place == entity_search::search_internal_id) {
                 auto index = fetch_internal_index(action_data, name);
                 code += LINE(fmt::format(code::match_edge_by_id, name, index));
-                cypher_data.relationship_matched(name);
             }
         }
     }
diff --git a/src/query_engine/code_generator/handlers/set.hpp b/src/query_engine/code_generator/handlers/set.hpp
index bce2cfa73..e1a582e42 100644
--- a/src/query_engine/code_generator/handlers/set.hpp
+++ b/src/query_engine/code_generator/handlers/set.hpp
@@ -9,16 +9,17 @@ auto set_query_action = [](CypherStateData &cypher_data,
 
     for (auto const &kv : action_data.actions) {
         auto name = kv.first;
+
         if (kv.second == ClauseAction::UpdateNode &&
             cypher_data.status(name) == EntityStatus::Matched &&
             cypher_data.type(name) == EntityType::Node) {
-            auto entity_data = action_data.get_entity_property(name);
-            for (auto &property : entity_data.properties) {
-                auto index = action_data.parameter_index.at(
-                    ParameterIndexKey(name, property));
-                code += LINE(
-                    fmt::format(code::update_property, name, property, index));
-            }
+            code += update_properties(action_data, name);
+        }
+
+        if (kv.second == ClauseAction::UpdateRelationship &&
+            cypher_data.status(name) == EntityStatus::Matched &&
+            cypher_data.type(name) == EntityType::Relationship) {
+            code += update_properties(action_data, name);
         }
     }
 
diff --git a/src/query_engine/code_generator/query_action_data.hpp b/src/query_engine/code_generator/query_action_data.hpp
index abcb805d3..7d45b9b01 100644
--- a/src/query_engine/code_generator/query_action_data.hpp
+++ b/src/query_engine/code_generator/query_action_data.hpp
@@ -76,7 +76,7 @@ struct ParameterIndexKey
     }
 };
 
-struct RelationshipData : public EntityData
+struct RelationshipData
 {
     enum class Direction
     {
diff --git a/src/query_engine/code_generator/structures.hpp b/src/query_engine/code_generator/structures.hpp
index b4884c72b..f24247b94 100644
--- a/src/query_engine/code_generator/structures.hpp
+++ b/src/query_engine/code_generator/structures.hpp
@@ -14,6 +14,6 @@ public:
     {
         runtime_assert(this->size() > 1, "Array size shoud be bigger than 1");
 
-        return std::make_pair(*(this->end() - 2), *(this->end() - 1));
+        return std::make_pair(*(this->end() - 1), *(this->end() - 2));
     }
 };
diff --git a/src/query_engine/exceptions/errors.hpp b/src/query_engine/exceptions/errors.hpp
index 70f6d2ddb..8eb87318f 100644
--- a/src/query_engine/exceptions/errors.hpp
+++ b/src/query_engine/exceptions/errors.hpp
@@ -10,3 +10,10 @@ public:
     SemanticError(const std::string& what) :
         BasicException("Semantic error: " + what) {}
 };
+
+class CodeGenerationError : public BasicException
+{
+public:
+    CodeGenerationError(const std::string& what) :
+        BasicException("Code Generation error: " + what) {}
+};
diff --git a/src/query_engine/hardcode/queries.hpp b/src/query_engine/hardcode/queries.hpp
index 17fe9080f..c898f1350 100644
--- a/src/query_engine/hardcode/queries.hpp
+++ b/src/query_engine/hardcode/queries.hpp
@@ -16,6 +16,16 @@ auto load_queries(Db& db)
 {
     std::map<uint64_t, std::function<bool(const properties_t &)>> queries;
 
+    // CREATE (n {prop: 0}) RETURN n)
+    auto create_node = [&db](const properties_t &args) {
+        auto &t = db.tx_engine.begin();
+        auto vertex_accessor = db.graph.vertices.insert(t);
+        vertex_accessor.property("prop", args[0]);
+        t.commit();
+        return true;
+    };
+    queries[11597417457737499503u] = create_node;
+
     auto create_labeled_and_named_node = [&db](const properties_t &args) {
         auto &t = db.tx_engine.begin();
         auto vertex_accessor = db.graph.vertices.insert(t);
@@ -137,6 +147,28 @@ auto load_queries(Db& db)
         return true;
     };
 
+    // MATCH (n1), (n2) WHERE ID(n1)=0 AND ID(n2)=1 CREATE (n1)<-[r:IS {age: 25, weight: 70}]-(n2) RETURN r]
+    auto create_edge_v2 = [&db](const properties_t &args)
+    {
+        auto& t = db.tx_engine.begin();
+        auto n1 = db.graph.vertices.find(t, args[0]->as<Int64>().value);
+        if (!n1) return t.commit(), false;
+        auto n2 = db.graph.vertices.find(t, args[1]->as<Int64>().value);
+        if (!n2) return t.commit(), false;
+        auto r = db.graph.edges.insert(t);
+        r.property("age", args[2]);
+        r.property("weight", args[3]);
+        auto &IS = db.graph.edge_type_store.find_or_create("IS");
+        r.edge_type(IS);
+        n2.vlist->update(t)->data.out.add(r.vlist);
+        n1.vlist->update(t)->data.in.add(r.vlist);
+        r.from(n2.vlist);
+        r.to(n1.vlist);
+        t.commit();
+        return true;
+    };
+    queries[15648836733456301916u] = create_edge_v2;
+
     queries[10597108978382323595u] = create_account;
     queries[5397556489557792025u] = create_labeled_and_named_node;
     queries[7939106225150551899u] = create_edge;
@@ -146,5 +178,6 @@ auto load_queries(Db& db)
     queries[6813335159006269041u] = update_node;
     queries[4857652843629217005u] = find_by_label;
 
+
     return queries;
 }
diff --git a/src/query_engine/traverser/code.hpp b/src/query_engine/traverser/code.hpp
index f601b7e88..d7ad495b1 100644
--- a/src/query_engine/traverser/code.hpp
+++ b/src/query_engine/traverser/code.hpp
@@ -20,15 +20,24 @@ const std::string transaction_begin = "auto& t = db.tx_engine.begin();";
 
 const std::string transaction_commit = "t.commit();";
 
+const std::string set_property = "{}.property(\"{}\", args[{}]);";
+
 // create vertex e.g. CREATE (n:PERSON {name: "Test", age: 23})
-const std::string create_vertex =
-    "auto {} = db.graph.vertices.insert(t);";
-const std::string set_vertex_property =
-    "{}.property(\"{}\", args[{}]);";
+const std::string create_vertex = "auto {} = db.graph.vertices.insert(t);";
 const std::string create_label =
     "auto &{0} = db.graph.label_store.find_or_create(\"{0}\");";
 const std::string add_label = "{}.add_label({});";
 
+// create edge e.g CREATE (n1)-[r:COST {cost: 100}]->(n2)
+const std::string create_edge = "auto {} = db.graph.edges.insert(t);";
+const std::string find_type =
+    "auto &{0} = db.graph.edge_type_store.find_or_create(\"{0}\");";
+const std::string set_type = "{}.edge_type({});";
+const std::string node_out = "{}.vlist->update(t)->data.out.add({}.vlist);";
+const std::string node_in = "{}.vlist->update(t)->data.in.add({}.vlist);"; 
+const std::string edge_from = "{}.from({}.vlist);";
+const std::string edge_to = "{}.to({}.vlist);";
+
 const std::string args_id = "auto id = args[{}]->as<Int32>();";
 
 const std::string vertex_accessor_args_id =
@@ -36,21 +45,16 @@ const std::string vertex_accessor_args_id =
 
 const std::string match_vertex_by_id =
     "auto {0} = db.graph.vertices.find(t, args[{1}]->as<Int64>().value);\n"
-    "if (!{0}) return t.commit(), std::make_shared<QueryResult>();";
+    "        if (!{0}) return t.commit(), std::make_shared<QueryResult>();";
 const std::string match_edge_by_id =
     "auto {0} = db.graph.edges.find(t, args[{1}]->as<Int64>().value);\n"
-    "if (!{0}) return t.commit(), std::make_shared<QueryResult>();";
-
-const std::string create_edge =
-    "auto {} = db.graph.edges.insert(t);";
+    "        if (!{0}) return t.commit(), std::make_shared<QueryResult>();";
 
 
 const std::string return_empty_result =
     "return std::make_shared<QueryResult>();";
 
-const std::string update_property =
-    "{}.property(\"{}\", args[{}]);";
-
+const std::string update_property = "{}.property(\"{}\", args[{}]);";
 
 const std::string todo = "// TODO: {}";
 
diff --git a/src/query_engine/traverser/cpp_traverser.hpp b/src/query_engine/traverser/cpp_traverser.hpp
index 8eae1235f..7c14bb278 100644
--- a/src/query_engine/traverser/cpp_traverser.hpp
+++ b/src/query_engine/traverser/cpp_traverser.hpp
@@ -3,6 +3,7 @@
 #include <string>
 
 #include "cypher/visitor/traverser.hpp"
+
 #include "query_engine/code_generator/cpp_generator.hpp"
 #include "query_engine/code_generator/entity_search.hpp"
 #include "query_engine/code_generator/structures.hpp"
@@ -209,10 +210,9 @@ public:
     {
         // TODO: Is that traversal order OK for all cases? Probably NOT.
         if (ast_pattern.has_next()) {
-            Traverser::visit(*(ast_pattern.next));
-            if (ast_pattern.has_node()) Traverser::visit(*(ast_pattern.node));
-            if (ast_pattern.has_relationship())
-                Traverser::visit(*(ast_pattern.relationship));
+            visit(*ast_pattern.next);
+            visit(*ast_pattern.node);
+            visit(*ast_pattern.relationship);
         } else {
             Traverser::visit(ast_pattern);
         }
@@ -252,7 +252,7 @@ public:
         if (!internal_id_expr.has_id()) return;
 
         auto name = internal_id_expr.entity_name();
-        // because entity_id will be value index inside the parameters array
+        // because entity_id value will be index inside the parameters array
         auto index = internal_id_expr.entity_id();
 
         auto &data = generator.action_data();
@@ -275,19 +275,11 @@ public:
     void visit(ast::Relationship &ast_relationship) override
     {
         auto &cypher_data = generator.cypher_data();
+        auto &action_data = generator.action_data();
 
-        if (ast_relationship.has_name()) entity = ast_relationship.name();
-
-        // TODO: simplify somehow
-        if (state == CypherState::Create) {
-            if (ast_relationship.has_name()) {
-                auto name = ast_relationship.name();
-                if (!cypher_data.exist(name)) {
-                    clause_action = ClauseAction::CreateRelationship;
-                    create_relationship(name);
-                }
-            }
-        }
+        if (!ast_relationship.has_name()) 
+            return;
+        entity = ast_relationship.name();
 
         using ast_direction = ast::Relationship::Direction;
         using generator_direction = RelationshipData::Direction;
@@ -298,9 +290,23 @@ public:
         if (ast_relationship.direction == ast_direction::Right)
             direction = generator_direction::Right;
 
-        Traverser::visit(ast_relationship);
-
         // TODO: add suport for Direction::Both
+
+        // TODO: simplify somehow
+        if (state == CypherState::Create) {
+            if (!cypher_data.exist(entity)) {
+                clause_action = ClauseAction::CreateRelationship;
+                create_relationship(entity);
+            }
+        }
+
+        if (state == CypherState::Match) {
+            if (!cypher_data.exist(entity)) {
+                action_data.actions[entity] = ClauseAction::MatchRelationship;
+            }
+        }
+
+        Traverser::visit(ast_relationship);
     }
 
     void visit(ast::RelationshipSpecs &ast_relationship_specs) override
@@ -317,6 +323,12 @@ public:
             }
         }
 
+        if (state == CypherState::Create) {
+            if (ast_relationship_specs.has_identifier()) {
+                entity = ast_relationship_specs.name();
+            }
+        }
+
         Traverser::visit(ast_relationship_specs);
     }
 

From eda9d67bbe38e2bd3960ff4ae2742639b8411eda Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?=
 <dominik.tomicevic@gmail.com>
Date: Mon, 1 Aug 2016 22:14:09 +0100
Subject: [PATCH 2/2] implemented bolt protocol

---
 src/bolt/v1/bolt.cpp                      |  26 ++
 src/bolt/v1/bolt.hpp                      |  24 ++
 src/bolt/v1/messaging/codes.hpp           |  46 ++++
 src/bolt/v1/messaging/messages.hpp        |   8 +
 src/bolt/v1/packing/codes.hpp             |  54 +++++
 src/bolt/v1/packing/types.hpp             |  21 ++
 src/bolt/v1/server/server.hpp             |  67 ++++++
 src/bolt/v1/server/worker.hpp             | 112 +++++++++
 src/bolt/v1/session.cpp                   |  54 +++++
 src/bolt/v1/session.hpp                   |  42 ++++
 src/bolt/v1/states.cpp                    |  17 ++
 src/bolt/v1/states.hpp                    |  19 ++
 src/bolt/v1/states/error.cpp              |  32 +++
 src/bolt/v1/states/error.hpp              |  14 ++
 src/bolt/v1/states/executor.cpp           |  95 ++++++++
 src/bolt/v1/states/executor.hpp           |  41 ++++
 src/bolt/v1/states/handshake.cpp          |  27 +++
 src/bolt/v1/states/handshake.hpp          |  14 ++
 src/bolt/v1/states/init.cpp               |  54 +++++
 src/bolt/v1/states/init.hpp               |  22 ++
 src/bolt/v1/states/message_parser.hpp     |  37 +++
 src/bolt/v1/states/state.hpp              |  23 ++
 src/bolt/v1/transport/bolt_decoder.cpp    | 158 ++++++++++++
 src/bolt/v1/transport/bolt_decoder.hpp    |  45 ++++
 src/bolt/v1/transport/bolt_encoder.cpp    |   1 +
 src/bolt/v1/transport/bolt_encoder.hpp    | 243 +++++++++++++++++++
 src/bolt/v1/transport/buffer.cpp          |  16 ++
 src/bolt/v1/transport/buffer.hpp          |  39 +++
 src/bolt/v1/transport/chunked_decoder.hpp |  77 ++++++
 src/bolt/v1/transport/chunked_encoder.hpp |  92 +++++++
 src/bolt/v1/transport/socket_stream.hpp   |  38 +++
 src/bolt/v1/transport/stream_error.hpp    |  14 ++
 src/cypher/debug/tree_print.hpp           |   8 +-
 src/cypher/lexertl                        |   1 +
 src/dbms/dbms.hpp                         |   8 +
 src/dbms/server/bolt.hpp                  |   9 +
 src/examples/bolt.cpp                     |  68 ++++++
 src/examples/compile-bolt.sh              |   3 +
 src/examples/endinan.cpp                  |  81 +++++++
 src/io/network/Makefile                   |  17 --
 src/io/network/epoll-example.c            | 281 ----------------------
 src/io/network/epoll.hpp                  |  16 +-
 src/io/network/event_listener.hpp         |  36 +--
 src/io/network/network_error.hpp          |  10 +-
 src/io/network/pps.sh                     |  27 ---
 src/io/network/secure_socket.hpp          |  93 +++++++
 src/io/network/secure_stream_reader.hpp   |  61 +++++
 src/io/network/server.hpp                 |  50 ++--
 src/io/network/socket.hpp                 |  48 +++-
 src/io/network/stream_dispatcher.hpp      |  13 +
 src/io/network/stream_listener.hpp        |  43 ++++
 src/io/network/stream_reader.hpp          |  39 ++-
 src/io/network/tcp/stream.hpp             |   6 +-
 src/io/network/tcp_server.hpp             |  94 --------
 src/io/network/tls.cpp                    |  55 +++++
 src/io/network/tls.hpp                    |  33 +++
 src/io/network/tls_error.hpp              |  14 ++
 src/io/network/worker.hpp                 |  91 -------
 src/logging/default.cpp                   |  33 +++
 src/logging/default.hpp                   |  22 ++
 src/logging/log.cpp                       |   2 +-
 src/logging/logger.hpp                    |  27 ++-
 src/logging/streams/stdout.cpp            |  13 +-
 src/mvcc/id.hpp                           |   2 +-
 src/speedy/rapidjson                      |   1 +
 src/utils/bswap.hpp                       |  45 ++++
 src/utils/datetime/timestamp.hpp          |   6 +-
 src/utils/exceptions/basic_exception.hpp  |  14 +-
 src/utils/string/weak_string.hpp          |   1 -
 src/utils/types/byte.hpp                  |   5 +
 tests/CMakeLists.txt                      |  30 +--
 tests/unit/chunked_decoder.cpp            |  62 +++++
 tests/unit/chunked_encoder.cpp            | 115 +++++++++
 73 files changed, 2535 insertions(+), 620 deletions(-)
 create mode 100644 src/bolt/v1/bolt.cpp
 create mode 100644 src/bolt/v1/bolt.hpp
 create mode 100644 src/bolt/v1/messaging/codes.hpp
 create mode 100644 src/bolt/v1/messaging/messages.hpp
 create mode 100644 src/bolt/v1/packing/codes.hpp
 create mode 100644 src/bolt/v1/packing/types.hpp
 create mode 100644 src/bolt/v1/server/server.hpp
 create mode 100644 src/bolt/v1/server/worker.hpp
 create mode 100644 src/bolt/v1/session.cpp
 create mode 100644 src/bolt/v1/session.hpp
 create mode 100644 src/bolt/v1/states.cpp
 create mode 100644 src/bolt/v1/states.hpp
 create mode 100644 src/bolt/v1/states/error.cpp
 create mode 100644 src/bolt/v1/states/error.hpp
 create mode 100644 src/bolt/v1/states/executor.cpp
 create mode 100644 src/bolt/v1/states/executor.hpp
 create mode 100644 src/bolt/v1/states/handshake.cpp
 create mode 100644 src/bolt/v1/states/handshake.hpp
 create mode 100644 src/bolt/v1/states/init.cpp
 create mode 100644 src/bolt/v1/states/init.hpp
 create mode 100644 src/bolt/v1/states/message_parser.hpp
 create mode 100644 src/bolt/v1/states/state.hpp
 create mode 100644 src/bolt/v1/transport/bolt_decoder.cpp
 create mode 100644 src/bolt/v1/transport/bolt_decoder.hpp
 create mode 100644 src/bolt/v1/transport/bolt_encoder.cpp
 create mode 100644 src/bolt/v1/transport/bolt_encoder.hpp
 create mode 100644 src/bolt/v1/transport/buffer.cpp
 create mode 100644 src/bolt/v1/transport/buffer.hpp
 create mode 100644 src/bolt/v1/transport/chunked_decoder.hpp
 create mode 100644 src/bolt/v1/transport/chunked_encoder.hpp
 create mode 100644 src/bolt/v1/transport/socket_stream.hpp
 create mode 100644 src/bolt/v1/transport/stream_error.hpp
 create mode 160000 src/cypher/lexertl
 create mode 100644 src/dbms/dbms.hpp
 create mode 100644 src/dbms/server/bolt.hpp
 create mode 100644 src/examples/bolt.cpp
 create mode 100644 src/examples/compile-bolt.sh
 create mode 100644 src/examples/endinan.cpp
 delete mode 100644 src/io/network/Makefile
 delete mode 100644 src/io/network/epoll-example.c
 delete mode 100755 src/io/network/pps.sh
 create mode 100644 src/io/network/secure_socket.hpp
 create mode 100644 src/io/network/secure_stream_reader.hpp
 create mode 100644 src/io/network/stream_dispatcher.hpp
 create mode 100644 src/io/network/stream_listener.hpp
 delete mode 100644 src/io/network/tcp_server.hpp
 create mode 100644 src/io/network/tls.cpp
 create mode 100644 src/io/network/tls.hpp
 create mode 100644 src/io/network/tls_error.hpp
 delete mode 100644 src/io/network/worker.hpp
 create mode 100644 src/logging/default.cpp
 create mode 100644 src/logging/default.hpp
 create mode 160000 src/speedy/rapidjson
 create mode 100644 src/utils/bswap.hpp
 create mode 100644 src/utils/types/byte.hpp
 create mode 100644 tests/unit/chunked_decoder.cpp
 create mode 100644 tests/unit/chunked_encoder.cpp

diff --git a/src/bolt/v1/bolt.cpp b/src/bolt/v1/bolt.cpp
new file mode 100644
index 000000000..77dbe6d18
--- /dev/null
+++ b/src/bolt/v1/bolt.cpp
@@ -0,0 +1,26 @@
+#include "bolt.hpp"
+
+#include "session.hpp"
+#include <iostream>
+
+namespace bolt
+{
+
+Bolt::Bolt()
+{
+}
+
+Session* Bolt::create_session(io::Socket&& socket)
+{
+    // TODO fix session lifecycle handling
+    // dangling pointers are not cool :)
+
+    return new Session(std::forward<io::Socket>(socket), *this);
+}
+
+void Bolt::close(Session* session)
+{
+    session->socket.close();
+}
+
+}
diff --git a/src/bolt/v1/bolt.hpp b/src/bolt/v1/bolt.hpp
new file mode 100644
index 000000000..3b42a200e
--- /dev/null
+++ b/src/bolt/v1/bolt.hpp
@@ -0,0 +1,24 @@
+#pragma once
+
+#include "states.hpp"
+#include "io/network/socket.hpp"
+
+namespace bolt
+{
+
+class Session;
+
+class Bolt
+{
+    friend class Session;
+
+public:
+    Bolt();
+
+    Session* create_session(io::Socket&& socket);
+    void close(Session* session);
+
+    States states;
+};
+
+}
diff --git a/src/bolt/v1/messaging/codes.hpp b/src/bolt/v1/messaging/codes.hpp
new file mode 100644
index 000000000..19083199c
--- /dev/null
+++ b/src/bolt/v1/messaging/codes.hpp
@@ -0,0 +1,46 @@
+#pragma once
+
+#include "utils/types/byte.hpp"
+#include "utils/underlying_cast.hpp"
+
+namespace bolt
+{
+
+enum class MessageCode : byte
+{
+    Init       = 0x01,
+	AckFailure = 0x0E,
+	Reset      = 0x0F,
+
+	Run        = 0x10,
+	DiscardAll = 0x2F,
+	PullAll    = 0x3F,
+
+	Record     = 0x71,
+	Success    = 0x70,
+	Ignored    = 0x7E,
+	Failure    = 0x7F
+};
+
+inline bool operator==(byte value, MessageCode code)
+{
+    return value == underlying_cast(code);
+}
+
+inline bool operator==(MessageCode code, byte value)
+{
+    return operator==(value, code);
+}
+
+inline bool operator!=(byte value, MessageCode code)
+{
+    return !operator==(value, code);
+}
+
+inline bool operator!=(MessageCode code, byte value)
+{
+    return operator!=(value, code);
+}
+
+
+}
diff --git a/src/bolt/v1/messaging/messages.hpp b/src/bolt/v1/messaging/messages.hpp
new file mode 100644
index 000000000..854f53936
--- /dev/null
+++ b/src/bolt/v1/messaging/messages.hpp
@@ -0,0 +1,8 @@
+#pragma once
+
+namespace bolt
+{
+
+
+
+}
diff --git a/src/bolt/v1/packing/codes.hpp b/src/bolt/v1/packing/codes.hpp
new file mode 100644
index 000000000..f963efb8c
--- /dev/null
+++ b/src/bolt/v1/packing/codes.hpp
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <cstdint>
+
+namespace bolt
+{
+
+namespace pack
+{
+
+enum Code : uint8_t
+{
+    TinyString  = 0x80,
+    TinyList    = 0x90,
+    TinyMap     = 0xA0,
+    TinyStruct  = 0xB0,
+
+    Null        = 0xC0,
+
+    Float64     = 0xC1,
+
+    False       = 0xC2,
+    True        = 0xC3,
+
+    Int8        = 0xC8,
+    Int16       = 0xC9,
+    Int32       = 0xCA,
+    Int64       = 0xCB,
+
+    Bytes8      = 0xCC,
+    Bytes16     = 0xCD,
+    Bytes32     = 0xCE,
+
+    String8     = 0xD0,
+    String16    = 0xD1,
+    String32    = 0xD2,
+
+    List8       = 0xD4,
+    List16      = 0xD5,
+    List32      = 0xD6,
+
+    Map8        = 0xD8,
+    Map16       = 0xD9,
+    Map32       = 0xDA,
+    MapStream   = 0xDB,
+
+    Struct8     = 0xDC,
+    Struct16    = 0xDD,
+    EndOfStream = 0xDF,
+};
+
+}
+
+}
diff --git a/src/bolt/v1/packing/types.hpp b/src/bolt/v1/packing/types.hpp
new file mode 100644
index 000000000..d840c3f42
--- /dev/null
+++ b/src/bolt/v1/packing/types.hpp
@@ -0,0 +1,21 @@
+#pragma once
+
+namespace bolt
+{
+
+enum class PackType
+{
+    Null,        // denotes absence of a value
+    Boolean,     // denotes a type with two possible values (t/f)
+    Integer,     // 64-bit signed integral number
+    Float,       // 64-bit floating point number
+    Bytes,       // binary data
+    String,      // unicode string
+    List,        // collection of values
+    Map,         // collection of zero or more key/value pairs
+    Struct,      // zero or more packstream values
+    EndOfStream, // denotes stream value end
+    Reserved     // reserved for future use
+};
+
+}
diff --git a/src/bolt/v1/server/server.hpp b/src/bolt/v1/server/server.hpp
new file mode 100644
index 000000000..ea61154aa
--- /dev/null
+++ b/src/bolt/v1/server/server.hpp
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <vector>
+#include <memory>
+#include <thread>
+#include <atomic>
+#include <cassert>
+
+#include "io/network/server.hpp"
+#include "bolt/v1/bolt.hpp"
+
+namespace bolt
+{
+
+template <class Worker>
+class Server : public io::Server<Server<Worker>>
+{
+public:
+    Server(io::Socket&& socket)
+        : io::Server<Server<Worker>>(std::forward<io::Socket>(socket)) {}
+
+    void start(size_t n)
+    {
+        workers.reserve(n);
+
+        for(size_t i = 0; i < n; ++i)
+        {
+            workers.push_back(std::make_shared<Worker>(bolt));
+            workers.back()->start(alive);
+        }
+
+        while(alive)
+        {
+            this->wait_and_process_events();
+        }
+    }
+
+    void shutdown()
+    {
+        alive.store(false);
+
+        for(auto& worker : workers)
+            worker->thread.join();
+    }
+
+    void on_connect()
+    {
+        assert(idx < workers.size());
+
+        if(UNLIKELY(!workers[idx]->accept(this->socket)))
+            return;
+
+        idx = idx == workers.size() - 1 ? 0 : idx + 1;
+    }
+
+    void on_wait_timeout() {}
+
+private:
+    Bolt bolt;
+
+    std::vector<typename Worker::sptr> workers;
+    std::atomic<bool> alive {true};
+
+    int idx {0};
+};
+
+}
diff --git a/src/bolt/v1/server/worker.hpp b/src/bolt/v1/server/worker.hpp
new file mode 100644
index 000000000..eb475e135
--- /dev/null
+++ b/src/bolt/v1/server/worker.hpp
@@ -0,0 +1,112 @@
+#pragma once
+
+#include <iomanip>
+#include <cstdio>
+#include <atomic>
+#include <sstream>
+
+#include <memory>
+#include <thread>
+
+#include "io/network/stream_reader.hpp"
+
+#include "bolt/v1/bolt.hpp"
+#include "bolt/v1/session.hpp"
+
+#include "logging/default.hpp"
+
+namespace bolt
+{
+
+template <class Worker>
+class Server;
+
+class Worker : public io::StreamReader<Worker, Session>
+{
+    friend class bolt::Server<Worker>;
+
+public:
+    using sptr = std::shared_ptr<Worker>;
+
+    Worker(Bolt& bolt) : bolt(bolt)
+    {
+        logger = logging::log->logger("Network");
+    }
+
+    Session& on_connect(io::Socket&& socket)
+    {
+        logger.trace("Accepting connection on socket {}", socket.id());
+
+        return *bolt.get().create_session(std::forward<io::Socket>(socket));
+    }
+
+    void on_error(Session&)
+    {
+        logger.trace("[on_error] errno = {}", errno);
+
+#ifndef NDEBUG
+        auto err = io::NetworkError("");
+        logger.debug("{}", err.what());
+#endif
+
+        logger.error("Error occured in this session");
+
+    }
+
+    void on_wait_timeout() {}
+
+    Buffer on_alloc(Session&)
+    {
+        /* logger.trace("[on_alloc] Allocating {}B", sizeof buf); */
+
+        return Buffer { buf, sizeof buf };
+    }
+
+    void on_read(Session& session, Buffer& buf)
+    {
+        logger.trace("[on_read] Received {}B", buf.len);
+
+#ifndef NDEBUG
+        std::stringstream stream;
+
+        for(size_t i = 0; i < buf.len; ++i)
+            stream << fmt::format("{:02X} ", static_cast<byte>(buf.ptr[i]));
+
+        logger.trace("[on_read] {}", stream.str());
+#endif
+
+        try
+        {
+            session.execute(reinterpret_cast<const byte*>(buf.ptr), buf.len);
+        }
+        catch(const std::exception& e)
+        {
+            logger.error("Error occured while executing statement.");
+            logger.error("{}", e.what());
+        }
+    }
+
+    void on_close(Session& session)
+    {
+        logger.trace("[on_close] Client closed the connection");
+        session.close();
+    }
+
+    char buf[65536];
+
+protected:
+    std::reference_wrapper<Bolt> bolt;
+
+    Logger logger;
+    std::thread thread;
+
+    void start(std::atomic<bool>& alive)
+    {
+        thread = std::thread([&, this]() {
+            while(alive)
+                wait_and_process_events();
+        });
+    }
+};
+
+}
diff --git a/src/bolt/v1/session.cpp b/src/bolt/v1/session.cpp
new file mode 100644
index 000000000..dd437cdf0
--- /dev/null
+++ b/src/bolt/v1/session.cpp
@@ -0,0 +1,54 @@
+#include "session.hpp"
+
+namespace bolt
+{
+
+Session::Session(io::Socket&& socket, Bolt& bolt)
+    : Stream(std::forward<io::Socket>(socket)), bolt(bolt)
+{
+    logger = logging::log->logger("Session");
+
+    // start with a handshake state
+    state = bolt.states.handshake.get();
+}
+
+bool Session::alive() const
+{
+    return state != nullptr;
+}
+
+void Session::execute(const byte* data, size_t len)
+{
+    // mark the end of the message
+    auto end = data + len;
+
+    while(true)
+    {
+        auto size = end - data;
+
+        if(LIKELY(connected))
+        {
+            logger.debug("Decoding chunk of size {}", size);
+            auto finished = decoder.decode(data, size);
+
+            if(!finished)
+                return;
+        }
+        else
+        {
+            logger.debug("Decoding handshake of size {}", size);
+            decoder.handshake(data, size);
+        }
+
+        state = state->run(*this);
+        decoder.reset();
+    }
+}
+
+void Session::close()
+{
+    logger.debug("Closing session");
+    bolt.close(this);
+}
+
+}
diff --git a/src/bolt/v1/session.hpp b/src/bolt/v1/session.hpp
new file mode 100644
index 000000000..9bfa48886
--- /dev/null
+++ b/src/bolt/v1/session.hpp
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "io/network/tcp/stream.hpp"
+#include "io/network/socket.hpp"
+
+#include "bolt/v1/states/state.hpp"
+
+#include "bolt/v1/transport/bolt_decoder.hpp"
+#include "bolt/v1/transport/bolt_encoder.hpp"
+
+#include "bolt.hpp"
+#include "logging/default.hpp"
+
+namespace bolt
+{
+
+class Session : public io::tcp::Stream<io::Socket>
+{
+public:
+    using Decoder = BoltDecoder;
+    using Encoder = BoltEncoder<io::Socket>;
+
+    Session(io::Socket&& socket, Bolt& bolt);
+
+    bool alive() const;
+
+    void execute(const byte* data, size_t len);
+    void close();
+
+    Bolt& bolt;
+
+    Decoder decoder;
+    Encoder encoder {socket};
+
+    bool connected {false};
+    State* state;
+
+protected:
+    Logger logger;
+};
+
+}
diff --git a/src/bolt/v1/states.cpp b/src/bolt/v1/states.cpp
new file mode 100644
index 000000000..a7e0a9974
--- /dev/null
+++ b/src/bolt/v1/states.cpp
@@ -0,0 +1,17 @@
+#include "states.hpp"
+
+#include "states/handshake.hpp"
+#include "states/init.hpp"
+#include "states/executor.hpp"
+
+namespace bolt
+{
+
+States::States()
+{
+    handshake = std::make_unique<Handshake>();
+    init = std::make_unique<Init>();
+    executor = std::make_unique<Executor>();
+}
+
+}
diff --git a/src/bolt/v1/states.hpp b/src/bolt/v1/states.hpp
new file mode 100644
index 000000000..c323214d4
--- /dev/null
+++ b/src/bolt/v1/states.hpp
@@ -0,0 +1,19 @@
+#pragma once
+
+#include "states/state.hpp"
+#include "logging/log.hpp"
+
+namespace bolt
+{
+
+class States
+{
+public:
+    States();
+
+    State::uptr handshake;
+    State::uptr init;
+    State::uptr executor;
+};
+
+}
diff --git a/src/bolt/v1/states/error.cpp b/src/bolt/v1/states/error.cpp
new file mode 100644
index 000000000..c482a1d8f
--- /dev/null
+++ b/src/bolt/v1/states/error.cpp
@@ -0,0 +1,32 @@
+#include "error.hpp"
+
+#include "bolt/v1/session.hpp"
+
+namespace bolt
+{
+
+State* Error::run(Session& session)
+{
+    auto message_type = session.decoder.read_byte();
+
+    if(message_type == MessageCode::AckFailure)
+    {
+        // todo reset current statement? is it even necessary?
+
+        return session.bolt.states.executor.get();
+    }
+    else if(message_type == MessageCode::Reset)
+    {
+        // todo rollback current transaction
+        // discard all records waiting to be sent
+
+        return session.bolt.states.executor.get();
+    }
+
+    session.encoder.message_ignored();
+    session.encoder.flush();
+
+    return this;
+}
+
+}
diff --git a/src/bolt/v1/states/error.hpp b/src/bolt/v1/states/error.hpp
new file mode 100644
index 000000000..0ef5964bf
--- /dev/null
+++ b/src/bolt/v1/states/error.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "bolt/v1/states/state.hpp"
+
+namespace bolt
+{
+
+class Error : public State
+{
+public:
+    State* run(Session& session) override;
+};
+
+}
diff --git a/src/bolt/v1/states/executor.cpp b/src/bolt/v1/states/executor.cpp
new file mode 100644
index 000000000..7306618ab
--- /dev/null
+++ b/src/bolt/v1/states/executor.cpp
@@ -0,0 +1,95 @@
+#include "executor.hpp"
+
+#include "bolt/v1/messaging/codes.hpp"
+
+namespace bolt
+{
+
+Executor::Executor() : logger(logging::log->logger("Executor")) {}
+
+State* Executor::run(Session& session)
+{
+    // just read one byte that represents the struct type, we can skip the
+    // information contained in this byte
+    session.decoder.read_byte();
+
+    auto message_type = session.decoder.read_byte();
+
+    if(message_type == MessageCode::Run)
+    {
+        Query q;
+
+        q.statement = session.decoder.read_string();
+
+        this->run(session, q);
+    }
+    else if(message_type == MessageCode::PullAll)
+    {
+        pull_all(session);
+    }
+    else if(message_type == MessageCode::DiscardAll)
+    {
+        discard_all(session);
+    }
+    else if(message_type == MessageCode::Reset)
+    {
+        // todo rollback current transaction
+        // discard all records waiting to be sent
+
+        return this;
+    }
+    else
+    {
+        logger.error("Unrecognized message recieved");
+        logger.debug("Invalid message type 0x{:02X}", message_type);
+
+        return session.bolt.states.error.get();
+    }
+
+    return this;
+}
+
+void Executor::run(Session& session, Query& query)
+{
+    logger.trace("[Run] '{}'", query.statement);
+
+    session.encoder.message_success();
+    session.encoder.write_map_header(1);
+
+    session.encoder.write_string("fields");
+    session.encoder.write_list_header(1);
+    session.encoder.write_string("name");
+
+    session.encoder.flush();
+}
+
+void Executor::pull_all(Session& session)
+{
+    logger.trace("[PullAll]");
+
+    session.encoder.message_record();
+    session.encoder.write_list_header(1);
+    session.encoder.write_string("buda");
+
+    session.encoder.message_record();
+    session.encoder.write_list_header(1);
+    session.encoder.write_string("domko");
+
+    session.encoder.message_record();
+    session.encoder.write_list_header(1);
+    session.encoder.write_string("max");
+
+    session.encoder.message_success_empty();
+
+    session.encoder.flush();
+}
+
+void Executor::discard_all(Session& session)
+{
+    logger.trace("[DiscardAll]");
+
+    session.encoder.message_success();
+    session.encoder.flush();
+}
+
+}
diff --git a/src/bolt/v1/states/executor.hpp b/src/bolt/v1/states/executor.hpp
new file mode 100644
index 000000000..5b1127a3a
--- /dev/null
+++ b/src/bolt/v1/states/executor.hpp
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "bolt/v1/states/state.hpp"
+#include "bolt/v1/session.hpp"
+
+namespace bolt
+{
+
+class Executor : public State
+{
+    struct Query
+    {
+        std::string statement;
+    };
+
+public:
+    Executor();
+
+    State* run(Session& session) override final;
+
+protected:
+    Logger logger;
+
+    /* Execute an incoming query
+     *
+     */
+    void run(Session& session, Query& query);
+
+    /* Send all remaining results to the client
+     *
+     */
+    void pull_all(Session& session);
+
+    /* Discard all remaining results
+     *
+     */
+    void discard_all(Session& session);
+};
+
+}
+
diff --git a/src/bolt/v1/states/handshake.cpp b/src/bolt/v1/states/handshake.cpp
new file mode 100644
index 000000000..b685381d6
--- /dev/null
+++ b/src/bolt/v1/states/handshake.cpp
@@ -0,0 +1,27 @@
+#include "handshake.hpp"
+
+#include "bolt/v1/session.hpp"
+
+namespace bolt
+{
+
+static constexpr uint32_t preamble = 0x6060B017;
+
+static constexpr byte protocol[4] = {0x00, 0x00, 0x00, 0x01};
+
+State* Handshake::run(Session& session)
+{
+    if(UNLIKELY(session.decoder.read_uint32() != preamble))
+        return nullptr;
+
+    // TODO so far we only support version 1 of the protocol so it doesn't
+    // make sense to check which version the client prefers
+    // this will change in the future
+
+    session.connected = true;
+    session.socket.write(protocol, sizeof protocol);
+
+    return session.bolt.states.init.get();
+}
+
+}
diff --git a/src/bolt/v1/states/handshake.hpp b/src/bolt/v1/states/handshake.hpp
new file mode 100644
index 000000000..fe6d7052a
--- /dev/null
+++ b/src/bolt/v1/states/handshake.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "bolt/v1/states/state.hpp"
+
+namespace bolt
+{
+
+class Handshake : public State
+{
+public:
+    State* run(Session& session) override;
+};
+
+}
diff --git a/src/bolt/v1/states/init.cpp b/src/bolt/v1/states/init.cpp
new file mode 100644
index 000000000..e1f30f690
--- /dev/null
+++ b/src/bolt/v1/states/init.cpp
@@ -0,0 +1,54 @@
+#include "init.hpp"
+
+#include "bolt/v1/session.hpp"
+#include "bolt/v1/messaging/codes.hpp"
+
+#include "utils/likely.hpp"
+
+namespace bolt
+{
+
+Init::Init() : MessageParser<Init>(logging::log->logger("Init")) {}
+
+State* Init::parse(Session& session, Message& message)
+{
+    auto struct_type = session.decoder.read_byte();
+
+    if(UNLIKELY(struct_type != 0xB2))
+    {
+        logger.debug("{}", struct_type);
+
+        logger.debug("Expected struct marker 0xB2 instead of 0x{:02X}",
+                     (unsigned)struct_type);
+
+        return nullptr;
+    }
+
+    auto message_type = session.decoder.read_byte();
+
+    if(UNLIKELY(message_type != MessageCode::Init))
+    {
+        logger.debug("Expected Init (0x01) instead of (0x{:02X})",
+                     (unsigned)message_type);
+
+        return nullptr;
+    }
+
+    message.client_name = session.decoder.read_string();
+
+    // TODO read authentication tokens
+
+    return this;
+}
+
+State* Init::execute(Session& session, Message& message)
+{
+    logger.debug("Client connected '{}'", message.client_name);
+
+    session.encoder.message_success_empty();
+    session.encoder.flush();
+
+    return session.bolt.states.executor.get();
+}
+
+}
diff --git a/src/bolt/v1/states/init.hpp b/src/bolt/v1/states/init.hpp
new file mode 100644
index 000000000..dce0c2b83
--- /dev/null
+++ b/src/bolt/v1/states/init.hpp
@@ -0,0 +1,22 @@
+#pragma once
+
+#include "bolt/v1/states/message_parser.hpp"
+
+namespace bolt
+{
+
+class Init : public MessageParser<Init>
+{
+public:
+    struct Message
+    {
+        std::string client_name;
+    };
+
+    Init();
+
+    State* parse(Session& session, Message& message);
+    State* execute(Session& session, Message& message);
+};
+
+}
diff --git a/src/bolt/v1/states/message_parser.hpp b/src/bolt/v1/states/message_parser.hpp
new file mode 100644
index 000000000..ff8b6952c
--- /dev/null
+++ b/src/bolt/v1/states/message_parser.hpp
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "state.hpp"
+#include "utils/crtp.hpp"
+
+#include "bolt/v1/session.hpp"
+
+namespace bolt
+{
+
+template <class Derived>
+class MessageParser : public State, public Crtp<Derived>
+{
+public:
+    MessageParser(Logger&& logger)
+        : logger(std::forward<Logger>(logger)) {}
+
+    State* run(Session& session) override final
+    {
+        typename Derived::Message message;
+
+        logger.debug("Parsing message");
+        auto next = this->derived().parse(session, message);
+
+        // return next state if parsing was unsuccessful (i.e. error state)
+        if(next != &this->derived())
+            return next;
+
+        logger.debug("Executing state");
+        return this->derived().execute(session, message);
+    }
+
+protected:
+    Logger logger;
+};
+
+}
diff --git a/src/bolt/v1/states/state.hpp b/src/bolt/v1/states/state.hpp
new file mode 100644
index 000000000..50a38494c
--- /dev/null
+++ b/src/bolt/v1/states/state.hpp
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <cstdlib>
+#include <cstdint>
+#include <memory>
+
+namespace bolt
+{
+
+class Session;
+
+class State
+{
+public:
+    using uptr = std::unique_ptr<State>;
+
+    State() = default;
+    virtual ~State() = default;
+
+    virtual State* run(Session& session) = 0;
+};
+
+}
diff --git a/src/bolt/v1/transport/bolt_decoder.cpp b/src/bolt/v1/transport/bolt_decoder.cpp
new file mode 100644
index 000000000..9e16c2264
--- /dev/null
+++ b/src/bolt/v1/transport/bolt_decoder.cpp
@@ -0,0 +1,158 @@
+#include "bolt_decoder.hpp"
+
+#include <iostream>
+
+#include "utils/bswap.hpp"
+#include "logging/default.hpp"
+
+#include "bolt/v1/packing/codes.hpp"
+
+namespace bolt
+{
+
+void BoltDecoder::handshake(const byte*& data, size_t len)
+{
+    buffer.write(data, len);
+    data += len;
+}
+
+bool BoltDecoder::decode(const byte*& data, size_t len)
+{
+    return decoder(data, len);
+}
+
+bool BoltDecoder::empty() const
+{
+    return pos == buffer.size();
+}
+
+void BoltDecoder::reset()
+{
+    buffer.clear();
+    pos = 0;
+}
+
+byte BoltDecoder::peek() const
+{
+    return buffer[pos];
+}
+
+byte BoltDecoder::read_byte()
+{
+    return buffer[pos++];
+}
+
+void BoltDecoder::read_bytes(void* dest, size_t n)
+{
+    std::memcpy(dest, buffer.data() + pos, n);
+    pos += n;
+}
+
+template <class T>
+T parse(const void* data)
+{
+    // reinterpret bytes as the target value
+    auto value = reinterpret_cast<const T*>(data);
+
+    // swap values to little endian
+    return bswap(*value);
+}
+
+template <class T>
+T parse(Buffer& buffer, size_t& pos)
+{
+    // get a pointer to the data we're converting
+    auto ptr = buffer.data() + pos;
+
+    // skip sizeof bytes that we're going to read
+    pos += sizeof(T);
+
+    // read and convert the value
+    return parse<T>(ptr);
+}
+
+int16_t BoltDecoder::read_int16()
+{
+    return parse<int16_t>(buffer, pos);
+}
+
+uint16_t BoltDecoder::read_uint16()
+{
+    return parse<uint16_t>(buffer, pos);
+}
+
+int32_t BoltDecoder::read_int32()
+{
+    return parse<int32_t>(buffer, pos);
+}
+
+uint32_t BoltDecoder::read_uint32()
+{
+    return parse<uint32_t>(buffer, pos);
+}
+
+int64_t BoltDecoder::read_int64()
+{
+    return parse<int64_t>(buffer, pos);
+}
+
+uint64_t BoltDecoder::read_uint64()
+{
+    return parse<uint64_t>(buffer, pos);
+}
+
+double BoltDecoder::read_float64()
+{
+    auto v = parse<int64_t>(buffer, pos);
+    return *reinterpret_cast<const double *>(&v);
+}
+
+std::string BoltDecoder::read_string()
+{
+    auto marker = read_byte();
+
+    std::string res;
+    uint32_t size;
+
+    // if the first 4 bits equal to 1000 (0x8), this is a tiny string
+    if((marker & 0xF0) == pack::TinyString)
+    {
+        // size is stored in the lower 4 bits of the marker byte
+        size = marker & 0x0F;
+    }
+    // if the marker is 0xD0, size is an 8-bit unsigned integer
+    if(marker == pack::String8)
+    {
+        size = read_byte();
+    }
+    // if the marker is 0xD1, size is a 16-bit big-endian unsigned integer
+    else if(marker == pack::String16)
+    {
+        size = read_uint16();
+    }
+    // if the marker is 0xD2, size is a 32-bit big-endian unsigned integer
+    else if(marker == pack::String32)
+    {
+        size = read_uint32();
+    }
+    else
+    {
+        // TODO error?
+        return res;
+    }
+
+    if(size == 0)
+        return res;
+
+    res.append(reinterpret_cast<const char*>(raw()), size);
+    pos += size;
+
+    return res;
+}
+
+const byte* BoltDecoder::raw() const
+{
+    return buffer.data() + pos;
+}
+
+}
diff --git a/src/bolt/v1/transport/bolt_decoder.hpp b/src/bolt/v1/transport/bolt_decoder.hpp
new file mode 100644
index 000000000..c326f0548
--- /dev/null
+++ b/src/bolt/v1/transport/bolt_decoder.hpp
@@ -0,0 +1,45 @@
+#pragma once
+
+#include "buffer.hpp"
+#include "chunked_decoder.hpp"
+
+#include "utils/types/byte.hpp"
+
+namespace bolt
+{
+
+class BoltDecoder
+{
+public:
+    void handshake(const byte*& data, size_t len);
+    bool decode(const byte*& data, size_t len);
+
+    bool empty() const;
+    void reset();
+
+    byte peek() const;
+    byte read_byte();
+    void read_bytes(void* dest, size_t n);
+
+    int16_t read_int16();
+    uint16_t read_uint16();
+
+    int32_t read_int32();
+    uint32_t read_uint32();
+
+    int64_t read_int64();
+    uint64_t read_uint64();
+
+    double read_float64();
+
+    std::string read_string();
+
+private:
+    Buffer buffer;
+    ChunkedDecoder<Buffer> decoder {buffer};
+    size_t pos {0};
+
+    const byte* raw() const;
+};
+
+}
diff --git a/src/bolt/v1/transport/bolt_encoder.cpp b/src/bolt/v1/transport/bolt_encoder.cpp
new file mode 100644
index 000000000..472a42447
--- /dev/null
+++ b/src/bolt/v1/transport/bolt_encoder.cpp
@@ -0,0 +1 @@
+#include "bolt_encoder.hpp"
diff --git a/src/bolt/v1/transport/bolt_encoder.hpp b/src/bolt/v1/transport/bolt_encoder.hpp
new file mode 100644
index 000000000..1fa8321e8
--- /dev/null
+++ b/src/bolt/v1/transport/bolt_encoder.hpp
@@ -0,0 +1,243 @@
+#pragma once
+
+#include "chunked_encoder.hpp"
+#include "socket_stream.hpp"
+
+#include "bolt/v1/packing/codes.hpp"
+#include "bolt/v1/messaging/codes.hpp"
+
+#include "utils/types/byte.hpp"
+
+#include "utils/bswap.hpp"
+
+namespace bolt
+{
+
+template <class Socket>
+class BoltEncoder
+{
+    static constexpr int64_t plus_2_to_the_31  =  2147483648L;
+    static constexpr int64_t plus_2_to_the_15  =  32768L;
+    static constexpr int64_t plus_2_to_the_7   =  128L;
+    static constexpr int64_t minus_2_to_the_4  = -16L;
+    static constexpr int64_t minus_2_to_the_7  = -128L;
+    static constexpr int64_t minus_2_to_the_15 = -32768L;
+    static constexpr int64_t minus_2_to_the_31 = -2147483648L;
+
+public:
+    BoltEncoder(Socket& socket) : stream(socket) {}
+
+    void flush()
+    {
+        encoder.flush();
+    }
+
+    void write(byte value)
+    {
+        encoder.write(value);
+    }
+
+    void write(const byte* values, size_t n)
+    {
+        encoder.write(values, n);
+    }
+
+    void write_null()
+    {
+        encoder.write(pack::Null);
+    }
+
+    void write(bool value)
+    {
+        if(value) write_true(); else write_false();
+    }
+
+    void write_true()
+    {
+        encoder.write(pack::True);
+    }
+
+    void write_false()
+    {
+        encoder.write(pack::False);
+    }
+
+    template <class T>
+    void write_value(T value)
+    {
+        value = bswap(value);
+        encoder.write(reinterpret_cast<const byte*>(&value), sizeof(value));
+    }
+
+    void write_integer(int64_t value)
+    {
+        if(value >= minus_2_to_the_4 && value < plus_2_to_the_7)
+        {
+            write(static_cast<byte>(value));
+        }
+        else if(value >= minus_2_to_the_7 && value < minus_2_to_the_4)
+        {
+            write(pack::Int8);
+            write(static_cast<byte>(value));
+        }
+        else if(value >= minus_2_to_the_15 && value < plus_2_to_the_15)
+        {
+            write(pack::Int16);
+            write_value(static_cast<int16_t>(value));
+        }
+        else if(value >= minus_2_to_the_31 && value < plus_2_to_the_31)
+        {
+            write(pack::Int32);
+            write_value(static_cast<int32_t>(value));
+        }
+        else
+        {
+            write(pack::Int64);
+            write_value(value);
+        }
+    }
+
+    void write(double value)
+    {
+        write(pack::Float64);
+        write_value(*reinterpret_cast<const int64_t*>(&value));
+    }
+
+    void write_map_header(size_t size)
+    {
+        if(size < 0x10)
+        {
+            write(static_cast<byte>(pack::TinyMap | size));
+        }
+        else if(size <= 0xFF)
+        {
+            write(pack::Map8);
+            write(static_cast<byte>(size));
+        }
+        else if(size <= 0xFFFF)
+        {
+            write(pack::Map16);
+            write_value<uint16_t>(size);
+        }
+        else
+        {
+            write(pack::Map32);
+            write_value<uint32_t>(size);
+        }
+    }
+
+    void write_empty_map()
+    {
+        write(pack::TinyMap);
+    }
+
+    void write_list_header(size_t size)
+    {
+        if(size < 0x10)
+        {
+            write(static_cast<byte>(pack::TinyList | size));
+        }
+        else if(size <= 0xFF)
+        {
+            write(pack::List8);
+            write(static_cast<byte>(size));
+        }
+        else if(size <= 0xFFFF)
+        {
+            write(pack::List16);
+            write_value<uint16_t>(size);
+        }
+        else
+        {
+            write(pack::List32);
+            write_value<uint32_t>(size);
+        }
+    }
+
+    void write_empty_list()
+    {
+        write(pack::TinyList);
+    }
+
+    void write_string_header(size_t size)
+    {
+        if(size < 0x10)
+        {
+            write(static_cast<byte>(pack::TinyString | size));
+        }
+        else if(size <= 0xFF)
+        {
+            write(pack::String8);
+            write(static_cast<byte>(size));
+        }
+        else if(size <= 0xFFFF)
+        {
+            write(pack::String16);
+            write_value<uint16_t>(size);
+        }
+        else
+        {
+            write(pack::String32);
+            write_value<uint32_t>(size);
+        }
+    }
+
+    void write_string(const std::string& str)
+    {
+        write_string(str.c_str(), str.size());
+    }
+
+    void write_string(const char* str, size_t len)
+    {
+        write_string_header(len);
+        write(reinterpret_cast<const byte*>(str), len);
+    }
+
+    void write_struct_header(size_t size)
+    {
+        if(size < 0x10)
+        {
+            write(static_cast<byte>(pack::TinyStruct | size));
+        }
+        else if(size <= 0xFF)
+        {
+            write(pack::Struct8);
+            write(static_cast<byte>(size));
+        }
+        else
+        {
+            write(pack::Struct16);
+            write_value<uint16_t>(size);
+        }
+    }
+
+    void message_success()
+    {
+        write_struct_header(1);
+        write(underlying_cast(MessageCode::Success));
+    }
+
+    void message_success_empty()
+    {
+        message_success();
+        write_empty_map();
+    }
+
+    void message_record()
+    {
+        write_struct_header(1);
+        write(underlying_cast(MessageCode::Record));
+    }
+
+    void message_record_empty()
+    {
+        message_record();
+        write_empty_list();
+    }
+
+private:
+    SocketStream stream;
+    ChunkedEncoder<SocketStream> encoder {stream};
+};
+
+}
diff --git a/src/bolt/v1/transport/buffer.cpp b/src/bolt/v1/transport/buffer.cpp
new file mode 100644
index 000000000..05a0769da
--- /dev/null
+++ b/src/bolt/v1/transport/buffer.cpp
@@ -0,0 +1,16 @@
+#include "buffer.hpp"
+
+namespace bolt
+{
+
+void Buffer::write(const byte* data, size_t len)
+{
+    buffer.insert(buffer.end(), data, data + len);
+}
+
+void Buffer::clear()
+{
+    buffer.clear();
+}
+
+}
diff --git a/src/bolt/v1/transport/buffer.hpp b/src/bolt/v1/transport/buffer.hpp
new file mode 100644
index 000000000..ae44c7f19
--- /dev/null
+++ b/src/bolt/v1/transport/buffer.hpp
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <cstdint>
+#include <cstdlib>
+#include <vector>
+
+namespace bolt
+{
+
+class Buffer
+{
+public:
+    using byte = uint8_t;
+
+    void write(const byte* data, size_t len);
+
+    void clear();
+
+    size_t size() const
+    {
+        return buffer.size();
+    }
+
+    byte operator[](size_t idx) const
+    {
+        return buffer[idx];
+    }
+
+    const byte* data() const
+    {
+        return buffer.data();
+    }
+
+private:
+    std::vector<byte> buffer;
+};
+
+
+}
diff --git a/src/bolt/v1/transport/chunked_decoder.hpp b/src/bolt/v1/transport/chunked_decoder.hpp
new file mode 100644
index 000000000..6f1feb166
--- /dev/null
+++ b/src/bolt/v1/transport/chunked_decoder.hpp
@@ -0,0 +1,77 @@
+#pragma once
+
+#include <cstring>
+#include <functional>
+#include <cassert>
+
+#include "utils/exceptions/basic_exception.hpp"
+#include "utils/likely.hpp"
+
+#include "logging/default.hpp"
+
+namespace bolt
+{
+
+template <class Stream>
+class ChunkedDecoder
+{
+public:
+    class DecoderError : public BasicException
+    {
+    public:
+        using BasicException::BasicException;
+    };
+
+    using byte = unsigned char;
+
+    ChunkedDecoder(Stream& stream) : stream(stream) {}
+
+    /* Decode chunked data
+     *
+     * Chunk format looks like:
+     *
+     * |Header|     Data     ||Header|    Data      || ... || End |
+     * |  2B  |  size bytes  ||  2B  |  size bytes  || ... ||00 00|
+     */
+    bool decode(const byte*& chunk, size_t n)
+    {
+        while(n > 0)
+        {
+            // get size from first two bytes in the chunk
+            auto size = get_size(chunk);
+
+            if(UNLIKELY(size + 2 > n))
+                throw DecoderError("Chunk size larger than available data.");
+
+            // advance chunk to pass those two bytes
+            chunk += 2;
+            n -= 2;
+
+            // if chunk size is 0, we're done!
+            if(size == 0)
+                return true;
+
+            stream.get().write(chunk, size);
+
+            chunk += size;
+            n -= size;
+        }
+
+        return false;
+    }
+
+    bool operator()(const byte*& chunk, size_t n)
+    {
+        return decode(chunk, n);
+    }
+
+private:
+    std::reference_wrapper<Stream> stream;
+
+    size_t get_size(const byte* chunk)
+    {
+        return size_t(chunk[0]) << 8 | chunk[1];
+    }
+};
+
+}
diff --git a/src/bolt/v1/transport/chunked_encoder.hpp b/src/bolt/v1/transport/chunked_encoder.hpp
new file mode 100644
index 000000000..9ded82c9a
--- /dev/null
+++ b/src/bolt/v1/transport/chunked_encoder.hpp
@@ -0,0 +1,92 @@
+#pragma once
+
+#include <array>
+#include <cstring>
+#include <functional>
+
+#include "utils/likely.hpp"
+
+namespace bolt
+{
+
+template <class Stream>
+class ChunkedEncoder
+{
+    static constexpr size_t N = 65535;
+    static constexpr size_t C = N + 2 /* end mark */;
+
+public:
+    using byte = unsigned char;
+
+    ChunkedEncoder(Stream& stream) : stream(stream) {}
+
+    static constexpr size_t chunk_size = N - 2;
+
+    void write(byte value)
+    {
+        if(UNLIKELY(pos == N))
+            end_chunk();
+
+        chunk[pos++] = value;
+    }
+
+    void write(const byte* values, size_t n)
+    {
+        while(n > 0)
+        {
+            auto size = n < N - pos ? n : N - pos;
+
+            std::memcpy(chunk.data() + pos, values, size);
+
+            pos += size;
+            n -= size;
+
+            if(pos == N)
+                end_chunk();
+        }
+    }
+
+    void flush()
+    {
+        write_chunk_header();
+
+        // write two zeros to signal message end
+        chunk[pos++] = 0x00;
+        chunk[pos++] = 0x00;
+
+        flush_stream();
+    }
+
+private:
+    std::reference_wrapper<Stream> stream;
+
+    std::array<byte, C> chunk;
+    size_t pos {2};
+
+    void end_chunk()
+    {
+        write_chunk_header();
+        flush();
+    }
+
+    void write_chunk_header()
+    {
+        // write the size of the chunk
+        uint16_t size = pos - 2;
+
+        // write the higher byte
+        chunk[0] = size >> 8;
+
+        // write the lower byte
+        chunk[1] = size & 0xFF;
+    }
+
+    void flush_stream()
+    {
+        // write chunk to the stream
+        stream.get().write(chunk.data(), pos);
+        pos = 2;
+    }
+};
+
+}
diff --git a/src/bolt/v1/transport/socket_stream.hpp b/src/bolt/v1/transport/socket_stream.hpp
new file mode 100644
index 000000000..de41815bc
--- /dev/null
+++ b/src/bolt/v1/transport/socket_stream.hpp
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <cstdint>
+#include <vector>
+#include <cstdio>
+
+#include "io/network/socket.hpp"
+#include "stream_error.hpp"
+
+namespace bolt
+{
+
+class SocketStream
+{
+public:
+    using byte = uint8_t;
+
+    SocketStream(io::Socket& socket) : socket(socket) {}
+
+    void write(const byte* data, size_t n)
+    {
+        while(n > 0)
+        {
+            auto written = socket.get().write(data, n);
+
+            if(UNLIKELY(written == -1))
+                throw StreamError("Can't write to stream");
+
+            n -= written;
+            data += written;
+        }
+    }
+
+private:
+    std::reference_wrapper<io::Socket> socket;
+};
+
+}
diff --git a/src/bolt/v1/transport/stream_error.hpp b/src/bolt/v1/transport/stream_error.hpp
new file mode 100644
index 000000000..fec4525dd
--- /dev/null
+++ b/src/bolt/v1/transport/stream_error.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "utils/exceptions/basic_exception.hpp"
+
+namespace bolt
+{
+
+class StreamError : BasicException
+{
+public:
+    using BasicException::BasicException;
+};
+
+}
diff --git a/src/cypher/debug/tree_print.hpp b/src/cypher/debug/tree_print.hpp
index 8e2994397..93f87a863 100644
--- a/src/cypher/debug/tree_print.hpp
+++ b/src/cypher/debug/tree_print.hpp
@@ -11,6 +11,8 @@ public:
     class Printer
     {
     public:
+        friend class Entry;
+
         Printer(std::ostream& stream, const std::string& header)
             : stream(stream)
         {
@@ -49,10 +51,10 @@ public:
             }
 
             template <class T>
-            friend Entry& operator<<(Entry& entry, const T& item)
+            Entry& operator<<(const T& item)
             {
-                entry.printer.stream << item;
-                return entry;
+                printer.stream << item;
+                return *this;
             }
 
         private:
diff --git a/src/cypher/lexertl b/src/cypher/lexertl
new file mode 160000
index 000000000..7d4d36a35
--- /dev/null
+++ b/src/cypher/lexertl
@@ -0,0 +1 @@
+Subproject commit 7d4d36a357027df0e817453cc9cf948f71047ca9
diff --git a/src/dbms/dbms.hpp b/src/dbms/dbms.hpp
new file mode 100644
index 000000000..a71c2ad24
--- /dev/null
+++ b/src/dbms/dbms.hpp
@@ -0,0 +1,8 @@
+#pragma once
+
+class Dbms
+{
+public:
+
+
+};
diff --git a/src/dbms/server/bolt.hpp b/src/dbms/server/bolt.hpp
new file mode 100644
index 000000000..c11568cb6
--- /dev/null
+++ b/src/dbms/server/bolt.hpp
@@ -0,0 +1,9 @@
+#pragma once
+
+class BoltServer
+{
+public:
+    BoltServer() = default;
+
+
+};
diff --git a/src/examples/bolt.cpp b/src/examples/bolt.cpp
new file mode 100644
index 000000000..2b2b7f317
--- /dev/null
+++ b/src/examples/bolt.cpp
@@ -0,0 +1,68 @@
+#include <iostream>
+#include <signal.h>
+
+#include "bolt/v1/server/server.hpp"
+#include "bolt/v1/server/worker.hpp"
+
+#include "io/network/socket.hpp"
+
+#include "logging/default.hpp"
+#include "logging/streams/stdout.hpp"
+
+static bolt::Server<bolt::Worker>* serverptr;
+
+Logger logger;
+
+void sigint_handler(int s)
+{
+    auto signal = s == SIGINT ? "SIGINT" : "SIGABRT";
+
+    logger.info("Recieved signal {}", signal);
+    logger.info("Shutting down...");
+
+    std::exit(EXIT_SUCCESS);
+}
+
+static constexpr const char* interface = "0.0.0.0";
+static constexpr const char* port = "7687";
+
+int main(void)
+{
+    logging::init_sync();
+    logging::log->pipe(std::make_unique<Stdout>());
+    logger = logging::log->logger("Main");
+
+    signal(SIGINT, sigint_handler);
+    signal(SIGABRT, sigint_handler);
+
+    io::Socket socket;
+
+    try
+    {
+        socket = io::Socket::bind(interface, port);
+    }
+    catch(io::NetworkError e)
+    {
+        logger.error("Cannot bind to socket on {} at {}", interface, port);
+        logger.error("{}", e.what());
+
+        std::exit(EXIT_FAILURE);
+    }
+
+    socket.set_non_blocking();
+    socket.listen(1024);
+
+    logger.info("Listening on {} at {}", interface, port);
+
+    bolt::Server<bolt::Worker> server(std::move(socket));
+    serverptr = &server;
+
+    constexpr size_t N = 1;
+
+    logger.info("Starting {} workers", N);
+    server.start(N);
+
+    logger.info("Shutting down...");
+
+    return EXIT_SUCCESS;
+}
diff --git a/src/examples/compile-bolt.sh b/src/examples/compile-bolt.sh
new file mode 100644
index 000000000..349e20e46
--- /dev/null
+++ b/src/examples/compile-bolt.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+clang++ -g -rdynamic ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp  ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto
diff --git a/src/examples/endinan.cpp b/src/examples/endinan.cpp
new file mode 100644
index 000000000..58eac20de
--- /dev/null
+++ b/src/examples/endinan.cpp
@@ -0,0 +1,81 @@
+#include <iostream>
+#include <cstdint>
+
+#include <byteswap.h>
+
+char b[8] = {1, 2, 3, 4, 0, 0, 0, 1};
+
+int64_t safe_int64(const char* b)
+{
+    return int64_t(b[0]) << 56 | int64_t(b[1]) << 48
+         | int64_t(b[2]) << 40 | int64_t(b[3]) << 32
+         | int64_t(b[4]) << 24 | int64_t(b[5]) << 16
+         | int64_t(b[6]) << 8  | int64_t(b[7]);
+}
+
+int64_t unsafe_int64(const char* b)
+{
+    auto i = reinterpret_cast<const int64_t*>(b);
+    return __bswap_64(*i);
+}
+
+int32_t safe_int32(const char* b)
+{
+    return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
+}
+
+int32_t unsafe_int32(const char* b)
+{
+    auto i = reinterpret_cast<const int32_t*>(b);
+    return __bswap_32(*i);
+}
+
+[[clang::optnone]]
+void test(uint64_t n)
+{
+    for(uint64_t i = 0; i < n; ++i)
+        unsafe_int64(b);
+}
+
+uint8_t f[8] = {0x3F, 0xF1, 0x99, 0x99, 0x99, 0x99, 0x99, 0x9A};
+
+double ff = 1.1;
+
+double get_double(const uint8_t* b)
+{
+    auto v = __bswap_64(*reinterpret_cast<const uint64_t*>(b));
+    return *reinterpret_cast<const double*>(&v);
+}
+
+void print_hex(const char* buf, size_t n)
+{
+    for (size_t i = 0; i < n; ++i)
+        printf("%02X ", (unsigned char)buf[i]);
+}
+
+void print_hex(const uint8_t* buf, size_t n)
+{
+    print_hex((const char*)buf, n);
+}
+
+int main(void)
+{
+    auto dd = get_double(f);
+
+    print_hex(f, 8);
+
+    std::cout << std::endl;
+    print_hex((const uint8_t*)(&ff), 8);
+
+    std::cout << std::endl;
+    print_hex((const uint8_t*)(&dd), 8);
+
+    std::cout << dd << std::endl;
+
+    /* std::cout << safe_int64(b) << std::endl; */
+    /* std::cout << unsafe_int64(b) << std::endl; */
+
+    /* test(1000000000ull); */
+
+    return 0;
+}
diff --git a/src/io/network/Makefile b/src/io/network/Makefile
deleted file mode 100644
index 245054369..000000000
--- a/src/io/network/Makefile
+++ /dev/null
@@ -1,17 +0,0 @@
-CXX=clang++
-CFLAGS=-std=c++1y -O2 -Wall -Wno-unknown-pragmas
-LDFLAGS=-lhttp_parser -pthread
-
-INC=-I../../
-SOURCES=test.cpp
-EXECUTABLE=test
-
-all: $(EXECUTABLE)
-	    
-$(EXECUTABLE): $(SOURCES) 
-	$(CXX) $(CFLAGS) $(SOURCES) -o $(EXECUTABLE) $(INC) $(LDFLAGS) 
-
-.PHONY:
-clean:
-	rm -f test
-	rm -f *.o
diff --git a/src/io/network/epoll-example.c b/src/io/network/epoll-example.c
deleted file mode 100644
index 7f88c7625..000000000
--- a/src/io/network/epoll-example.c
+++ /dev/null
@@ -1,281 +0,0 @@
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/epoll.h>
-#include <errno.h>
-
-char buf[512];
-#define MAXEVENTS 64
-
-static int
-make_socket_non_blocking (int sfd)
-{
-  int flags, s;
-
-  flags = fcntl (sfd, F_GETFL, 0);
-  if (flags == -1)
-    {
-      perror ("fcntl");
-      return -1;
-    }
-
-  flags |= O_NONBLOCK;
-  s = fcntl (sfd, F_SETFL, flags);
-  if (s == -1)
-    {
-      perror ("fcntl");
-      return -1;
-    }
-
-  return 0;
-}
-
-static int
-create_and_bind (char *port)
-{
-  struct addrinfo hints;
-  struct addrinfo *result, *rp;
-  int s, sfd;
-
-  memset (&hints, 0, sizeof (struct addrinfo));
-  hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */
-  hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
-  hints.ai_flags = AI_PASSIVE;     /* All interfaces */
-
-  s = getaddrinfo (NULL, port, &hints, &result);
-  if (s != 0)
-    {
-      fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
-      return -1;
-    }
-
-  for (rp = result; rp != NULL; rp = rp->ai_next)
-    {
-      sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
-      if (sfd == -1)
-        continue;
-
-      s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
-      if (s == 0)
-        {
-          /* We managed to bind successfully! */
-          break;
-        }
-
-      close (sfd);
-    }
-
-  if (rp == NULL)
-    {
-      fprintf (stderr, "Could not bind\n");
-      return -1;
-    }
-
-  freeaddrinfo (result);
-
-  return sfd;
-}
-
-int
-main (int argc, char *argv[])
-{
-  const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
-
-  size_t len = strlen(response);
-
-  int sfd, s;
-  int efd;
-  struct epoll_event event;
-  struct epoll_event *events;
-
-  if (argc != 2)
-    {
-      fprintf (stderr, "Usage: %s [port]\n", argv[0]);
-      exit (EXIT_FAILURE);
-    }
-
-  sfd = create_and_bind (argv[1]);
-  if (sfd == -1)
-    abort ();
-
-  s = make_socket_non_blocking (sfd);
-  if (s == -1)
-    abort ();
-
-  s = listen (sfd, SOMAXCONN);
-  if (s == -1)
-    {
-      perror ("listen");
-      abort ();
-    }
-
-  efd = epoll_create1 (0);
-  if (efd == -1)
-    {
-      perror ("epoll_create");
-      abort ();
-    }
-
-  event.data.fd = sfd;
-  event.events = EPOLLIN | EPOLLET;
-  s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
-  if (s == -1)
-    {
-      perror ("epoll_ctl");
-      abort ();
-    }
-
-  /* Buffer where events are returned */
-  events = calloc (MAXEVENTS, sizeof event);
-
-  /* The event loop */
-  while (1)
-    {
-      int n, i;
-
-      n = epoll_wait (efd, events, MAXEVENTS, -1);
-      for (i = 0; i < n; i++)
-	{
-	  if ((events[i].events & EPOLLERR) ||
-              (events[i].events & EPOLLHUP) ||
-              (!(events[i].events & EPOLLIN)))
-	    {
-              /* An error has occured on this fd, or the socket is not
-                 ready for reading (why were we notified then?) */
-	      fprintf (stderr, "epoll error\n");
-	      close (events[i].data.fd);
-	      continue;
-	    }
-
-	  else if (sfd == events[i].data.fd)
-	    {
-              /* We have a notification on the listening socket, which
-                 means one or more incoming connections. */
-              while (1)
-                {
-                  struct sockaddr in_addr;
-                  socklen_t in_len;
-                  int infd;
-                  char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
-
-                  in_len = sizeof in_addr;
-                  infd = accept (sfd, &in_addr, &in_len);
-                  if (infd == -1)
-                    {
-                      if ((errno == EAGAIN) ||
-                          (errno == EWOULDBLOCK))
-                        {
-                          /* We have processed all incoming
-                             connections. */
-                          break;
-                        }
-                      else
-                        {
-                          perror ("accept");
-                          break;
-                        }
-                    }
-
-                  s = getnameinfo (&in_addr, in_len,
-                                   hbuf, sizeof hbuf,
-                                   sbuf, sizeof sbuf,
-                                   NI_NUMERICHOST | NI_NUMERICSERV);
-                  if (s == 0)
-                    {
-                      printf("Accepted connection on descriptor %d "
-                             "(host=%s, port=%s)\n", infd, hbuf, sbuf);
-                    }
-
-                  /* Make the incoming socket non-blocking and add it to the
-                     list of fds to monitor. */
-                  s = make_socket_non_blocking (infd);
-                  if (s == -1)
-                    abort ();
-
-                  event.data.fd = infd;
-                  event.events = EPOLLIN | EPOLLET;
-                  s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
-                  if (s == -1)
-                    {
-                      perror ("epoll_ctl");
-                      abort ();
-                    }
-                }
-              continue;
-            }
-          else
-            {
-              /* We have data on the fd waiting to be read. Read and
-                 display it. We must read whatever data is available
-                 completely, as we are running in edge-triggered mode
-                 and won't get a notification again for the same
-                 data. */
-              int done = 0;
-
-              while (1)
-                {
-                  ssize_t count;
-
-                  count = read (events[i].data.fd, buf, sizeof buf);
-                  if (count == -1)
-                    {
-                      /* If errno == EAGAIN, that means we have read all
-                         data. So go back to the main loop. */
-                      if (errno != EAGAIN)
-                        {
-                          perror ("read");
-                          done = 1;
-                        }
-                      break;
-                    }
-                  else if (count == 0)
-                    {
-                      /* End of file. The remote has closed the
-                         connection. */
-                      done = 1;
-                      break;
-                    }
-
-                    size_t sum = 0;
-                    char* resp = (char*)response;
-
-                    while(sum < len)
-                    {
-                        int k = write(event.data.fd, resp, len - sum);
-                        sum += k;
-                        resp += k;
-
-                    }
-
-                  /* Write the buffer to standard output */
-                  if (s == -1)
-                    {
-                      perror ("write");
-                      abort ();
-                    }
-                }
-
-              if (done)
-                {
-                  printf ("Closed connection on descriptor %d\n",
-                          events[i].data.fd);
-
-                  /* Closing the descriptor will make epoll remove it
-                     from the set of descriptors which are monitored. */
-                  close (events[i].data.fd);
-                }
-            }
-        }
-    }
-
-  free (events);
-
-  close (sfd);
-
-  return EXIT_SUCCESS;
-}
diff --git a/src/io/network/epoll.hpp b/src/io/network/epoll.hpp
index 11c3764de..6ce725280 100644
--- a/src/io/network/epoll.hpp
+++ b/src/io/network/epoll.hpp
@@ -9,6 +9,12 @@
 namespace io
 {
 
+class EpollError : BasicException
+{
+public:
+    using BasicException::BasicException;
+};
+
 class Epoll
 {
 public:
@@ -17,14 +23,18 @@ public:
     Epoll(int flags)
     {
         epoll_fd = epoll_create1(flags);
+
+        if(UNLIKELY(epoll_fd == -1))
+            throw EpollError("Can't create epoll file descriptor");
     }
 
-    void add(Socket& socket, Event* event)
+    template <class Stream>
+    void add(Stream& stream, Event* event)
     {
-        auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event);
+        auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stream, event);
 
         if(UNLIKELY(status))
-            throw NetworkError("Can't add an event to epoll listener.");
+            throw EpollError("Can't add an event to epoll listener.");
     }
 
     int wait(Event* events, int max_events, int timeout)
diff --git a/src/io/network/event_listener.hpp b/src/io/network/event_listener.hpp
index b536b437a..1600b835a 100644
--- a/src/io/network/event_listener.hpp
+++ b/src/io/network/event_listener.hpp
@@ -1,14 +1,12 @@
 #pragma once
 
-#include "socket.hpp"
 #include "epoll.hpp"
 #include "utils/crtp.hpp"
 
 namespace io
 {
 
-template <class Derived, class Stream,
-          size_t max_events = 64, int wait_timeout = -1>
+template <class Derived, size_t max_events = 64, int wait_timeout = -1>
 class EventListener : public Crtp<Derived>
 {
 public:
@@ -16,32 +14,28 @@ public:
 
     EventListener(uint32_t flags = 0) : listener(flags) {}
 
-    void add(Stream& stream)
-    {
-        // add the stream to the event listener
-        listener.add(stream.socket, &stream.event);
-    }
-
     void wait_and_process_events()
     {
+        // TODO hardcoded a wait timeout because of thread joining
+        // when you shutdown the server. This should be wait_timeout of the
+        // template parameter and should almost never change from that.
+        // thread joining should be resolved using a signal that interrupts
+        // the system call.
+
         // waits for an event/multiple events and returns a maximum of
         // max_events and stores them in the events array. it waits for
         // wait_timeout milliseconds. if wait_timeout is achieved, returns 0
-        auto n = listener.wait(events, max_events, wait_timeout);
-
-        LOG_DEBUG("received " << n << " events");
+        auto n = listener.wait(events, max_events, 200);
 
         // go through all events and process them in order
         for(int i = 0; i < n; ++i)
         {
             auto& event = events[i];
-            auto& stream = *reinterpret_cast<Stream*>(event.data.ptr);
 
-            // a stream was closed
+            // hangup event
             if(UNLIKELY(event.events & EPOLLRDHUP))
             {
-                LOG_DEBUG("EPOLLRDHUP event recieved on socket " << stream.id());
-                this->derived().on_close(stream);
+                this->derived().on_close_event(event);
                 continue;
             }
 
@@ -49,18 +43,12 @@ public:
             if(UNLIKELY(!(event.events & EPOLLIN) ||
                           event.events & (EPOLLHUP | EPOLLERR)))
             {
-                LOG_DEBUG(">> EPOLL ERR");
-                LOG_DEBUG("EPOLLIN" << (event.events & EPOLLIN));
-                LOG_DEBUG("EPOLLHUP" << (event.events & EPOLLHUP));
-                LOG_DEBUG("EPOLLERR" << (event.events & EPOLLERR));
-
-                this->derived().on_error(stream);
+                this->derived().on_error_event(event);
                 continue;
             }
 
-            LOG_DEBUG("signalling that data exists on socket " << stream.id());
             // we have some data waiting to be read
-            this->derived().on_data(stream);
+            this->derived().on_data_event(event);
         }
 
         // this will be optimized out :D
diff --git a/src/io/network/network_error.hpp b/src/io/network/network_error.hpp
index 7384dcd83..ed528a07b 100644
--- a/src/io/network/network_error.hpp
+++ b/src/io/network/network_error.hpp
@@ -2,9 +2,15 @@
 
 #include <stdexcept>
 
-class NetworkError : public std::runtime_error
+#include "utils/exceptions/basic_exception.hpp"
+
+namespace io
+{
+
+class NetworkError : public BasicException
 {
 public:
-    using std::runtime_error::runtime_error;
+    using BasicException::BasicException;
 };
 
+}
diff --git a/src/io/network/pps.sh b/src/io/network/pps.sh
deleted file mode 100755
index c46b46198..000000000
--- a/src/io/network/pps.sh
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/bin/bash
- 
-INTERVAL="1"  # update interval in seconds
- 
-if [ -z "$1" ]; then
-        echo
-        echo usage: $0 [network-interface]
-        echo
-        echo e.g. $0 eth0
-        echo
-        echo shows packets-per-second
-        exit
-fi
- 
-IF=$1
- 
-while true
-do
-        R1=`cat /sys/class/net/$1/statistics/rx_packets`
-        T1=`cat /sys/class/net/$1/statistics/tx_packets`
-        sleep $INTERVAL
-        R2=`cat /sys/class/net/$1/statistics/rx_packets`
-        T2=`cat /sys/class/net/$1/statistics/tx_packets`
-        TXPPS=`expr $T2 - $T1`
-        RXPPS=`expr $R2 - $R1`
-        echo "TX $1: $TXPPS pkts/s RX $1: $RXPPS pkts/s"
-done
diff --git a/src/io/network/secure_socket.hpp b/src/io/network/secure_socket.hpp
new file mode 100644
index 000000000..c79ff3c12
--- /dev/null
+++ b/src/io/network/secure_socket.hpp
@@ -0,0 +1,93 @@
+#pragma once
+
+#include "tls.hpp"
+#include "io/network/socket.hpp"
+#include "tls_error.hpp"
+#include "utils/types/byte.hpp"
+
+#include <iostream>
+
+namespace io
+{
+
+class SecureSocket
+{
+public:
+    SecureSocket(Socket&& socket, const Tls::Context& tls)
+        : socket(std::forward<Socket>(socket))
+    {
+        ssl = SSL_new(tls);
+        SSL_set_fd(ssl, this->socket);
+
+        SSL_set_accept_state(ssl);
+
+        if(SSL_accept(ssl) <= 0)
+            ERR_print_errors_fp(stderr);
+    }
+
+    SecureSocket(SecureSocket&& other)
+    {
+        *this = std::forward<SecureSocket>(other);
+    }
+
+    SecureSocket& operator=(SecureSocket&& other)
+    {
+        socket = std::move(other.socket);
+
+        ssl = other.ssl;
+        other.ssl = nullptr;
+
+        return *this;
+    }
+
+    ~SecureSocket()
+    {
+        if(ssl == nullptr)
+            return;
+
+        std::cout << "DELETING SSL" << std::endl;
+
+        SSL_free(ssl);
+    }
+
+    int error(int status)
+    {
+        return SSL_get_error(ssl, status);
+    }
+
+    int write(const std::string& str)
+    {
+        return write(str.c_str(), str.size());
+    }
+
+    int write(const byte* data, size_t len)
+    {
+        return SSL_write(ssl, data, len);
+    }
+
+    int write(const char* data, size_t len)
+    {
+        return SSL_write(ssl, data, len);
+    }
+
+    int read(char* buffer, size_t len)
+    {
+        return SSL_read(ssl, buffer, len);
+    }
+
+    operator int()
+    {
+        return socket;
+    }
+
+    operator Socket&()
+    {
+        return socket;
+    }
+
+private:
+    Socket socket;
+    SSL* ssl {nullptr};
+};
+
+}
diff --git a/src/io/network/secure_stream_reader.hpp b/src/io/network/secure_stream_reader.hpp
new file mode 100644
index 000000000..889bdb60f
--- /dev/null
+++ b/src/io/network/secure_stream_reader.hpp
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <openssl/ssl.h>
+
+#include "stream_reader.hpp"
+#include "logging/default.hpp"
+
+namespace io
+{
+using namespace memory::literals;
+
+template <class Derived, class Stream>
+class SecureStreamReader : public StreamReader<Derived, Stream>
+{
+public:
+    struct Buffer
+    {
+        char* ptr;
+        size_t len;
+    };
+
+    SecureStreamReader(uint32_t flags = 0)
+        : StreamReader<Derived, Stream>(flags) {}
+
+    void on_data(Stream& stream)
+    {
+        while(true)
+        {
+            // allocate the buffer to fill the data
+            auto buf = this->derived().on_alloc(stream);
+
+            // read from the buffer at most buf.len bytes
+            auto len = stream.socket.read(buf.ptr, buf.len);
+
+            if(LIKELY(len > 0))
+            {
+                buf.len = len;
+                return this->derived().on_read(stream, buf);
+            }
+
+            auto err = stream.socket.error(len);
+
+            // the socket is not ready for reading yet
+            if(err == SSL_ERROR_WANT_READ ||
+               err == SSL_ERROR_WANT_WRITE ||
+               err == SSL_ERROR_WANT_X509_LOOKUP)
+            {
+                return;
+            }
+
+            // the socket notified a close event
+            if(err == SSL_ERROR_ZERO_RETURN)
+                return stream.close();
+
+            // some other error occurred, check errno
+            return this->derived().on_error(stream);
+        }
+    }
+};
+
+}
diff --git a/src/io/network/server.hpp b/src/io/network/server.hpp
index 8467086d0..47d5bf792 100644
--- a/src/io/network/server.hpp
+++ b/src/io/network/server.hpp
@@ -5,33 +5,39 @@
 namespace io
 {
 
-template <class Derived, class Stream>
-class Server : public StreamReader<Derived, Stream>
+template <class Derived>
+class Server : public EventListener<Derived>
 {
 public:
-    bool accept(Socket& socket)
+    Server(Socket&& socket) : socket(std::forward<Socket>(socket))
     {
-        // accept a connection from a socket
-        auto s = socket.accept(nullptr, nullptr);
-        LOG_DEBUG("socket " << s.id() << " accepted");
+        event.data.fd = this->socket;
+        event.events = EPOLLIN | EPOLLET;
 
-        if(!s.is_open())
-            return false;
-
-        // make the recieved socket non blocking
-        s.set_non_blocking();
-
-        auto& stream = this->derived().on_connect(std::move(s));
-
-        // we want to listen to an incoming event which is edge triggered and
-        // we also want to listen on the hangup event
-        stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-
-        // add the connection to the event listener
-        this->add(stream);
-
-        return true;
+        this->listener.add(this->socket, &event);
     }
+
+    void on_close_event(Epoll::Event& event)
+    {
+        ::close(event.data.fd);
+    }
+
+    void on_error_event(Epoll::Event& event)
+    {
+        ::close(event.data.fd);
+    }
+
+    void on_data_event(Epoll::Event& event)
+    {
+        if(UNLIKELY(socket != event.data.fd))
+            return;
+
+        this->derived().on_connect();
+    }
+
+protected:
+    Epoll::Event event;
+    Socket socket;
 };
 
 }
diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp
index d8ab88601..e93ec02ee 100644
--- a/src/io/network/socket.hpp
+++ b/src/io/network/socket.hpp
@@ -16,17 +16,24 @@
 #include "addrinfo.hpp"
 #include "utils/likely.hpp"
 
+#include "logging/default.hpp"
+
+#include <iostream>
+
 namespace io
 {
 
 class Socket
 {
+protected:
     Socket(int family, int socket_type, int protocol)
     {
         socket = ::socket(family, socket_type, protocol);
     }
 
 public:
+    using byte = uint8_t;
+
     Socket(int socket = -1) : socket(socket) {}
 
     Socket(const Socket&) = delete;
@@ -41,7 +48,16 @@ public:
         if(socket == -1)
             return;
 
-        close(socket);
+
+        std::cout << "DELETING SOCKET" << std::endl;
+
+        ::close(socket);
+    }
+
+    void close()
+    {
+        ::close(socket);
+        socket = -1;
     }
 
     Socket& operator=(Socket&& other)
@@ -73,7 +89,7 @@ public:
                 continue;
 
             if(::connect(s, it->ai_addr, it->ai_addrlen) == 0)
-                return std::move(s);
+                return s;
         }
 
         throw NetworkError("Unable to connect to socket");
@@ -100,7 +116,7 @@ public:
                 continue;
 
             if(::bind(s, it->ai_addr, it->ai_addrlen) == 0)
-                return std::move(s);
+                return s;
         }
 
         throw NetworkError("Unable to bind to socket");
@@ -141,22 +157,38 @@ public:
         return socket;
     }
 
-    size_t write(const std::string& str)
+    int write(const std::string& str)
     {
-        return ::write(socket, str.c_str(), str.size());
+        return write(str.c_str(), str.size());
     }
 
-    size_t write(const char* data, size_t len)
+    int write(const char* data, size_t len)
     {
+        return write(reinterpret_cast<const byte*>(data), len);
+    }
+
+    int write(const byte* data, size_t len)
+    {
+#ifndef NDEBUG
+        std::stringstream stream;
+
+        for(size_t i = 0; i < len; ++i)
+            stream << fmt::format("{:02X} ", static_cast<byte>(data[i]));
+
+        auto str = stream.str();
+
+        logging::debug("[Write {}B] {}", len, str);
+#endif
+
         return ::write(socket, data, len);
     }
 
-    size_t read(char* buffer, size_t len)
+    int read(void* buffer, size_t len)
     {
         return ::read(socket, buffer, len);
     }
 
-private:
+protected:
     int socket;
 };
 
diff --git a/src/io/network/stream_dispatcher.hpp b/src/io/network/stream_dispatcher.hpp
new file mode 100644
index 000000000..3db4c00b4
--- /dev/null
+++ b/src/io/network/stream_dispatcher.hpp
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "
+
+namespace io
+{
+
+class StreamDispatcher
+{
+
+};
+
+}
diff --git a/src/io/network/stream_listener.hpp b/src/io/network/stream_listener.hpp
new file mode 100644
index 000000000..8048be7f9
--- /dev/null
+++ b/src/io/network/stream_listener.hpp
@@ -0,0 +1,43 @@
+#pragma once
+
+#include "event_listener.hpp"
+
+namespace io
+{
+
+template <class Derived, class Stream,
+          size_t max_events = 64, int wait_timeout = -1>
+class StreamListener : public EventListener<Derived, max_events, wait_timeout>
+{
+public:
+    using EventListener<Derived, max_events, wait_timeout>::EventListener;
+
+    void add(Stream& stream)
+    {
+        // add the stream to the event listener
+        this->listener.add(stream.socket, &stream.event);
+    }
+
+    void on_close_event(Epoll::Event& event)
+    {
+        this->derived().on_close(to_stream(event));
+    }
+
+    void on_error_event(Epoll::Event& event)
+    {
+        this->derived().on_error(to_stream(event));
+    }
+
+    void on_data_event(Epoll::Event& event)
+    {
+        this->derived().on_data(to_stream(event));
+    }
+
+private:
+    Stream& to_stream(Epoll::Event& event)
+    {
+        return *reinterpret_cast<Stream*>(event.data.ptr);
+    }
+};
+
+}
diff --git a/src/io/network/stream_reader.hpp b/src/io/network/stream_reader.hpp
index 3fad770f8..48e8d70d1 100644
--- a/src/io/network/stream_reader.hpp
+++ b/src/io/network/stream_reader.hpp
@@ -1,6 +1,6 @@
 #pragma once
 
-#include "event_listener.hpp"
+#include "stream_listener.hpp"
 #include "memory/literals.hpp"
 
 namespace io
@@ -8,7 +8,7 @@ namespace io
 using namespace memory::literals;
 
 template <class Derived, class Stream>
-class StreamReader : public EventListener<Derived, Stream>
+class StreamReader : public StreamListener<Derived, Stream>
 {
 public:
     struct Buffer
@@ -17,12 +17,41 @@ public:
         size_t len;
     };
 
-    StreamReader(uint32_t flags = 0) : EventListener<Derived, Stream>(flags) {}
+    StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags) {}
+
+    bool accept(Socket& socket)
+    {
+        // accept a connection from a socket
+        auto s = socket.accept(nullptr, nullptr);
+
+        if(!s.is_open())
+            return false;
+
+        // make the recieved socket non blocking
+        s.set_non_blocking();
+
+        auto& stream = this->derived().on_connect(std::move(s));
+
+        // we want to listen to an incoming event which is edge triggered and
+        // we also want to listen on the hangup event
+        stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
+
+        // add the connection to the event listener
+        this->add(stream);
+
+        return true;
+    }
 
     void on_data(Stream& stream)
     {
         while(true)
         {
+            if(UNLIKELY(!stream.alive()))
+            {
+                stream.close();
+                break;
+            }
+
             // allocate the buffer to fill the data
             auto buf = this->derived().on_alloc(stream);
 
@@ -35,23 +64,21 @@ public:
                 // this means we have read all available data
                 if(LIKELY(errno == EAGAIN))
                 {
-                    LOG_DEBUG("EAGAIN read all data on socket " << stream.id());
                     break;
                 }
 
                 // some other error occurred, check errno
                 this->derived().on_error(stream);
+                break;
             }
 
             // end of file, the client has closed the connection
             if(UNLIKELY(buf.len == 0))
             {
-                LOG_DEBUG("EOF stream closed on socket " << stream.id());
                 stream.close();
                 break;
             }
 
-            LOG_DEBUG("data on socket " << stream.id());
             this->derived().on_read(stream, buf);
         }
     }
diff --git a/src/io/network/tcp/stream.hpp b/src/io/network/tcp/stream.hpp
index c4b1af76c..1317884a6 100644
--- a/src/io/network/tcp/stream.hpp
+++ b/src/io/network/tcp/stream.hpp
@@ -8,6 +8,7 @@ namespace io
 namespace tcp
 {
 
+template <class Socket>
 class Stream
 {
 public:
@@ -24,11 +25,6 @@ public:
         event.data.ptr = this;
     }
 
-    void close()
-    {
-        delete reinterpret_cast<Stream*>(event.data.ptr);
-    }
-
     int id() const { return socket.id(); }
 
     Socket socket;
diff --git a/src/io/network/tcp_server.hpp b/src/io/network/tcp_server.hpp
deleted file mode 100644
index c3965298a..000000000
--- a/src/io/network/tcp_server.hpp
+++ /dev/null
@@ -1,94 +0,0 @@
-#pragma once
-
-#include <list>
-#include <thread>
-#include <atomic>
-
-#include "debug/log.hpp"
-#include "tcp_listener.hpp"
-
-namespace io
-{
-
-template <class T>
-class TcpServer : TcpListener<TcpServer<T>, 64, -1>
-{
-public:
-    TcpServer(const char* addr, const char* port)
-        : stream(std::move(Socket::bind(addr, port))) {}
-
-    ~TcpServer()
-    {
-        stop();
-
-        for(auto& worker : workers)
-            worker.join();
-    }
-
-    template <class F>
-    void listen(size_t n, size_t backlog, F&& f)
-    {
-        for(size_t i = 0; i < n; ++i)
-        {
-            workers.emplace_back();
-            auto& w = workers.back();
-
-            threads[i] = std::thread([this, &w]() {
-                while(alive)
-                {
-                    LOG_DEBUG("Worker " << hash(std::this_thread::get_id())
-                              << " waiting... ");
-                    w.wait_and_process_events();
-                }
-            });
-        }
-
-        stream.socket.listen(backlog);
-    }
-
-    void stop()
-    {
-        alive.store(false, std::memory_order_release);
-    }
-
-private:
-    std::list<std::thread> threads;
-    std::list<T> workers;
-    std::atomic<bool> alive { true };
-
-    TcpStream stream;
-    size_t idx = 0;
-
-    void on_close(TcpStream& stream)
-    {
-        LOG_DEBUG("server on_close!!!!");
-    }
-
-    void on_error(TcpStream& stream)
-    {
-        LOG_DEBUG("server on_error!!!!");
-    }
-
-    void on_data(TcpStream&)
-    {
-        while (true)
-        {
-            LOG_DEBUG("Trying to accept... ");
-            if(!workers[idx].accept(socket))
-            {
-                LOG_DEBUG("Did not accept!");
-                break;
-            }
-
-            idx = (idx + 1) % workers.size();
-            LOG_DEBUG("Accepted a new connection!");
-        }
-    }
-
-    void on_wait_timeout()
-    {
-
-    }
-};
-
-}
diff --git a/src/io/network/tls.cpp b/src/io/network/tls.cpp
new file mode 100644
index 000000000..d35ffe8df
--- /dev/null
+++ b/src/io/network/tls.cpp
@@ -0,0 +1,55 @@
+#include "tls.hpp"
+#include "tls_error.hpp"
+
+namespace io
+{
+
+Tls::Context::Context()
+{
+    auto method = SSLv23_server_method();
+    ctx = SSL_CTX_new(method);
+
+    if(!ctx)
+    {
+        ERR_print_errors_fp(stderr);
+        throw io::TlsError("Unable to create TLS context");
+    }
+
+    SSL_CTX_set_ecdh_auto(ctx, 1);
+}
+
+Tls::Context::~Context()
+{
+    SSL_CTX_free(ctx);
+}
+
+Tls::Context& Tls::Context::cert(const std::string& path)
+{
+    if(SSL_CTX_use_certificate_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0)
+        return *this;
+
+    ERR_print_errors_fp(stderr);
+    throw TlsError("Error Loading cert '{}'", path);
+}
+
+Tls::Context& Tls::Context::key(const std::string& path)
+{
+    if(SSL_CTX_use_PrivateKey_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0)
+        return *this;
+
+    ERR_print_errors_fp(stderr);
+    throw TlsError("Error Loading private key '{}'", path);
+}
+
+void Tls::initialize()
+{
+    SSL_load_error_strings();
+    OpenSSL_add_ssl_algorithms();
+}
+
+void Tls::cleanup()
+{
+    EVP_cleanup();
+}
+
+}
diff --git a/src/io/network/tls.hpp b/src/io/network/tls.hpp
new file mode 100644
index 000000000..bd0de8fdc
--- /dev/null
+++ b/src/io/network/tls.hpp
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <string>
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+namespace io
+{
+
+class Tls
+{
+public:
+    class Context
+    {
+    public:
+        Context();
+        ~Context();
+
+        Context& cert(const std::string& path);
+        Context& key(const std::string& path);
+
+        operator SSL_CTX*() const { return ctx; }
+
+    private:
+        SSL_CTX* ctx;
+    };
+
+    static void initialize();
+    static void cleanup();
+};
+
+}
diff --git a/src/io/network/tls_error.hpp b/src/io/network/tls_error.hpp
new file mode 100644
index 000000000..53440a728
--- /dev/null
+++ b/src/io/network/tls_error.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "utils/exceptions/basic_exception.hpp"
+
+namespace io
+{
+
+class TlsError : public BasicException
+{
+public:
+    using BasicException::BasicException;
+};
+
+}
diff --git a/src/io/network/worker.hpp b/src/io/network/worker.hpp
deleted file mode 100644
index b50610525..000000000
--- a/src/io/network/worker.hpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#pragma once
-
-#include "listener.hpp"
-#include "tcp_stream.hpp"
-
-namespace io
-{
-  const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
-
-  size_t len = strlen(response);
-
-class Worker : public Listener<Worker>
-{
-    char buf[64_kB];
-
-public:
-    using Listener::Listener;
-
-    bool accept(Socket& socket)
-    {
-        auto s = socket.accept(nullptr, nullptr);
-
-        if(!s.is_open())
-            return false;
-
-        this->add(s);
-
-        return true;
-    }
-
-    void on_error(TcpStream* stream)
-    {
-        delete stream;
-    }
-
-    std::atomic<int> requests {0};
-
-    void on_read(TcpStream* stream)
-    {
-        int done = 0;
-
-        while (1)
-        {
-            ssize_t count;
-
-            count = read(stream->socket, buf, sizeof buf);
-            if (count == -1)
-              {
-                /* If errno == EAGAIN, that means we have read all
-                   data. So go back to the main loop. */
-                if (errno != EAGAIN)
-                  {
-                    perror ("read");
-                    done = 1;
-                  }
-                break;
-              }
-            else if (count == 0)
-              {
-                /* End of file. The remote has closed the
-                   connection. */
-                done = 1;
-                break;
-              }
-
-            size_t sum = 0;
-            char* resp = (char*)response;
-
-            while(sum < len)
-            {
-                int k = write(stream->socket, resp, len - sum);
-                sum += k;
-                resp += k;
-            }
-
-            requests.fetch_add(1, std::memory_order_relaxed);
-
-          }
-
-        if (done)
-          {
-            LOG_DEBUG("Closing TCP stream at " << stream->socket.id())
-
-            /* Closing the descriptor will make epoll remove it
-               from the set of descriptors which are monitored. */
-            delete stream;
-          }
-    }
-};
-
-}
diff --git a/src/logging/default.cpp b/src/logging/default.cpp
new file mode 100644
index 000000000..914d50417
--- /dev/null
+++ b/src/logging/default.cpp
@@ -0,0 +1,33 @@
+#include "default.hpp"
+
+#include "logging/logs/async_log.hpp"
+#include "logging/logs/sync_log.hpp"
+
+#include "logging/streams/stdout.hpp"
+
+namespace logging
+{
+
+std::unique_ptr<Log> log;
+
+std::unique_ptr<Log> debug_log = std::make_unique<SyncLog>();
+
+Logger init_debug_logger()
+{
+    debug_log->pipe(std::make_unique<Stdout>());
+    return debug_log->logger("DEBUG");
+}
+
+Logger debug_logger = init_debug_logger();
+
+void init_async()
+{
+    log = std::make_unique<AsyncLog>();
+}
+
+void init_sync()
+{
+    log = std::make_unique<SyncLog>();
+}
+
+}
diff --git a/src/logging/default.hpp b/src/logging/default.hpp
new file mode 100644
index 000000000..dd8357426
--- /dev/null
+++ b/src/logging/default.hpp
@@ -0,0 +1,22 @@
+#pragma once
+
+#include "log.hpp"
+#include "logger.hpp"
+
+namespace logging
+{
+
+extern std::unique_ptr<Log> log;
+
+extern Logger debug_logger;
+
+template <class... Args>
+void debug(Args&&... args)
+{
+    debug_logger.debug(std::forward<Args>(args)...);
+}
+
+void init_async();
+void init_sync();
+
+}
diff --git a/src/logging/log.cpp b/src/logging/log.cpp
index d3aefc899..586e58424 100644
--- a/src/logging/log.cpp
+++ b/src/logging/log.cpp
@@ -5,5 +5,5 @@
 
 Logger Log::logger(const std::string& name)
 {
-    return Logger(*this, name);
+    return Logger(this, name);
 }
diff --git a/src/logging/logger.hpp b/src/logging/logger.hpp
index 17af92809..d85c2e23d 100644
--- a/src/logging/logger.hpp
+++ b/src/logging/logger.hpp
@@ -1,5 +1,8 @@
 #pragma once
 
+#include <cassert>
+#include <fmt/format.h>
+
 #include "log.hpp"
 #include "levels.hpp"
 
@@ -44,50 +47,68 @@ class Logger
     };
 
 public:
-    Logger(Log& log, const std::string& name) : log(log), name(name) {}
+    Logger() = default;
+
+    Logger(Log* log, const std::string& name) : log(log), name(name) {}
 
     template <class Level, class... Args>
     void emit(Args&&... args)
     {
+        assert(log != nullptr);
+
         auto message = std::make_unique<Message<Level>>(
             Timestamp::now(), name, fmt::format(std::forward<Args>(args)...)
         );
 
-        log.get().emit(std::move(message));
+        log->emit(std::move(message));
     }
 
     template <class... Args>
     void trace(Args&&... args)
     {
+#ifndef NDEBUG
+#ifndef LOG_NO_TRACE
         emit<Trace>(std::forward<Args>(args)...);
+#endif
+#endif
     }
 
     template <class... Args>
     void debug(Args&&... args)
     {
+#ifndef NDEBUG
+#ifndef LOG_NO_DEBUG
         emit<Debug>(std::forward<Args>(args)...);
+#endif
+#endif
     }
 
     template <class... Args>
     void info(Args&&... args)
     {
+#ifndef LOG_NO_INFO
         emit<Info>(std::forward<Args>(args)...);
+#endif
     }
 
     template <class... Args>
     void warn(Args&&... args)
     {
+#ifndef LOG_NO_WARN
         emit<Warn>(std::forward<Args>(args)...);
+#endif
     }
 
     template <class... Args>
     void error(Args&&... args)
     {
+#ifndef LOG_NO_ERROR
         emit<Error>(std::forward<Args>(args)...);
+#endif
     }
 
 private:
-    std::reference_wrapper<Log> log;
+    Log* log;
     std::string name;
 };
 
diff --git a/src/logging/streams/stdout.cpp b/src/logging/streams/stdout.cpp
index 7f73fb808..8ffb97234 100644
--- a/src/logging/streams/stdout.cpp
+++ b/src/logging/streams/stdout.cpp
@@ -1,10 +1,17 @@
 #include "stdout.hpp"
 
-#include <cppformat/format.h>
+#include <iostream>
+#include <fmt/format.h>
 
 void Stdout::emit(const Log::Record& record)
 {
-    fmt::print("{} {:<5} [{}] {}\n", record.when(), record.level_str(),
-               record.where(), record.text());
+    auto s = fmt::format("{} {:<5} [{}] {}\n", static_cast<std::string>(
+                         record.when()), record.level_str(), record.where(),
+                         record.text());
+
+    std::cout << s;
+
+    /* fmt::printf("{} {:<5} [{}] {}\n", static_cast<std::string>(record.when()), */
+    /*             record.level_str(), record.where(), record.text()); */
 }
 
diff --git a/src/mvcc/id.hpp b/src/mvcc/id.hpp
index f0aaaf252..465986ff8 100644
--- a/src/mvcc/id.hpp
+++ b/src/mvcc/id.hpp
@@ -30,7 +30,7 @@ public:
     {
         return id;
     }
-    
+
 private:
     uint64_t id {0};
 };
diff --git a/src/speedy/rapidjson b/src/speedy/rapidjson
new file mode 160000
index 000000000..c02d52ad5
--- /dev/null
+++ b/src/speedy/rapidjson
@@ -0,0 +1 @@
+Subproject commit c02d52ad56595dc70b38daf46b5f315d3a7115fa
diff --git a/src/utils/bswap.hpp b/src/utils/bswap.hpp
new file mode 100644
index 000000000..0e7ee0fef
--- /dev/null
+++ b/src/utils/bswap.hpp
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <cstdint>
+#include <cstdlib>
+
+#include <byteswap.h>
+
+template <class T>
+inline T bswap(T value);
+
+template<>
+inline int16_t bswap<int16_t>(int16_t value)
+{
+    return __bswap_16(value);
+}
+
+template<>
+inline uint16_t bswap<uint16_t>(uint16_t value)
+{
+    return __bswap_16(value);
+}
+
+template<>
+inline int32_t bswap<int32_t>(int32_t value)
+{
+    return __bswap_32(value);
+}
+
+template<>
+inline uint32_t bswap<uint32_t>(uint32_t value)
+{
+    return __bswap_32(value);
+}
+
+template<>
+inline int64_t bswap<int64_t>(int64_t value)
+{
+    return __bswap_64(value);
+}
+
+template<>
+inline uint64_t bswap<uint64_t>(uint64_t value)
+{
+    return __bswap_64(value);
+}
diff --git a/src/utils/datetime/timestamp.hpp b/src/utils/datetime/timestamp.hpp
index f824e97ab..f9287e0ad 100644
--- a/src/utils/datetime/timestamp.hpp
+++ b/src/utils/datetime/timestamp.hpp
@@ -5,7 +5,7 @@
 #include <iomanip>
 #include <ostream>
 
-#include <cppformat/format.h>
+#include <fmt/format.h>
 
 #include "utils/datetime/datetime_error.hpp"
 #include "utils/total_ordering.hpp"
@@ -66,7 +66,7 @@ public:
 
     long subsec() const
     {
-        return nsec;
+        return nsec / 10000;
     }
 
     const std::string to_iso8601() const
@@ -103,5 +103,5 @@ private:
     long nsec;
 
     static constexpr auto fiso8601 =
-        "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:09d}Z";
+        "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:05d}Z";
 };
diff --git a/src/utils/exceptions/basic_exception.hpp b/src/utils/exceptions/basic_exception.hpp
index 91c1be5b5..79ea3d5f9 100644
--- a/src/utils/exceptions/basic_exception.hpp
+++ b/src/utils/exceptions/basic_exception.hpp
@@ -1,7 +1,6 @@
 #pragma once
 
 #include <stdexcept>
-
 #include <fmt/format.h>
 
 #include "utils/auto_scope.hpp"
@@ -10,21 +9,23 @@
 class BasicException : public std::exception
 {
 public:
-    template <class... Args>
-    BasicException(Args&&... args) noexcept
-        : message(fmt::format(std::forward<Args>(args)...))
+    BasicException(const std::string& message) noexcept : message(message)
     {
 #ifndef NDEBUG
-        message += '\n';
+        this->message += '\n';
 
         Stacktrace stacktrace;
 
         for(auto& line : stacktrace)
-            message += fmt::format("  at {} ({})\n",
+            this->message += fmt::format("  at {} ({})\n",
                 line.function, line.location);
 #endif
     }
 
+    template <class... Args>
+    BasicException(const std::string& format, Args&&... args) noexcept
+        : BasicException(fmt::format(format, std::forward<Args>(args)...)) {}
+
     const char* what() const noexcept override
     {
         return message.c_str();
@@ -34,4 +35,3 @@ private:
     std::string message;
 };
 
-
diff --git a/src/utils/string/weak_string.hpp b/src/utils/string/weak_string.hpp
index 17189eff1..f3ebdc8a3 100644
--- a/src/utils/string/weak_string.hpp
+++ b/src/utils/string/weak_string.hpp
@@ -79,7 +79,6 @@ public:
         return !(lhs == rhs);
     }
 
-
 private:
     const char* str;
     size_t len;
diff --git a/src/utils/types/byte.hpp b/src/utils/types/byte.hpp
new file mode 100644
index 000000000..931bd4008
--- /dev/null
+++ b/src/utils/types/byte.hpp
@@ -0,0 +1,5 @@
+#pragma once
+
+#include <cstdint>
+
+using byte = uint8_t;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 0c4b8f3f4..716d88dc9 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -16,21 +16,17 @@ message(STATUS "Available unit tests are: ${unit_test_names}")
 file(COPY ${CMAKE_SOURCE_DIR}/tests/data
      DESTINATION ${CMAKE_BINARY_DIR}/tests)
 
-set(chosen_test "concurrent_skiplist")
-
 # build unit tests
 foreach(test ${unit_test_names})
     set(test_name unit_${test})
-    if (${chosen_test} STREQUAL ${test_name})
-        add_executable(${test_name} unit/${test}.cpp)
-        # TODO: separate dependencies
-        target_link_libraries(${test_name} stdc++fs)
-        target_link_libraries(${test_name} cypher_lib)
-        target_link_libraries(${test_name} Threads::Threads)
-        target_link_libraries(${test_name} ${fmt_static_lib})
-        add_test(NAME ${test_name} COMMAND ${test_name})
-        set_property(TARGET ${test_name} PROPERTY CXX_STANDARD 14)
-    endif(${chosen_test} STREQUAL ${test_name})
+    add_executable(${test_name} unit/${test}.cpp)
+    # TODO: separate dependencies
+    target_link_libraries(${test_name} stdc++fs)
+    target_link_libraries(${test_name} cypher_lib)
+    target_link_libraries(${test_name} Threads::Threads)
+    target_link_libraries(${test_name} ${fmt_static_lib})
+    add_test(NAME ${test_name} COMMAND ${test_name})
+    set_property(TARGET ${test_name} PROPERTY CXX_STANDARD 14)
 endforeach()
 
 ## CONCURRENCY TESTS
@@ -45,10 +41,8 @@ message(STATUS "Available concurrency tests are: ${concurrency_test_names}")
 # build concurrency tests
 foreach(test ${concurrency_test_names})
     set(test_name concurrent_${test})
-    if (${chosen_test} STREQUAL ${test_name})
-        add_executable(${test_name} concurrent/${test}.cpp)
-        target_link_libraries(${test_name} Threads::Threads)
-        add_test(NAME ${test_name} COMMAND ${test_name})
-        set_property(TARGET ${test_name} PROPERTY CXX_STANDARD 14)
-    endif(${chosen_test} STREQUAL ${test_name})
+    add_executable(${test_name} concurrent/${test}.cpp)
+    target_link_libraries(${test_name} Threads::Threads)
+    add_test(NAME ${test_name} COMMAND ${test_name})
+    set_property(TARGET ${test_name} PROPERTY CXX_STANDARD 14)
 endforeach()
diff --git a/tests/unit/chunked_decoder.cpp b/tests/unit/chunked_decoder.cpp
new file mode 100644
index 000000000..45bcee3c2
--- /dev/null
+++ b/tests/unit/chunked_decoder.cpp
@@ -0,0 +1,62 @@
+#include <iostream>
+#include <deque>
+#include <cassert>
+#include <cstring>
+#include <array>
+#include <vector>
+
+#include "bolt/v1/transport/chunked_decoder.hpp"
+
+using byte = unsigned char;
+
+void print_hex(byte x)
+{
+    printf("%02X ", static_cast<byte>(x));
+}
+
+class DummyStream
+{
+public:
+    void write(const byte* values, size_t n)
+    {
+        data.insert(data.end(), values, values + n);
+    }
+
+    std::vector<byte> data;
+};
+
+using Decoder = bolt::ChunkedDecoder<DummyStream>;
+
+std::vector<byte> chunks[] = {
+    {0x00,0x08,'A',' ','q','u','i','c','k',' ',0x00,0x06,'b','r','o','w','n',' '},
+    {0x00,0x0A,'f','o','x',' ','j','u','m','p','s',' '},
+    {0x00,0x07,'o','v','e','r',' ','a',' '},
+    {0x00,0x08,'l','a','z','y',' ','d','o','g',0x00,0x00}
+};
+
+static constexpr size_t N = std::extent<decltype(chunks)>::value;
+
+std::string decoded = "A quick brown fox jumps over a lazy dog";
+
+int main(void)
+{
+    DummyStream stream;
+    Decoder decoder(stream);
+
+    for(size_t i = 0; i < N; ++i)
+    {
+        auto& chunk = chunks[i];
+        auto finished = decoder.decode(chunk.data(), chunk.size());
+
+        // break early if finished
+        if(finished)
+            break;
+    }
+
+    assert(decoded.size() == stream.data.size());
+
+    for(size_t i = 0; i < decoded.size(); ++i)
+        assert(decoded[i] == stream.data[i]);
+
+    return 0;
+}
diff --git a/tests/unit/chunked_encoder.cpp b/tests/unit/chunked_encoder.cpp
new file mode 100644
index 000000000..acd9d3442
--- /dev/null
+++ b/tests/unit/chunked_encoder.cpp
@@ -0,0 +1,115 @@
+#include <iostream>
+#include <deque>
+#include <cassert>
+#include <vector>
+
+#include "bolt/v1/transport/chunked_encoder.hpp"
+
+using byte = unsigned char;
+
+void print_hex(byte x)
+{
+    printf("%02X ", static_cast<byte>(x));
+}
+
+class DummyStream
+{
+public:
+    void write(const byte* values, size_t n)
+    {
+        num_calls++;
+        data.insert(data.end(), values, values + n);
+    }
+
+    byte pop()
+    {
+        auto c = data.front();
+        data.pop_front();
+        return c;
+    }
+
+    size_t pop_size()
+    {
+        return ((size_t)pop() << 8) | pop();
+    }
+
+    void print()
+    {
+        for(size_t i = 0; i < data.size(); ++i)
+            print_hex(data[i]);
+    }
+
+    std::deque<byte> data;
+    size_t num_calls {0};
+};
+
+using Encoder = bolt::ChunkedEncoder<DummyStream>;
+
+void write_ff(Encoder& encoder, size_t n)
+{
+    std::vector<byte> v;
+
+    for(size_t i = 0; i < n; ++i)
+        v.push_back('\xFF');
+
+    encoder.write(v.data(), v.size());
+}
+
+void check_ff(DummyStream& stream, size_t n)
+{
+    for(size_t i = 0; i < n; ++i)
+        assert(stream.pop() == byte('\xFF'));
+
+    (void)stream;
+}
+
+int main(void)
+{
+    DummyStream stream;
+    bolt::ChunkedEncoder<DummyStream> encoder(stream);
+
+    write_ff(encoder, 10);
+    write_ff(encoder, 10);
+    encoder.finish();
+
+    write_ff(encoder, 10);
+    write_ff(encoder, 10);
+    encoder.finish();
+
+    // this should be two chunks, one of size 65533 and the other of size 1467
+    write_ff(encoder, 67000);
+    encoder.finish();
+
+    for(int i = 0; i < 10000; ++i)
+        write_ff(encoder, 1500);
+    encoder.finish();
+
+    assert(stream.pop_size() == 20);
+    check_ff(stream, 20);
+    assert(stream.pop_size() == 0);
+
+    assert(stream.pop_size() == 20);
+    check_ff(stream, 20);
+    assert(stream.pop_size() == 0);
+
+    assert(stream.pop_size() == encoder.chunk_size);
+    check_ff(stream, encoder.chunk_size);
+    assert(stream.pop_size() == 1467);
+    check_ff(stream, 1467);
+    assert(stream.pop_size() == 0);
+
+    size_t k = 10000 * 1500;
+
+    while(k > 0)
+    {
+        auto size = k > encoder.chunk_size ? encoder.chunk_size : k;
+        assert(stream.pop_size() == size);
+        check_ff(stream, size);
+
+        k -= size;
+    }
+
+    assert(stream.pop_size() == 0);
+
+    return 0;
+}