diff --git a/cypher/codegen/code.hpp b/cypher/codegen/code.hpp new file mode 100644 index 000000000..02973df01 --- /dev/null +++ b/cypher/codegen/code.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +class Code +{ +public: + + + +private: + std::vector buffer; +}; diff --git a/cypher/codegen/cppgen.hpp b/cypher/codegen/cppgen.hpp new file mode 100644 index 000000000..e1e278fd7 --- /dev/null +++ b/cypher/codegen/cppgen.hpp @@ -0,0 +1,30 @@ +#ifndef MEMGRAPH_CYPHER_CODEGEN_CPPGEN_HPP +#define MEMGRAPH_CYPHER_CODEGEN_CPPGEN_HPP + +#include "cypher/visitor/traverser.hpp" + +class CppGen : public Traverser +{ + struct CreateGen : public Traverser + { + void visit(ast::Pattern& pattern) override + { + + } + + void visit(ast + }; + +public: + + void visit(ast::Start& start) override + { + + } + + void visit(ast::Create create) override + { + }; +}; + +#endif diff --git a/cypher/debug/tree_print.hpp b/cypher/debug/tree_print.hpp index e6e8b7de8..4a095aa8a 100644 --- a/cypher/debug/tree_print.hpp +++ b/cypher/debug/tree_print.hpp @@ -4,11 +4,11 @@ #include #include -#include "cypher/ast/ast_visitor.hpp" -#include "cypher/ast/ast.hpp" +#include "cypher/visitor/traverser.hpp" -class PrintVisitor : public ast::AstVisitor +class PrintVisitor : public Traverser { +public: class Printer { public: @@ -80,45 +80,37 @@ class PrintVisitor : public ast::AstVisitor size_t level = 0; }; -public: PrintVisitor(std::ostream& stream) : printer(stream, "Printing AST") {} void visit(ast::Start& start) override { auto entry = printer.advance("Start"); - accept(start.read_query); - accept(start.write_query); + Traverser::visit(start); } void visit(ast::ReadQuery& read_query) override { auto entry = printer.advance("Read Query"); - accept(read_query.match); - accept(read_query.return_clause); + Traverser::visit(read_query); } void visit(ast::Match& match) override { auto entry = printer.advance("Match"); - accept(match.pattern); - accept(match.where); + Traverser::visit(match); } void visit(ast::Pattern& pattern) override { auto entry = printer.advance("Pattern"); - accept(pattern.node); - accept(pattern.relationship); - accept(pattern.next); + Traverser::visit(pattern); } void visit(ast::Node& node) override { auto entry = printer.advance("Node"); - accept(node.idn); - accept(node.labels); - accept(node.props); + Traverser::visit(node); } void visit(ast::Identifier& idn) override @@ -130,14 +122,13 @@ public: void visit(ast::Return& return_clause) override { auto entry = printer.advance("Return"); - accept(return_clause.return_list); + Traverser::visit(return_clause); } void visit(ast::Accessor& accessor) override { auto entry = printer.advance("Accessor"); - accept(accessor.entity); - accept(accessor.prop); + Traverser::visit(accessor); } void visit(ast::Boolean& boolean) override @@ -167,172 +158,144 @@ public: void visit(ast::Property& property) override { auto entry = printer.advance("Property"); - accept(property.idn); - accept(property.value); + Traverser::visit(property); } void visit(ast::And& and_expr) override { auto entry = printer.advance("And"); - accept(and_expr.left); - accept(and_expr.right); + Traverser::visit(and_expr); } void visit(ast::Or& or_expr) override { auto entry = printer.advance("Or"); - accept(or_expr.left); - accept(or_expr.right); + Traverser::visit(or_expr); } void visit(ast::Lt& lt_expr) override { auto entry = printer.advance("Less Than"); - accept(lt_expr.left); - accept(lt_expr.right); + Traverser::visit(lt_expr); } void visit(ast::Gt& gt_expr) override { auto entry = printer.advance("Greater Than"); - accept(gt_expr.left); - accept(gt_expr.right); + Traverser::visit(gt_expr); } void visit(ast::Ge& ge_expr) override { auto entry = printer.advance("Greater od Equal"); - accept(ge_expr.left); - accept(ge_expr.right); + Traverser::visit(ge_expr); } void visit(ast::Le& le_expr) override { auto entry = printer.advance("Less or Equal"); - accept(le_expr.left); - accept(le_expr.right); + Traverser::visit(le_expr); } void visit(ast::Eq& eq_expr) override { auto entry = printer.advance("Equal"); - accept(eq_expr.left); - accept(eq_expr.right); + Traverser::visit(eq_expr); } void visit(ast::Ne& ne_expr) override { auto entry = printer.advance("Not Equal"); - accept(ne_expr.left); - accept(ne_expr.right); + Traverser::visit(ne_expr); } void visit(ast::Plus& plus) override { auto entry = printer.advance("Plus"); - accept(plus.left); - accept(plus.right); + Traverser::visit(plus); } void visit(ast::Minus& minus) override { auto entry = printer.advance("Minus"); - accept(minus.left); - accept(minus.right); + Traverser::visit(minus); } void visit(ast::Star& star) override { auto entry = printer.advance("Star"); - accept(star.left); - accept(star.right); + Traverser::visit(star); } void visit(ast::Slash& slash) override { auto entry = printer.advance("Slash"); - accept(slash.left); - accept(slash.right); + Traverser::visit(slash); } void visit(ast::Rem& rem) override { auto entry = printer.advance("Rem (%)"); - accept(rem.left); - accept(rem.right); + Traverser::visit(rem); } void visit(ast::PropertyList& prop_list) override { auto entry = printer.advance("Property List"); - accept(prop_list.value); - accept(prop_list.next); + Traverser::visit(prop_list); } void visit(ast::RelationshipList& rel_list) override { auto entry = printer.advance("Relationship List"); - accept(rel_list.value); - accept(rel_list.next); + Traverser::visit(rel_list); } void visit(ast::Relationship& rel) override { auto entry = printer.advance("Relationship"); entry << " direction: " << rel.direction; - accept(rel.specs); + Traverser::visit(rel); } void visit(ast::RelationshipSpecs& rel_specs) override { auto entry = printer.advance("Relationship Specs"); - accept(rel_specs.idn); - accept(rel_specs.types); - accept(rel_specs.props); + Traverser::visit(rel_specs); } void visit(ast::LabelList& labels) override { auto entry = printer.advance("Label List"); - accept(labels.value); - accept(labels.next); + Traverser::visit(labels); } void visit(ast::ReturnList& return_list) override { auto entry = printer.advance("Return List"); - accept(return_list.value); - accept(return_list.next); + Traverser::visit(return_list); } void visit(ast::Where& where) override { auto entry = printer.advance("Where"); - accept(where.expr); + Traverser::visit(where); } void visit(ast::WriteQuery& write_query) override { auto entry = printer.advance("Write Query"); - accept(write_query.create); - accept(write_query.return_clause); + Traverser::visit(write_query); } void visit(ast::Create& create) override { auto entry = printer.advance("Create"); - accept(create.pattern); + Traverser::visit(create); } private: Printer printer; - - template - void accept(T* node) - { - if(node != nullptr) - node->accept(*this); - } }; #endif diff --git a/cypher/parser.cpp b/cypher/parser.cpp index 3a07a354d..e0a1192c9 100644 --- a/cypher/parser.cpp +++ b/cypher/parser.cpp @@ -10,7 +10,7 @@ int main() //std::string input("MATCH (user:User { name: 'Dominik', age: 24})-[has:HAS]->(item:Item) WHERE item.name = 'XPS 13' AND item.price = 11999.99 RETURN user, has, item"); - std::string input("create n return n"); + std::string input("create (n { name: 'Dominik', age: 24 }) return n"); compiler.compile(input); diff --git a/cypher/visitor/traverser.hpp b/cypher/visitor/traverser.hpp new file mode 100644 index 000000000..9d080f5ef --- /dev/null +++ b/cypher/visitor/traverser.hpp @@ -0,0 +1,198 @@ +#ifndef MEMGRAPH_CYPHER_VISITOR_TRAVERSER_HPP +#define MEMGRAPH_CYPHER_VISITOR_TRAVERSER_HPP + +#include "cypher/ast/ast_visitor.hpp" +#include "cypher/ast/ast.hpp" + +class Traverser : public ast::AstVisitor +{ +public: + + void visit(ast::Start& start) override + { + accept(start.read_query); + accept(start.write_query); + } + + void visit(ast::ReadQuery& read_query) override + { + accept(read_query.match); + accept(read_query.return_clause); + } + + void visit(ast::Match& match) override + { + accept(match.pattern); + accept(match.where); + } + + void visit(ast::Pattern& pattern) override + { + accept(pattern.node); + accept(pattern.relationship); + accept(pattern.next); + } + + void visit(ast::Node& node) override + { + accept(node.idn); + accept(node.labels); + accept(node.props); + } + + void visit(ast::Return& return_clause) override + { + accept(return_clause.return_list); + } + + void visit(ast::Accessor& accessor) override + { + accept(accessor.entity); + accept(accessor.prop); + } + void visit(ast::Property& property) override + { + accept(property.idn); + accept(property.value); + } + + void visit(ast::And& and_expr) override + { + accept(and_expr.left); + accept(and_expr.right); + } + + void visit(ast::Or& or_expr) override + { + accept(or_expr.left); + accept(or_expr.right); + } + + void visit(ast::Lt& lt_expr) override + { + accept(lt_expr.left); + accept(lt_expr.right); + } + + void visit(ast::Gt& gt_expr) override + { + accept(gt_expr.left); + accept(gt_expr.right); + } + + void visit(ast::Ge& ge_expr) override + { + accept(ge_expr.left); + accept(ge_expr.right); + } + + void visit(ast::Le& le_expr) override + { + accept(le_expr.left); + accept(le_expr.right); + } + + void visit(ast::Eq& eq_expr) override + { + accept(eq_expr.left); + accept(eq_expr.right); + } + + void visit(ast::Ne& ne_expr) override + { + accept(ne_expr.left); + accept(ne_expr.right); + } + + void visit(ast::Plus& plus) override + { + accept(plus.left); + accept(plus.right); + } + + void visit(ast::Minus& minus) override + { + accept(minus.left); + accept(minus.right); + } + + void visit(ast::Star& star) override + { + accept(star.left); + accept(star.right); + } + + void visit(ast::Slash& slash) override + { + accept(slash.left); + accept(slash.right); + } + + void visit(ast::Rem& rem) override + { + accept(rem.left); + accept(rem.right); + } + + void visit(ast::PropertyList& prop_list) override + { + accept(prop_list.value); + accept(prop_list.next); + } + + void visit(ast::RelationshipList& rel_list) override + { + accept(rel_list.value); + accept(rel_list.next); + } + + void visit(ast::Relationship& rel) override + { + accept(rel.specs); + } + + void visit(ast::RelationshipSpecs& rel_specs) override + { + accept(rel_specs.idn); + accept(rel_specs.types); + accept(rel_specs.props); + } + + void visit(ast::LabelList& labels) override + { + accept(labels.value); + accept(labels.next); + } + + void visit(ast::ReturnList& return_list) override + { + accept(return_list.value); + accept(return_list.next); + } + + void visit(ast::Where& where) override + { + accept(where.expr); + } + + void visit(ast::WriteQuery& write_query) override + { + accept(write_query.create); + accept(write_query.return_clause); + } + + void visit(ast::Create& create) override + { + accept(create.pattern); + } + +protected: + template + void accept(T* node) + { + if(node != nullptr) + node->accept(*this); + } +}; + +#endif diff --git a/data_structures/queue/bounded_spsc_queue.hpp b/data_structures/queue/bounded_spsc_queue.hpp new file mode 100644 index 000000000..8f2ed93c1 --- /dev/null +++ b/data_structures/queue/bounded_spsc_queue.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include +#include + +namespace lockfree +{ + +template +class BoundedSpscQueue +{ +public: + static constexpr size_t size = N; + + BoundedSpscQueue() = default; + + BoundedSpscQueue(const BoundedSpscQueue&) = delete; + BoundedSpscQueue(BoundedSpscQueue&&) = delete; + + BoundedSpscQueue& operator=(const BoundedSpscQueue&) = delete; + + bool push(const T& item) + { + // load the current tail + // [] [] [1] [2] [3] [4] [5] [$] [] + // H T + auto t = tail.load(std::memory_order_relaxed); + + // what will next tail be after we push + // [] [] [1] [2] [3] [4] [5] [$] [ ] + // H T T' + auto next = increment(t); + + // check if queue is full and do nothing if it is + // [3] [4] [5] [6] [7] [8] [$] [ 1 ] [2] + // T T'H + if(next == head.load(std::memory_order_acquire)) + return false; + + // insert the item into the empty spot + // [] [] [1] [2] [3] [4] [5] [ ] [] + // H T T' + items[t] = item; + + // release the tail to the consumer (serialization point) + // [] [] [1] [2] [3] [4] [5] [ $ ] [] + // H T T' + tail.store(next, std::memory_order_release); + + return true; + } + + bool pop(T& item) + { + // [] [] [1] [2] [3] [4] [5] [$] [] + // H T + auto h = head.load(std::memory_order_relaxed); + + // [] [] [] [] [ $ ] [] [] [] [] + // H T + if(h == tail.load(std::memory_order_acquire)) + return false; + + // move an item from the queue + item = std::move(items[h]); + + // serialization point wrt producer + // [] [] [] [2] [3] [4] [5] [$] [] + // H T + head.store(increment(h), std::memory_order_release); + + return true; + } + +private: + static constexpr size_t capacity = N + 1; + + std::array items; + std::atomic head {0}, tail {0}; + + size_t increment(size_t idx) const + { + return (idx + 1) % capacity; + } +}; + +} diff --git a/debug/log.hpp b/debug/log.hpp index 5cce6de2c..75fc091bb 100644 --- a/debug/log.hpp +++ b/debug/log.hpp @@ -35,15 +35,15 @@ private: return stream << bash_color::green << "[" << to_string(message.level) << "] " << bash_color::end - << message.text << std::endl - << bash_color::yellow << " on " << bash_color::end - << time_string.substr(0, time_string.size() - 1) - << bash_color::yellow << " in file " << bash_color::end - << message.file - << bash_color::yellow << " in function " << bash_color::end - << message.function - << bash_color::yellow << " at line " << bash_color::end - << message.line; + << message.text; + /* << bash_color::yellow << " on " << bash_color::end */ + /* << time_string.substr(0, time_string.size() - 1) */ + /* << bash_color::yellow << " in file " << bash_color::end */ + /* << message.file */ + /* << bash_color::yellow << " in function " << bash_color::end */ + /* << message.function */ + /* << bash_color::yellow << " at line " << bash_color::end */ + /* << message.line; */ } }; diff --git a/io/network/addrinfo.hpp b/io/network/addrinfo.hpp new file mode 100644 index 000000000..242796e1f --- /dev/null +++ b/io/network/addrinfo.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include +#include + +#include "network_error.hpp" +#include "utils/underlying_cast.hpp" + +namespace io +{ + +class AddrInfo +{ + AddrInfo(struct addrinfo* info) : info(info) {} + +public: + ~AddrInfo() + { + freeaddrinfo(info); + } + + static AddrInfo get(const char* port) + { + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + + hints.ai_family = AF_UNSPEC; // IPv4 and IPv6 + hints.ai_socktype = SOCK_STREAM; // TCP socket + hints.ai_flags = AI_PASSIVE; + + struct addrinfo* result; + auto status = getaddrinfo(nullptr, port, &hints, &result); + + if(status != 0) + throw NetworkError(gai_strerror(status)); + + return AddrInfo(result); + } + + operator struct addrinfo*() { return info; } + +private: + struct addrinfo* info; +}; + +} diff --git a/io/network/epoll.hpp b/io/network/epoll.hpp new file mode 100644 index 000000000..d83aeaae5 --- /dev/null +++ b/io/network/epoll.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include "socket.hpp" +#include "utils/likely.hpp" + +namespace io +{ + +class Epoll +{ +public: + using Event = struct epoll_event; + + Epoll(int flags) + { + epoll_fd = epoll_create1(flags); + } + + void add(Socket& socket, Event* event) + { + auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event); + + if(UNLIKELY(status)) + throw NetworkError("Can't add connection to epoll listener."); + } + + int wait(Event* events, int max_events, int timeout) + { + return epoll_wait(epoll_fd, events, max_events, timeout); + } + +private: + int epoll_fd; +}; + +} diff --git a/io/network/listener.hpp b/io/network/listener.hpp new file mode 100644 index 000000000..a41c990a4 --- /dev/null +++ b/io/network/listener.hpp @@ -0,0 +1,83 @@ +#pragma once + +#include +#include + +#include "socket.hpp" +#include "epoll.hpp" +#include "tcp_stream.hpp" + +#include "utils/crtp.hpp" + +namespace io +{ + +template +class Listener : public Crtp +{ +public: + Listener() : listener(0) + { + thread = std::thread([this]() { loop(); }); + } + + ~Listener() + { + alive.store(false, std::memory_order_release); + thread.join(); + } + + void add(Socket& socket, Epoll::Event& event) + { + listener.add(socket, &event); + } + +private: + void loop() + { + constexpr size_t MAX_EVENTS = 64; + + Epoll::Event events[MAX_EVENTS]; + + while(alive.load(std::memory_order_acquire)) + { + auto n = listener.wait(events, MAX_EVENTS, -1); + + for(int i = 0; i < n; ++i) + { + auto& event = events[i]; + auto stream = reinterpret_cast(event.data.ptr); + + if(!(event.events & EPOLLIN)) + { + LOG_DEBUG("error !EPOLLIN"); + this->derived().on_error(stream); + continue; + } + + if(event.events & EPOLLHUP) + { + LOG_DEBUG("error EPOLLHUP"); + this->derived().on_error(stream); + continue; + } + + if(event.events & EPOLLERR) + { + LOG_DEBUG("error EPOLLERR"); + this->derived().on_error(stream); + continue; + } + + this->derived().on_read(stream); + } + } + } + + std::atomic alive {true}; + std::thread thread; + + Epoll listener; +}; + +} diff --git a/io/network/network_error.hpp b/io/network/network_error.hpp new file mode 100644 index 000000000..7384dcd83 --- /dev/null +++ b/io/network/network_error.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include + +class NetworkError : public std::runtime_error +{ +public: + using std::runtime_error::runtime_error; +}; + diff --git a/io/network/socket.hpp b/io/network/socket.hpp new file mode 100644 index 000000000..ea5bc14fa --- /dev/null +++ b/io/network/socket.hpp @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "addrinfo.hpp" +#include "utils/likely.hpp" + +namespace io +{ + +class Socket +{ + Socket(int socket) : socket(socket) {} + + Socket(int family, int socket_type, int protocol) + { + socket = ::socket(family, socket_type, protocol); + } + +public: + Socket(const Socket&) = delete; + + Socket(Socket&& other) + { + this->socket = other.socket; + other.socket = -1; + } + + ~Socket() + { + if(socket == -1) + return; + + LOG_DEBUG("CLosing Socket " << socket); + close(socket); + } + + bool is_open() + { + return socket != -1; + } + + static Socket create(const char* port) + { + auto info = AddrInfo::get(port); + + for(struct addrinfo* it = info; it != nullptr; it = it->ai_next) + { + LOG_DEBUG("Trying socket..."); + + auto s = Socket(it->ai_family, it->ai_socktype, it->ai_protocol); + + if(!s.is_open()) + continue; + + /* int on = 1; */ + /* if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) */ + /* continue; */ + + if(s.bind(it->ai_addr, it->ai_addrlen)) + return std::move(s); + } + + throw NetworkError("Unable to bind to socket"); + } + + bool bind(struct sockaddr* addr, socklen_t len) + { + assert(socket != -1); + return ::bind(socket, addr, len) == 0; + } + + void set_non_blocking() + { + auto flags = fcntl(socket, F_GETFL, 0); + + if(UNLIKELY(flags == -1)) + throw NetworkError("Cannot read flags from socket"); + + flags |= O_NONBLOCK; + + auto status = fcntl(socket, F_SETFL, flags); + + if(UNLIKELY(status == -1)) + throw NetworkError("Cannot set NON_BLOCK flag to socket"); + } + + void listen(int backlog) + { + auto status = ::listen(socket, backlog); + + if(UNLIKELY(status == -1)) + throw NetworkError("Cannot listen on socket"); + } + + Socket accept(struct sockaddr* addr, socklen_t* len) + { + return Socket(::accept(socket, addr, len)); + } + + operator int() { return socket; } + + int id() const + { + return socket; + } + +private: + int socket; +}; + +} diff --git a/io/network/tcp_stream.hpp b/io/network/tcp_stream.hpp new file mode 100644 index 000000000..2bd60e841 --- /dev/null +++ b/io/network/tcp_stream.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include "epoll.hpp" +#include "socket.hpp" + +namespace io +{ + +class TcpStream +{ +public: + TcpStream(Socket&& socket, uint32_t events) + : socket(std::move(socket)) + { + event.events = events; + event.data.ptr = this; + } + + void close() + { + LOG_DEBUG("CLOSE"); + delete reinterpret_cast(event.data.ptr); + } + + Socket socket; + Epoll::Event event; +}; + +} diff --git a/io/network/test.cpp b/io/network/test.cpp new file mode 100644 index 000000000..82342dff6 --- /dev/null +++ b/io/network/test.cpp @@ -0,0 +1,207 @@ +#include +#include + +#include "debug/log.hpp" + +#include "socket.hpp" +#include "worker.hpp" + +#define MAXEVENTS 64 + +static int +make_socket_non_blocking (int sfd) +{ + int flags, s; + + flags = fcntl (sfd, F_GETFL, 0); + if (flags == -1) + { + perror ("fcntl"); + return -1; + } + + flags |= O_NONBLOCK; + s = fcntl (sfd, F_SETFL, flags); + if (s == -1) + { + perror ("fcntl"); + return -1; + } + + return 0; +} + +int main(void) +{ + std::array workers; + int idx = 0; + + auto socket = io::Socket::create("7474"); + socket.set_non_blocking(); + socket.listen(1024); + + int efd, s; + struct epoll_event event; + struct epoll_event *events; + + efd = epoll_create1 (0); + if (efd == -1) + { + perror ("epoll_create"); + abort (); + } + + event.data.fd = socket; + event.events = EPOLLIN | EPOLLET; + s = epoll_ctl (efd, EPOLL_CTL_ADD, socket, &event); + if (s == -1) + { + perror ("epoll_ctl"); + abort (); + } + + /* Buffer where events are returned */ + events = static_cast(calloc (MAXEVENTS, sizeof event)); + + /* The event loop */ + while (1) + { + int n, i; + + n = epoll_wait (efd, events, MAXEVENTS, -1); + for (i = 0; i < n; i++) + { + if ((events[i].events & EPOLLERR) || + (events[i].events & EPOLLHUP) || + (!(events[i].events & EPOLLIN))) + { + /* An error has occured on this fd, or the socket is not + ready for reading (why were we notified then?) */ + fprintf (stderr, "epoll error\n"); + close (events[i].data.fd); + continue; + } + + else if (socket == events[i].data.fd) + { + /* We have a notification on the listening socket, which + means one or more incoming connections. */ + while (true) + { + idx = (idx + 1) % workers.size(); + + auto& worker = workers[i]; + + if(!worker.accept(socket)) + break; + } + + /* struct sockaddr in_addr; */ + /* socklen_t in_len; */ + /* int infd; */ + /* char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; */ + + /* in_len = sizeof in_addr; */ + /* infd = accept (socket, &in_addr, &in_len); */ + /* if (infd == -1) */ + /* { */ + /* if ((errno == EAGAIN) || */ + /* (errno == EWOULDBLOCK)) */ + /* { */ + /* /1* We have processed all incoming */ + /* connections. *1/ */ + /* break; */ + /* } */ + /* else */ + /* { */ + /* perror ("accept"); */ + /* break; */ + /* } */ + /* } */ + + /* s = getnameinfo (&in_addr, in_len, */ + /* hbuf, sizeof hbuf, */ + /* sbuf, sizeof sbuf, */ + /* NI_NUMERICHOST | NI_NUMERICSERV); */ + /* if (s == 0) */ + /* { */ + /* printf("Accepted connection on descriptor %d " */ + /* "(host=%s, port=%s)\n", infd, hbuf, sbuf); */ + /* } */ + + /* /1* Make the incoming socket non-blocking and add it to the */ + /* list of fds to monitor. *1/ */ + /* s = make_socket_non_blocking (infd); */ + /* if (s == -1) */ + /* abort (); */ + + /* event.data.fd = infd; */ + /* event.events = EPOLLIN | EPOLLET; */ + /* s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event); */ + /* if (s == -1) */ + /* { */ + /* perror ("epoll_ctl"); */ + /* abort (); */ + /* } */ + } + /* else */ + /* { */ + /* /1* We have data on the fd waiting to be read. Read and */ + /* display it. We must read whatever data is available */ + /* completely, as we are running in edge-triggered mode */ + /* and won't get a notification again for the same */ + /* data. *1/ */ + /* int done = 0; */ + + /* while (1) */ + /* { */ + /* ssize_t count; */ + /* char buf[512]; */ + + /* count = read (events[i].data.fd, buf, sizeof buf); */ + /* if (count == -1) */ + /* { */ + /* /1* If errno == EAGAIN, that means we have read all */ + /* data. So go back to the main loop. *1/ */ + /* if (errno != EAGAIN) */ + /* { */ + /* perror ("read"); */ + /* done = 1; */ + /* } */ + /* break; */ + /* } */ + /* else if (count == 0) */ + /* { */ + /* /1* End of file. The remote has closed the */ + /* connection. *1/ */ + /* done = 1; */ + /* break; */ + /* } */ + + /* /1* Write the buffer to standard output *1/ */ + /* s = write (1, buf, count); */ + /* if (s == -1) */ + /* { */ + /* perror ("write"); */ + /* abort (); */ + /* } */ + /* } */ + + /* if (done) */ + /* { */ + /* printf ("Closed connection on descriptor %d\n", */ + /* events[i].data.fd); */ + + /* /1* Closing the descriptor will make epoll remove it */ + /* from the set of descriptors which are monitored. *1/ */ + /* close (events[i].data.fd); */ + /* } */ + /* } */ + } + } + + free (events); + + + return 0; +} diff --git a/io/network/worker.hpp b/io/network/worker.hpp new file mode 100644 index 000000000..fb238c947 --- /dev/null +++ b/io/network/worker.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include "listener.hpp" +#include "tcp_stream.hpp" + +namespace io +{ + const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n"; + + size_t len = strlen(response); + +class Worker : public Listener +{ + char buf[512]; + +public: + Worker() = default; + + bool accept(Socket& socket) + { + auto s = socket.accept(nullptr, nullptr); + + if(!s.is_open()) + return false; + + s.set_non_blocking(); + + auto stream = new TcpStream(std::move(s), EPOLLIN | EPOLLET); + + this->add(stream->socket, stream->event); + + LOG_DEBUG("Listening to TCP stream at" << stream->socket.id()) + return true; + } + + void on_error(TcpStream* stream) + { + delete stream; + } + + void on_read(TcpStream* stream) + { + int done = 0; + + while (1) + { + ssize_t count; + + count = read(stream->socket, buf, sizeof buf); + if (count == -1) + { + /* If errno == EAGAIN, that means we have read all + data. So go back to the main loop. */ + if (errno != EAGAIN) + { + perror ("read"); + done = 1; + } + break; + } + else if (count == 0) + { + /* End of file. The remote has closed the + connection. */ + done = 1; + break; + } + + size_t sum = 0; + char* resp = (char*)response; + + while(sum < len) + { + int k = write(stream->socket, resp, len - sum); + sum += k; + resp += k; + + } + } + + if (done) + { + LOG_DEBUG("Closing TCP stream at " << stream->socket.id()) + + /* Closing the descriptor will make epoll remove it + from the set of descriptors which are monitored. */ + delete stream; + } + } +}; + +} diff --git a/utils/counters/ring_counter.hpp b/utils/counters/ring_counter.hpp new file mode 100644 index 000000000..6cf6d0d96 --- /dev/null +++ b/utils/counters/ring_counter.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include + +class RingCounter +{ +public: + RingCounter(size_t n, size_t initial = 0) + : n(n), counter(initial) {} + + size_t operator++() + { + counter = (counter + 1) % n; + return counter; + } + + size_t operator++(int) + { + auto value = counter; + ++counter; + return value; + } + + operator size_t() const { return counter; } + +private: + size_t n, counter; +}; diff --git a/utils/likely.hpp b/utils/likely.hpp new file mode 100644 index 000000000..fbb8cc12c --- /dev/null +++ b/utils/likely.hpp @@ -0,0 +1,10 @@ +#pragma once + +#if __GNUC__ >= 3 +// make a hint for the branch predictor in a conditional or a loop +#define LIKELY(x) __builtin_expect(!!(x), 1) +#define UNLIKELY(x) __builtin_expect(!!(x), 0) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif