From 807912c151571f1aadbf94db65b8e7f6a49f924f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?=
 <dominik.tomicevic@gmail.com>
Date: Tue, 27 Oct 2015 20:21:28 +0100
Subject: [PATCH] cypher + new tcp server

---
 cypher/codegen/code.hpp                      |  14 ++
 cypher/codegen/cppgen.hpp                    |  30 +++
 cypher/debug/tree_print.hpp                  | 103 +++------
 cypher/parser.cpp                            |   2 +-
 cypher/visitor/traverser.hpp                 | 198 ++++++++++++++++++
 data_structures/queue/bounded_spsc_queue.hpp |  87 ++++++++
 debug/log.hpp                                |  18 +-
 io/network/addrinfo.hpp                      |  46 +++++
 io/network/epoll.hpp                         |  39 ++++
 io/network/listener.hpp                      |  83 ++++++++
 io/network/network_error.hpp                 |  10 +
 io/network/socket.hpp                        | 123 +++++++++++
 io/network/tcp_stream.hpp                    |  29 +++
 io/network/test.cpp                          | 207 +++++++++++++++++++
 io/network/worker.hpp                        |  92 +++++++++
 utils/counters/ring_counter.hpp              |  28 +++
 utils/likely.hpp                             |  10 +
 17 files changed, 1039 insertions(+), 80 deletions(-)
 create mode 100644 cypher/codegen/code.hpp
 create mode 100644 cypher/codegen/cppgen.hpp
 create mode 100644 cypher/visitor/traverser.hpp
 create mode 100644 data_structures/queue/bounded_spsc_queue.hpp
 create mode 100644 io/network/addrinfo.hpp
 create mode 100644 io/network/epoll.hpp
 create mode 100644 io/network/listener.hpp
 create mode 100644 io/network/network_error.hpp
 create mode 100644 io/network/socket.hpp
 create mode 100644 io/network/tcp_stream.hpp
 create mode 100644 io/network/test.cpp
 create mode 100644 io/network/worker.hpp
 create mode 100644 utils/counters/ring_counter.hpp
 create mode 100644 utils/likely.hpp

diff --git a/cypher/codegen/code.hpp b/cypher/codegen/code.hpp
new file mode 100644
index 000000000..02973df01
--- /dev/null
+++ b/cypher/codegen/code.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <string>
+#include <vector>
+
+class Code
+{
+public:
+    
+
+
+private:
+    std::vector<std::string> buffer;
+};
diff --git a/cypher/codegen/cppgen.hpp b/cypher/codegen/cppgen.hpp
new file mode 100644
index 000000000..e1e278fd7
--- /dev/null
+++ b/cypher/codegen/cppgen.hpp
@@ -0,0 +1,30 @@
+#ifndef MEMGRAPH_CYPHER_CODEGEN_CPPGEN_HPP
+#define MEMGRAPH_CYPHER_CODEGEN_CPPGEN_HPP
+
+#include "cypher/visitor/traverser.hpp"
+
+class CppGen : public Traverser
+{
+    struct CreateGen : public Traverser
+    {
+        void visit(ast::Pattern& pattern) override
+        {
+            
+        }
+
+        void visit(ast
+    };
+
+public:
+
+    void visit(ast::Start& start) override
+    {
+
+    }
+
+    void visit(ast::Create create) override
+    {
+    };
+};
+
+#endif
diff --git a/cypher/debug/tree_print.hpp b/cypher/debug/tree_print.hpp
index e6e8b7de8..4a095aa8a 100644
--- a/cypher/debug/tree_print.hpp
+++ b/cypher/debug/tree_print.hpp
@@ -4,11 +4,11 @@
 #include <iostream>
 #include <stack>
 
-#include "cypher/ast/ast_visitor.hpp"
-#include "cypher/ast/ast.hpp"
+#include "cypher/visitor/traverser.hpp"
 
-class PrintVisitor : public ast::AstVisitor
+class PrintVisitor : public Traverser
 {
+public:
     class Printer
     {
     public:
@@ -80,45 +80,37 @@ class PrintVisitor : public ast::AstVisitor
         size_t level = 0;
     };
 
-public:
     PrintVisitor(std::ostream& stream)
         : printer(stream, "Printing AST") {}
 
     void visit(ast::Start& start) override
     {
         auto entry = printer.advance("Start");
-        accept(start.read_query);
-        accept(start.write_query);
+        Traverser::visit(start);
     }
     
     void visit(ast::ReadQuery& read_query) override
     {
         auto entry = printer.advance("Read Query");
-        accept(read_query.match);
-        accept(read_query.return_clause);
+        Traverser::visit(read_query);
     }
 
     void visit(ast::Match& match) override
     {
         auto entry = printer.advance("Match");
-        accept(match.pattern);
-        accept(match.where);
+        Traverser::visit(match);
     }
 
     void visit(ast::Pattern& pattern) override
     {
         auto entry = printer.advance("Pattern");
-        accept(pattern.node);
-        accept(pattern.relationship);
-        accept(pattern.next);
+        Traverser::visit(pattern);
     }
 
     void visit(ast::Node& node) override
     {
         auto entry = printer.advance("Node");
-        accept(node.idn);
-        accept(node.labels);
-        accept(node.props);
+        Traverser::visit(node);
     }
 
     void visit(ast::Identifier& idn) override
@@ -130,14 +122,13 @@ public:
     void visit(ast::Return& return_clause) override
     {
         auto entry = printer.advance("Return");
-        accept(return_clause.return_list);
+        Traverser::visit(return_clause);
     }
 
     void visit(ast::Accessor& accessor) override
     {
         auto entry = printer.advance("Accessor");
-        accept(accessor.entity);
-        accept(accessor.prop);
+        Traverser::visit(accessor);
     }
 
     void visit(ast::Boolean& boolean) override
@@ -167,172 +158,144 @@ public:
     void visit(ast::Property& property) override
     {
         auto entry = printer.advance("Property");
-        accept(property.idn);
-        accept(property.value);
+        Traverser::visit(property);
     }
 
     void visit(ast::And& and_expr) override
     {
         auto entry = printer.advance("And");
-        accept(and_expr.left);
-        accept(and_expr.right);
+        Traverser::visit(and_expr);
     }
 
     void visit(ast::Or& or_expr) override
     {
         auto entry = printer.advance("Or");
-        accept(or_expr.left);
-        accept(or_expr.right);
+        Traverser::visit(or_expr);
     }
 
     void visit(ast::Lt& lt_expr) override
     {
         auto entry = printer.advance("Less Than");
-        accept(lt_expr.left);
-        accept(lt_expr.right);
+        Traverser::visit(lt_expr);
     }
 
     void visit(ast::Gt& gt_expr) override
     {
         auto entry = printer.advance("Greater Than");
-        accept(gt_expr.left);
-        accept(gt_expr.right);
+        Traverser::visit(gt_expr);
     }
 
     void visit(ast::Ge& ge_expr) override
     {
         auto entry = printer.advance("Greater od Equal");
-        accept(ge_expr.left);
-        accept(ge_expr.right);
+        Traverser::visit(ge_expr);
     }
 
     void visit(ast::Le& le_expr) override
     {
         auto entry = printer.advance("Less or Equal");
-        accept(le_expr.left);
-        accept(le_expr.right);
+        Traverser::visit(le_expr);
     }
 
     void visit(ast::Eq& eq_expr) override
     {
         auto entry = printer.advance("Equal");
-        accept(eq_expr.left);
-        accept(eq_expr.right);
+        Traverser::visit(eq_expr);
     }
 
     void visit(ast::Ne& ne_expr) override
     {
         auto entry = printer.advance("Not Equal");
-        accept(ne_expr.left);
-        accept(ne_expr.right);
+        Traverser::visit(ne_expr);
     }
 
     void visit(ast::Plus& plus) override
     {
         auto entry = printer.advance("Plus");
-        accept(plus.left);
-        accept(plus.right);
+        Traverser::visit(plus);
     }
 
     void visit(ast::Minus& minus) override
     {
         auto entry = printer.advance("Minus");
-        accept(minus.left);
-        accept(minus.right);
+        Traverser::visit(minus);
     }
 
     void visit(ast::Star& star) override
     {
         auto entry = printer.advance("Star");
-        accept(star.left);
-        accept(star.right);
+        Traverser::visit(star);
     }
 
     void visit(ast::Slash& slash) override
     {
         auto entry = printer.advance("Slash");
-        accept(slash.left);
-        accept(slash.right);
+        Traverser::visit(slash);
     }
 
     void visit(ast::Rem& rem) override
     {
         auto entry = printer.advance("Rem (%)");
-        accept(rem.left);
-        accept(rem.right);
+        Traverser::visit(rem);
     }
 
     void visit(ast::PropertyList& prop_list) override
     {
         auto entry = printer.advance("Property List");
-        accept(prop_list.value);
-        accept(prop_list.next);
+        Traverser::visit(prop_list);
     }
 
     void visit(ast::RelationshipList& rel_list) override
     {
         auto entry = printer.advance("Relationship List");
-        accept(rel_list.value);
-        accept(rel_list.next);
+        Traverser::visit(rel_list);
     }
 
     void visit(ast::Relationship& rel) override
     {
         auto entry = printer.advance("Relationship");
         entry << " direction: " << rel.direction;
-        accept(rel.specs);
+        Traverser::visit(rel);
     }
 
     void visit(ast::RelationshipSpecs& rel_specs) override
     {
         auto entry = printer.advance("Relationship Specs");
-        accept(rel_specs.idn);
-        accept(rel_specs.types);
-        accept(rel_specs.props);
+        Traverser::visit(rel_specs);
     }
 
     void visit(ast::LabelList& labels) override
     {
         auto entry = printer.advance("Label List");
-        accept(labels.value);
-        accept(labels.next);
+        Traverser::visit(labels);
     }
 
     void visit(ast::ReturnList& return_list) override
     {
         auto entry = printer.advance("Return List");
-        accept(return_list.value);
-        accept(return_list.next);
+        Traverser::visit(return_list);
     }
 
     void visit(ast::Where& where) override
     {
         auto entry = printer.advance("Where");
-        accept(where.expr);
+        Traverser::visit(where);
     }
 
     void visit(ast::WriteQuery& write_query) override
     {
         auto entry = printer.advance("Write Query");
-        accept(write_query.create);
-        accept(write_query.return_clause);
+        Traverser::visit(write_query);
     }
 
     void visit(ast::Create& create) override
     {
         auto entry = printer.advance("Create");
-        accept(create.pattern);
+        Traverser::visit(create);
     }
 
 private:
     Printer printer;
-
-    template<class T>
-    void accept(T* node)
-    {
-        if(node != nullptr)
-            node->accept(*this);
-    }
 };
 
 #endif
diff --git a/cypher/parser.cpp b/cypher/parser.cpp
index 3a07a354d..e0a1192c9 100644
--- a/cypher/parser.cpp
+++ b/cypher/parser.cpp
@@ -10,7 +10,7 @@ int main()
 
     //std::string input("MATCH (user:User { name: 'Dominik', age: 24})-[has:HAS]->(item:Item) WHERE item.name = 'XPS 13' AND item.price = 11999.99 RETURN user, has, item");
 
-    std::string input("create n return n");
+    std::string input("create (n { name: 'Dominik', age: 24 }) return n");
 
     compiler.compile(input);
 
diff --git a/cypher/visitor/traverser.hpp b/cypher/visitor/traverser.hpp
new file mode 100644
index 000000000..9d080f5ef
--- /dev/null
+++ b/cypher/visitor/traverser.hpp
@@ -0,0 +1,198 @@
+#ifndef MEMGRAPH_CYPHER_VISITOR_TRAVERSER_HPP
+#define MEMGRAPH_CYPHER_VISITOR_TRAVERSER_HPP
+
+#include "cypher/ast/ast_visitor.hpp"
+#include "cypher/ast/ast.hpp"
+
+class Traverser : public ast::AstVisitor
+{
+public:
+
+    void visit(ast::Start& start) override
+    {
+        accept(start.read_query);
+        accept(start.write_query);
+    }
+    
+    void visit(ast::ReadQuery& read_query) override
+    {
+        accept(read_query.match);
+        accept(read_query.return_clause);
+    }
+
+    void visit(ast::Match& match) override
+    {
+        accept(match.pattern);
+        accept(match.where);
+    }
+
+    void visit(ast::Pattern& pattern) override
+    {
+        accept(pattern.node);
+        accept(pattern.relationship);
+        accept(pattern.next);
+    }
+
+    void visit(ast::Node& node) override
+    {
+        accept(node.idn);
+        accept(node.labels);
+        accept(node.props);
+    }
+
+    void visit(ast::Return& return_clause) override
+    {
+        accept(return_clause.return_list);
+    }
+
+    void visit(ast::Accessor& accessor) override
+    {
+        accept(accessor.entity);
+        accept(accessor.prop);
+    }
+    void visit(ast::Property& property) override
+    {
+        accept(property.idn);
+        accept(property.value);
+    }
+
+    void visit(ast::And& and_expr) override
+    {
+        accept(and_expr.left);
+        accept(and_expr.right);
+    }
+
+    void visit(ast::Or& or_expr) override
+    {
+        accept(or_expr.left);
+        accept(or_expr.right);
+    }
+
+    void visit(ast::Lt& lt_expr) override
+    {
+        accept(lt_expr.left);
+        accept(lt_expr.right);
+    }
+
+    void visit(ast::Gt& gt_expr) override
+    {
+        accept(gt_expr.left);
+        accept(gt_expr.right);
+    }
+
+    void visit(ast::Ge& ge_expr) override
+    {
+        accept(ge_expr.left);
+        accept(ge_expr.right);
+    }
+
+    void visit(ast::Le& le_expr) override
+    {
+        accept(le_expr.left);
+        accept(le_expr.right);
+    }
+
+    void visit(ast::Eq& eq_expr) override
+    {
+        accept(eq_expr.left);
+        accept(eq_expr.right);
+    }
+
+    void visit(ast::Ne& ne_expr) override
+    {
+        accept(ne_expr.left);
+        accept(ne_expr.right);
+    }
+
+    void visit(ast::Plus& plus) override
+    {
+        accept(plus.left);
+        accept(plus.right);
+    }
+
+    void visit(ast::Minus& minus) override
+    {
+        accept(minus.left);
+        accept(minus.right);
+    }
+
+    void visit(ast::Star& star) override
+    {
+        accept(star.left);
+        accept(star.right);
+    }
+
+    void visit(ast::Slash& slash) override
+    {
+        accept(slash.left);
+        accept(slash.right);
+    }
+
+    void visit(ast::Rem& rem) override
+    {
+        accept(rem.left);
+        accept(rem.right);
+    }
+
+    void visit(ast::PropertyList& prop_list) override
+    {
+        accept(prop_list.value);
+        accept(prop_list.next);
+    }
+
+    void visit(ast::RelationshipList& rel_list) override
+    {
+        accept(rel_list.value);
+        accept(rel_list.next);
+    }
+
+    void visit(ast::Relationship& rel) override
+    {
+        accept(rel.specs);
+    }
+
+    void visit(ast::RelationshipSpecs& rel_specs) override
+    {
+        accept(rel_specs.idn);
+        accept(rel_specs.types);
+        accept(rel_specs.props);
+    }
+
+    void visit(ast::LabelList& labels) override
+    {
+        accept(labels.value);
+        accept(labels.next);
+    }
+
+    void visit(ast::ReturnList& return_list) override
+    {
+        accept(return_list.value);
+        accept(return_list.next);
+    }
+
+    void visit(ast::Where& where) override
+    {
+        accept(where.expr);
+    }
+
+    void visit(ast::WriteQuery& write_query) override
+    {
+        accept(write_query.create);
+        accept(write_query.return_clause);
+    }
+
+    void visit(ast::Create& create) override
+    {
+        accept(create.pattern);
+    }
+
+protected:
+    template<class T>
+    void accept(T* node)
+    {
+        if(node != nullptr)
+            node->accept(*this);
+    }
+};
+
+#endif
diff --git a/data_structures/queue/bounded_spsc_queue.hpp b/data_structures/queue/bounded_spsc_queue.hpp
new file mode 100644
index 000000000..8f2ed93c1
--- /dev/null
+++ b/data_structures/queue/bounded_spsc_queue.hpp
@@ -0,0 +1,87 @@
+#pragma once
+
+#include <atomic>
+#include <memory>
+
+namespace lockfree
+{
+
+template <class T, size_t N>
+class BoundedSpscQueue
+{
+public:
+    static constexpr size_t size = N;
+
+    BoundedSpscQueue() = default;
+
+    BoundedSpscQueue(const BoundedSpscQueue&) = delete;
+    BoundedSpscQueue(BoundedSpscQueue&&) = delete;
+
+    BoundedSpscQueue& operator=(const BoundedSpscQueue&) = delete;
+
+    bool push(const T& item)
+    {
+        // load the current tail
+        // [] [] [1] [2] [3] [4] [5] [$] []
+        //        H                   T
+        auto t = tail.load(std::memory_order_relaxed);
+
+        // what will next tail be after we push
+        // [] [] [1] [2] [3] [4] [5] [$] [ ]
+        //        H                   T   T' 
+        auto next = increment(t);
+
+        // check if queue is full and do nothing if it is
+        // [3] [4] [5] [6] [7] [8] [$] [ 1 ] [2]
+        //                          T   T'H
+        if(next == head.load(std::memory_order_acquire))
+            return false;
+
+        // insert the item into the empty spot
+        // [] [] [1] [2] [3] [4] [5] [ ] []
+        //        H               T   T' 
+        items[t] = item;
+
+        // release the tail to the consumer (serialization point)
+        // [] [] [1] [2] [3] [4] [5] [ $ ] []
+        //        H                   T T' 
+        tail.store(next, std::memory_order_release);
+
+        return true;
+    }
+
+    bool pop(T& item)
+    {
+        // [] [] [1] [2] [3] [4] [5] [$] []
+        //        H                   T
+        auto h = head.load(std::memory_order_relaxed);
+
+        // [] [] [] [] [ $ ] [] [] [] []
+        //              H T        
+        if(h == tail.load(std::memory_order_acquire))
+            return false;
+
+        // move an item from the queue
+        item = std::move(items[h]);
+
+        // serialization point wrt producer
+        // [] [] [] [2] [3] [4] [5] [$] []
+        //           H               T
+        head.store(increment(h), std::memory_order_release);
+
+        return true;
+    }
+
+private:
+    static constexpr size_t capacity = N + 1;
+
+    std::array<T, capacity> items;
+    std::atomic<size_t> head {0}, tail {0};
+
+    size_t increment(size_t idx) const
+    {
+        return (idx + 1) % capacity;
+    }
+};
+
+}
diff --git a/debug/log.hpp b/debug/log.hpp
index 5cce6de2c..75fc091bb 100644
--- a/debug/log.hpp
+++ b/debug/log.hpp
@@ -35,15 +35,15 @@ private:
 
             return stream << bash_color::green
                 << "[" << to_string(message.level) << "] " << bash_color::end
-                << message.text << std::endl
-                << bash_color::yellow << "    on " << bash_color::end
-                << time_string.substr(0, time_string.size() - 1)
-                << bash_color::yellow << " in file " << bash_color::end
-                << message.file
-                << bash_color::yellow << " in function " << bash_color::end
-                << message.function
-                << bash_color::yellow << " at line " << bash_color::end
-                << message.line;
+                << message.text;
+                /* << bash_color::yellow << "    on " << bash_color::end */
+                /* << time_string.substr(0, time_string.size() - 1) */
+                /* << bash_color::yellow << " in file " << bash_color::end */
+                /* << message.file */
+                /* << bash_color::yellow << " in function " << bash_color::end */
+                /* << message.function */
+                /* << bash_color::yellow << " at line " << bash_color::end */
+                /* << message.line; */
         }
     };
 
diff --git a/io/network/addrinfo.hpp b/io/network/addrinfo.hpp
new file mode 100644
index 000000000..242796e1f
--- /dev/null
+++ b/io/network/addrinfo.hpp
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <cstring>
+#include <netdb.h>
+
+#include "network_error.hpp"
+#include "utils/underlying_cast.hpp"
+
+namespace io
+{
+
+class AddrInfo
+{
+    AddrInfo(struct addrinfo* info) : info(info) {}
+
+public:
+    ~AddrInfo()
+    {
+        freeaddrinfo(info);
+    }
+
+    static AddrInfo get(const char* port)
+    {
+        struct addrinfo hints;
+        memset(&hints, 0, sizeof(struct addrinfo));
+
+        hints.ai_family = AF_UNSPEC;     // IPv4 and IPv6
+        hints.ai_socktype = SOCK_STREAM; // TCP socket
+        hints.ai_flags = AI_PASSIVE;
+
+        struct addrinfo* result;
+        auto status = getaddrinfo(nullptr, port, &hints, &result);
+
+        if(status != 0)
+            throw NetworkError(gai_strerror(status));
+
+        return AddrInfo(result);
+    }
+
+    operator struct addrinfo*() { return info; }
+
+private:
+    struct addrinfo* info;
+};
+
+}
diff --git a/io/network/epoll.hpp b/io/network/epoll.hpp
new file mode 100644
index 000000000..d83aeaae5
--- /dev/null
+++ b/io/network/epoll.hpp
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <malloc.h>
+#include <sys/epoll.h>
+
+#include "socket.hpp"
+#include "utils/likely.hpp"
+
+namespace io
+{
+
+class Epoll
+{
+public:
+    using Event = struct epoll_event;
+
+    Epoll(int flags)
+    {
+        epoll_fd = epoll_create1(flags);
+    }
+
+    void add(Socket& socket, Event* event)
+    {
+        auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event);
+
+        if(UNLIKELY(status))
+            throw NetworkError("Can't add connection to epoll listener.");
+    }
+
+    int wait(Event* events, int max_events, int timeout)
+    {
+        return epoll_wait(epoll_fd, events, max_events, timeout);
+    }
+
+private:
+    int epoll_fd;
+};
+
+}
diff --git a/io/network/listener.hpp b/io/network/listener.hpp
new file mode 100644
index 000000000..a41c990a4
--- /dev/null
+++ b/io/network/listener.hpp
@@ -0,0 +1,83 @@
+#pragma once
+
+#include <thread>
+#include <atomic>
+
+#include "socket.hpp"
+#include "epoll.hpp"
+#include "tcp_stream.hpp"
+
+#include "utils/crtp.hpp"
+
+namespace io
+{
+
+template <class Derived>
+class Listener : public Crtp<Derived>
+{
+public:
+    Listener() : listener(0)
+    {
+        thread = std::thread([this]() { loop(); });
+    }
+
+    ~Listener()
+    {
+        alive.store(false, std::memory_order_release);
+        thread.join();
+    }
+
+    void add(Socket& socket, Epoll::Event& event)
+    {
+        listener.add(socket, &event);
+    }
+
+private:
+    void loop()
+    {
+        constexpr size_t MAX_EVENTS = 64;
+
+        Epoll::Event events[MAX_EVENTS];
+
+        while(alive.load(std::memory_order_acquire))
+        {
+            auto n = listener.wait(events, MAX_EVENTS, -1);
+
+            for(int i = 0; i < n; ++i)
+            {
+                auto& event = events[i];
+                auto stream = reinterpret_cast<TcpStream*>(event.data.ptr);
+
+                if(!(event.events & EPOLLIN))
+                {
+                    LOG_DEBUG("error !EPOLLIN");
+                    this->derived().on_error(stream);
+                    continue;
+                }
+
+                if(event.events & EPOLLHUP)
+                {
+                    LOG_DEBUG("error EPOLLHUP");
+                    this->derived().on_error(stream);
+                    continue;
+                }
+
+                if(event.events & EPOLLERR)
+                {
+                    LOG_DEBUG("error EPOLLERR");
+                    this->derived().on_error(stream);
+                    continue;
+                }
+
+                this->derived().on_read(stream);
+            }
+        }
+    }
+
+    std::atomic<bool> alive {true};
+    std::thread thread;
+
+    Epoll listener;
+};
+
+}
diff --git a/io/network/network_error.hpp b/io/network/network_error.hpp
new file mode 100644
index 000000000..7384dcd83
--- /dev/null
+++ b/io/network/network_error.hpp
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <stdexcept>
+
+class NetworkError : public std::runtime_error
+{
+public:
+    using std::runtime_error::runtime_error;
+};
+
diff --git a/io/network/socket.hpp b/io/network/socket.hpp
new file mode 100644
index 000000000..ea5bc14fa
--- /dev/null
+++ b/io/network/socket.hpp
@@ -0,0 +1,123 @@
+#pragma once
+
+#include <stdexcept>
+#include <cstring>
+#include <cstdio>
+#include <cassert>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <errno.h>
+
+#include "addrinfo.hpp"
+#include "utils/likely.hpp"
+
+namespace io
+{
+
+class Socket
+{
+    Socket(int socket) : socket(socket) {}
+
+    Socket(int family, int socket_type, int protocol)
+    {
+        socket = ::socket(family, socket_type, protocol);
+    }
+
+public:
+    Socket(const Socket&) = delete;
+    
+    Socket(Socket&& other)
+    {
+        this->socket = other.socket;
+        other.socket = -1;
+    }
+
+    ~Socket()
+    {
+        if(socket == -1)
+            return;
+        
+        LOG_DEBUG("CLosing Socket " << socket);
+        close(socket);
+    }
+
+    bool is_open()
+    {
+        return socket != -1;
+    }
+
+    static Socket create(const char* port)
+    {
+        auto info = AddrInfo::get(port);
+
+        for(struct addrinfo* it = info; it != nullptr; it = it->ai_next)
+        {
+            LOG_DEBUG("Trying socket...");
+
+            auto s = Socket(it->ai_family, it->ai_socktype, it->ai_protocol);
+
+            if(!s.is_open())
+                continue;
+
+            /* int on = 1; */
+            /* if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) */
+            /*     continue; */
+
+            if(s.bind(it->ai_addr, it->ai_addrlen))
+                return std::move(s);
+        }
+
+        throw NetworkError("Unable to bind to socket");
+    }
+
+    bool bind(struct sockaddr* addr, socklen_t len)
+    {
+        assert(socket != -1);
+        return ::bind(socket, addr, len) == 0;
+    }
+
+    void set_non_blocking()
+    {
+        auto flags = fcntl(socket, F_GETFL, 0);
+
+        if(UNLIKELY(flags == -1))
+            throw NetworkError("Cannot read flags from socket");
+
+        flags |= O_NONBLOCK;
+
+        auto status = fcntl(socket, F_SETFL, flags);
+
+        if(UNLIKELY(status == -1))
+            throw NetworkError("Cannot set NON_BLOCK flag to socket");
+    }
+
+    void listen(int backlog)
+    {
+        auto status = ::listen(socket, backlog);
+
+        if(UNLIKELY(status == -1))
+            throw NetworkError("Cannot listen on socket");
+    }
+
+    Socket accept(struct sockaddr* addr, socklen_t* len)
+    {
+        return Socket(::accept(socket, addr, len));
+    }
+
+    operator int() { return socket; }
+
+    int id() const
+    {
+        return socket;
+    }
+
+private:
+    int socket;
+};
+
+}
diff --git a/io/network/tcp_stream.hpp b/io/network/tcp_stream.hpp
new file mode 100644
index 000000000..2bd60e841
--- /dev/null
+++ b/io/network/tcp_stream.hpp
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "epoll.hpp"
+#include "socket.hpp"
+
+namespace io
+{
+
+class TcpStream
+{
+public:
+    TcpStream(Socket&& socket, uint32_t events)
+        : socket(std::move(socket))
+    {
+        event.events = events;
+        event.data.ptr = this;
+    }
+
+    void close()
+    {
+        LOG_DEBUG("CLOSE");
+        delete reinterpret_cast<TcpStream*>(event.data.ptr);
+    }
+
+    Socket socket;
+    Epoll::Event event;
+};
+
+}
diff --git a/io/network/test.cpp b/io/network/test.cpp
new file mode 100644
index 000000000..82342dff6
--- /dev/null
+++ b/io/network/test.cpp
@@ -0,0 +1,207 @@
+#include <iostream>
+#include <array>
+
+#include "debug/log.hpp"
+
+#include "socket.hpp"
+#include "worker.hpp"
+
+#define MAXEVENTS 64
+
+static int
+make_socket_non_blocking (int sfd)
+{
+  int flags, s;
+
+  flags = fcntl (sfd, F_GETFL, 0);
+  if (flags == -1)
+    {
+      perror ("fcntl");
+      return -1;
+    }
+
+  flags |= O_NONBLOCK;
+  s = fcntl (sfd, F_SETFL, flags);
+  if (s == -1)
+    {
+      perror ("fcntl");
+      return -1;
+    }
+
+  return 0;
+}
+
+int main(void)
+{
+    std::array<io::Worker, 8> workers;
+    int idx = 0;
+
+    auto socket = io::Socket::create("7474");
+    socket.set_non_blocking();
+    socket.listen(1024);
+
+  int efd, s;
+  struct epoll_event event;
+  struct epoll_event *events;
+
+  efd = epoll_create1 (0);
+  if (efd == -1)
+    {
+      perror ("epoll_create");
+      abort ();
+    }
+
+  event.data.fd = socket;
+  event.events = EPOLLIN | EPOLLET;
+  s = epoll_ctl (efd, EPOLL_CTL_ADD, socket, &event);
+  if (s == -1)
+    {
+      perror ("epoll_ctl");
+      abort ();
+    }
+
+  /* Buffer where events are returned */
+  events = static_cast<struct epoll_event*>(calloc (MAXEVENTS, sizeof event));
+
+  /* The event loop */
+  while (1)
+    {
+      int n, i;
+
+      n = epoll_wait (efd, events, MAXEVENTS, -1);
+      for (i = 0; i < n; i++)
+	{
+	  if ((events[i].events & EPOLLERR) ||
+              (events[i].events & EPOLLHUP) ||
+              (!(events[i].events & EPOLLIN)))
+	    {
+              /* An error has occured on this fd, or the socket is not
+                 ready for reading (why were we notified then?) */
+	      fprintf (stderr, "epoll error\n");
+	      close (events[i].data.fd);
+	      continue;
+	    }
+
+	  else if (socket == events[i].data.fd)
+	    {
+              /* We have a notification on the listening socket, which
+                 means one or more incoming connections. */
+              while (true)
+              {
+                  idx = (idx + 1) % workers.size();
+
+                  auto& worker = workers[i];
+
+                  if(!worker.accept(socket))
+                      break;
+              }
+                    
+                  /* struct sockaddr in_addr; */
+                  /* socklen_t in_len; */
+                  /* int infd; */
+                  /* char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; */
+
+                  /* in_len = sizeof in_addr; */
+                  /* infd = accept (socket, &in_addr, &in_len); */
+                  /* if (infd == -1) */
+                  /*   { */
+                  /*     if ((errno == EAGAIN) || */
+                  /*         (errno == EWOULDBLOCK)) */
+                  /*       { */
+                  /*         /1* We have processed all incoming */
+                  /*            connections. *1/ */
+                  /*         break; */
+                  /*       } */
+                  /*     else */
+                  /*       { */
+                  /*         perror ("accept"); */
+                  /*         break; */
+                  /*       } */
+                  /*   } */
+
+                  /* s = getnameinfo (&in_addr, in_len, */
+                  /*                  hbuf, sizeof hbuf, */
+                  /*                  sbuf, sizeof sbuf, */
+                  /*                  NI_NUMERICHOST | NI_NUMERICSERV); */
+                  /* if (s == 0) */
+                  /*   { */
+                  /*     printf("Accepted connection on descriptor %d " */
+                  /*            "(host=%s, port=%s)\n", infd, hbuf, sbuf); */
+                  /*   } */
+
+                  /* /1* Make the incoming socket non-blocking and add it to the */
+                  /*    list of fds to monitor. *1/ */
+                  /* s = make_socket_non_blocking (infd); */
+                  /* if (s == -1) */
+                  /*   abort (); */
+
+                  /* event.data.fd = infd; */
+                  /* event.events = EPOLLIN | EPOLLET; */
+                  /* s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event); */
+                  /* if (s == -1) */
+                  /*   { */
+                  /*     perror ("epoll_ctl"); */
+                  /*     abort (); */
+                  /*   } */
+            }
+          /* else */
+          /*   { */
+          /*     /1* We have data on the fd waiting to be read. Read and */
+          /*        display it. We must read whatever data is available */
+          /*        completely, as we are running in edge-triggered mode */
+          /*        and won't get a notification again for the same */
+          /*        data. *1/ */
+          /*     int done = 0; */
+
+          /*     while (1) */
+          /*       { */
+          /*         ssize_t count; */
+          /*         char buf[512]; */
+
+          /*         count = read (events[i].data.fd, buf, sizeof buf); */
+          /*         if (count == -1) */
+          /*           { */
+          /*             /1* If errno == EAGAIN, that means we have read all */
+          /*                data. So go back to the main loop. *1/ */
+          /*             if (errno != EAGAIN) */
+          /*               { */
+          /*                 perror ("read"); */
+          /*                 done = 1; */
+          /*               } */
+          /*             break; */
+          /*           } */
+          /*         else if (count == 0) */
+          /*           { */
+          /*             /1* End of file. The remote has closed the */
+          /*                connection. *1/ */
+          /*             done = 1; */
+          /*             break; */
+          /*           } */
+
+          /*         /1* Write the buffer to standard output *1/ */
+          /*         s = write (1, buf, count); */
+          /*         if (s == -1) */
+          /*           { */
+          /*             perror ("write"); */
+          /*             abort (); */
+          /*           } */
+          /*       } */
+
+          /*     if (done) */
+          /*       { */
+          /*         printf ("Closed connection on descriptor %d\n", */
+          /*                 events[i].data.fd); */
+
+          /*         /1* Closing the descriptor will make epoll remove it */
+          /*            from the set of descriptors which are monitored. *1/ */
+          /*         close (events[i].data.fd); */
+          /*       } */
+          /*   } */
+        }
+    }
+
+  free (events);
+
+
+    return 0;
+}
diff --git a/io/network/worker.hpp b/io/network/worker.hpp
new file mode 100644
index 000000000..fb238c947
--- /dev/null
+++ b/io/network/worker.hpp
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "listener.hpp"
+#include "tcp_stream.hpp"
+
+namespace io
+{
+  const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
+
+  size_t len = strlen(response);
+
+class Worker : public Listener<Worker>
+{
+    char buf[512];
+
+public:
+    Worker() = default;
+
+    bool accept(Socket& socket)
+    {
+        auto s = socket.accept(nullptr, nullptr);
+
+        if(!s.is_open())
+            return false;
+
+        s.set_non_blocking();
+    
+        auto stream = new TcpStream(std::move(s), EPOLLIN | EPOLLET);
+                
+        this->add(stream->socket, stream->event);
+    
+        LOG_DEBUG("Listening to TCP stream at" << stream->socket.id())
+        return true;
+    }
+
+    void on_error(TcpStream* stream)
+    {
+        delete stream;
+    }
+
+    void on_read(TcpStream* stream)
+    {
+        int done = 0;
+
+        while (1)
+        {
+            ssize_t count;
+
+            count = read(stream->socket, buf, sizeof buf);
+            if (count == -1)
+              {
+                /* If errno == EAGAIN, that means we have read all
+                   data. So go back to the main loop. */
+                if (errno != EAGAIN)
+                  {
+                    perror ("read");
+                    done = 1;
+                  }
+                break;
+              }
+            else if (count == 0)
+              {
+                /* End of file. The remote has closed the
+                   connection. */
+                done = 1;
+                break;
+              }
+
+            size_t sum = 0;
+            char* resp = (char*)response;
+
+            while(sum < len)
+            {
+                int k = write(stream->socket, resp, len - sum);
+                sum += k;
+                resp += k;
+
+            }
+          }
+
+        if (done)
+          {
+            LOG_DEBUG("Closing TCP stream at " << stream->socket.id())
+
+            /* Closing the descriptor will make epoll remove it
+               from the set of descriptors which are monitored. */
+            delete stream;
+          }
+    }
+};
+
+}
diff --git a/utils/counters/ring_counter.hpp b/utils/counters/ring_counter.hpp
new file mode 100644
index 000000000..6cf6d0d96
--- /dev/null
+++ b/utils/counters/ring_counter.hpp
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <cstdlib>
+
+class RingCounter
+{
+public:
+    RingCounter(size_t n, size_t initial = 0)
+        : n(n), counter(initial) {}
+
+    size_t operator++()
+    {
+        counter = (counter + 1) % n;
+        return counter;
+    }
+
+    size_t operator++(int)
+    {
+        auto value = counter;
+        ++counter;
+        return value;
+    }
+
+    operator size_t() const { return counter; }
+
+private:
+    size_t n, counter;
+};
diff --git a/utils/likely.hpp b/utils/likely.hpp
new file mode 100644
index 000000000..fbb8cc12c
--- /dev/null
+++ b/utils/likely.hpp
@@ -0,0 +1,10 @@
+#pragma once
+
+#if __GNUC__ >= 3
+// make a hint for the branch predictor in a conditional or a loop
+#define LIKELY(x) __builtin_expect(!!(x), 1)
+#define UNLIKELY(x) __builtin_expect(!!(x), 0)
+#else
+#define LIKELY(x) (x)
+#define UNLIKELY(x) (x)
+#endif