cypher + new tcp server

This commit is contained in:
Dominik Tomičević 2015-10-27 20:21:28 +01:00
parent 2b13f347bd
commit 807912c151
17 changed files with 1039 additions and 80 deletions

14
cypher/codegen/code.hpp Normal file
View File

@ -0,0 +1,14 @@
#pragma once
#include <string>
#include <vector>
class Code
{
public:
private:
std::vector<std::string> buffer;
};

30
cypher/codegen/cppgen.hpp Normal file
View File

@ -0,0 +1,30 @@
#ifndef MEMGRAPH_CYPHER_CODEGEN_CPPGEN_HPP
#define MEMGRAPH_CYPHER_CODEGEN_CPPGEN_HPP
#include "cypher/visitor/traverser.hpp"
class CppGen : public Traverser
{
struct CreateGen : public Traverser
{
void visit(ast::Pattern& pattern) override
{
}
void visit(ast
};
public:
void visit(ast::Start& start) override
{
}
void visit(ast::Create create) override
{
};
};
#endif

View File

@ -4,11 +4,11 @@
#include <iostream>
#include <stack>
#include "cypher/ast/ast_visitor.hpp"
#include "cypher/ast/ast.hpp"
#include "cypher/visitor/traverser.hpp"
class PrintVisitor : public ast::AstVisitor
class PrintVisitor : public Traverser
{
public:
class Printer
{
public:
@ -80,45 +80,37 @@ class PrintVisitor : public ast::AstVisitor
size_t level = 0;
};
public:
PrintVisitor(std::ostream& stream)
: printer(stream, "Printing AST") {}
void visit(ast::Start& start) override
{
auto entry = printer.advance("Start");
accept(start.read_query);
accept(start.write_query);
Traverser::visit(start);
}
void visit(ast::ReadQuery& read_query) override
{
auto entry = printer.advance("Read Query");
accept(read_query.match);
accept(read_query.return_clause);
Traverser::visit(read_query);
}
void visit(ast::Match& match) override
{
auto entry = printer.advance("Match");
accept(match.pattern);
accept(match.where);
Traverser::visit(match);
}
void visit(ast::Pattern& pattern) override
{
auto entry = printer.advance("Pattern");
accept(pattern.node);
accept(pattern.relationship);
accept(pattern.next);
Traverser::visit(pattern);
}
void visit(ast::Node& node) override
{
auto entry = printer.advance("Node");
accept(node.idn);
accept(node.labels);
accept(node.props);
Traverser::visit(node);
}
void visit(ast::Identifier& idn) override
@ -130,14 +122,13 @@ public:
void visit(ast::Return& return_clause) override
{
auto entry = printer.advance("Return");
accept(return_clause.return_list);
Traverser::visit(return_clause);
}
void visit(ast::Accessor& accessor) override
{
auto entry = printer.advance("Accessor");
accept(accessor.entity);
accept(accessor.prop);
Traverser::visit(accessor);
}
void visit(ast::Boolean& boolean) override
@ -167,172 +158,144 @@ public:
void visit(ast::Property& property) override
{
auto entry = printer.advance("Property");
accept(property.idn);
accept(property.value);
Traverser::visit(property);
}
void visit(ast::And& and_expr) override
{
auto entry = printer.advance("And");
accept(and_expr.left);
accept(and_expr.right);
Traverser::visit(and_expr);
}
void visit(ast::Or& or_expr) override
{
auto entry = printer.advance("Or");
accept(or_expr.left);
accept(or_expr.right);
Traverser::visit(or_expr);
}
void visit(ast::Lt& lt_expr) override
{
auto entry = printer.advance("Less Than");
accept(lt_expr.left);
accept(lt_expr.right);
Traverser::visit(lt_expr);
}
void visit(ast::Gt& gt_expr) override
{
auto entry = printer.advance("Greater Than");
accept(gt_expr.left);
accept(gt_expr.right);
Traverser::visit(gt_expr);
}
void visit(ast::Ge& ge_expr) override
{
auto entry = printer.advance("Greater od Equal");
accept(ge_expr.left);
accept(ge_expr.right);
Traverser::visit(ge_expr);
}
void visit(ast::Le& le_expr) override
{
auto entry = printer.advance("Less or Equal");
accept(le_expr.left);
accept(le_expr.right);
Traverser::visit(le_expr);
}
void visit(ast::Eq& eq_expr) override
{
auto entry = printer.advance("Equal");
accept(eq_expr.left);
accept(eq_expr.right);
Traverser::visit(eq_expr);
}
void visit(ast::Ne& ne_expr) override
{
auto entry = printer.advance("Not Equal");
accept(ne_expr.left);
accept(ne_expr.right);
Traverser::visit(ne_expr);
}
void visit(ast::Plus& plus) override
{
auto entry = printer.advance("Plus");
accept(plus.left);
accept(plus.right);
Traverser::visit(plus);
}
void visit(ast::Minus& minus) override
{
auto entry = printer.advance("Minus");
accept(minus.left);
accept(minus.right);
Traverser::visit(minus);
}
void visit(ast::Star& star) override
{
auto entry = printer.advance("Star");
accept(star.left);
accept(star.right);
Traverser::visit(star);
}
void visit(ast::Slash& slash) override
{
auto entry = printer.advance("Slash");
accept(slash.left);
accept(slash.right);
Traverser::visit(slash);
}
void visit(ast::Rem& rem) override
{
auto entry = printer.advance("Rem (%)");
accept(rem.left);
accept(rem.right);
Traverser::visit(rem);
}
void visit(ast::PropertyList& prop_list) override
{
auto entry = printer.advance("Property List");
accept(prop_list.value);
accept(prop_list.next);
Traverser::visit(prop_list);
}
void visit(ast::RelationshipList& rel_list) override
{
auto entry = printer.advance("Relationship List");
accept(rel_list.value);
accept(rel_list.next);
Traverser::visit(rel_list);
}
void visit(ast::Relationship& rel) override
{
auto entry = printer.advance("Relationship");
entry << " direction: " << rel.direction;
accept(rel.specs);
Traverser::visit(rel);
}
void visit(ast::RelationshipSpecs& rel_specs) override
{
auto entry = printer.advance("Relationship Specs");
accept(rel_specs.idn);
accept(rel_specs.types);
accept(rel_specs.props);
Traverser::visit(rel_specs);
}
void visit(ast::LabelList& labels) override
{
auto entry = printer.advance("Label List");
accept(labels.value);
accept(labels.next);
Traverser::visit(labels);
}
void visit(ast::ReturnList& return_list) override
{
auto entry = printer.advance("Return List");
accept(return_list.value);
accept(return_list.next);
Traverser::visit(return_list);
}
void visit(ast::Where& where) override
{
auto entry = printer.advance("Where");
accept(where.expr);
Traverser::visit(where);
}
void visit(ast::WriteQuery& write_query) override
{
auto entry = printer.advance("Write Query");
accept(write_query.create);
accept(write_query.return_clause);
Traverser::visit(write_query);
}
void visit(ast::Create& create) override
{
auto entry = printer.advance("Create");
accept(create.pattern);
Traverser::visit(create);
}
private:
Printer printer;
template<class T>
void accept(T* node)
{
if(node != nullptr)
node->accept(*this);
}
};
#endif

View File

@ -10,7 +10,7 @@ int main()
//std::string input("MATCH (user:User { name: 'Dominik', age: 24})-[has:HAS]->(item:Item) WHERE item.name = 'XPS 13' AND item.price = 11999.99 RETURN user, has, item");
std::string input("create n return n");
std::string input("create (n { name: 'Dominik', age: 24 }) return n");
compiler.compile(input);

View File

@ -0,0 +1,198 @@
#ifndef MEMGRAPH_CYPHER_VISITOR_TRAVERSER_HPP
#define MEMGRAPH_CYPHER_VISITOR_TRAVERSER_HPP
#include "cypher/ast/ast_visitor.hpp"
#include "cypher/ast/ast.hpp"
class Traverser : public ast::AstVisitor
{
public:
void visit(ast::Start& start) override
{
accept(start.read_query);
accept(start.write_query);
}
void visit(ast::ReadQuery& read_query) override
{
accept(read_query.match);
accept(read_query.return_clause);
}
void visit(ast::Match& match) override
{
accept(match.pattern);
accept(match.where);
}
void visit(ast::Pattern& pattern) override
{
accept(pattern.node);
accept(pattern.relationship);
accept(pattern.next);
}
void visit(ast::Node& node) override
{
accept(node.idn);
accept(node.labels);
accept(node.props);
}
void visit(ast::Return& return_clause) override
{
accept(return_clause.return_list);
}
void visit(ast::Accessor& accessor) override
{
accept(accessor.entity);
accept(accessor.prop);
}
void visit(ast::Property& property) override
{
accept(property.idn);
accept(property.value);
}
void visit(ast::And& and_expr) override
{
accept(and_expr.left);
accept(and_expr.right);
}
void visit(ast::Or& or_expr) override
{
accept(or_expr.left);
accept(or_expr.right);
}
void visit(ast::Lt& lt_expr) override
{
accept(lt_expr.left);
accept(lt_expr.right);
}
void visit(ast::Gt& gt_expr) override
{
accept(gt_expr.left);
accept(gt_expr.right);
}
void visit(ast::Ge& ge_expr) override
{
accept(ge_expr.left);
accept(ge_expr.right);
}
void visit(ast::Le& le_expr) override
{
accept(le_expr.left);
accept(le_expr.right);
}
void visit(ast::Eq& eq_expr) override
{
accept(eq_expr.left);
accept(eq_expr.right);
}
void visit(ast::Ne& ne_expr) override
{
accept(ne_expr.left);
accept(ne_expr.right);
}
void visit(ast::Plus& plus) override
{
accept(plus.left);
accept(plus.right);
}
void visit(ast::Minus& minus) override
{
accept(minus.left);
accept(minus.right);
}
void visit(ast::Star& star) override
{
accept(star.left);
accept(star.right);
}
void visit(ast::Slash& slash) override
{
accept(slash.left);
accept(slash.right);
}
void visit(ast::Rem& rem) override
{
accept(rem.left);
accept(rem.right);
}
void visit(ast::PropertyList& prop_list) override
{
accept(prop_list.value);
accept(prop_list.next);
}
void visit(ast::RelationshipList& rel_list) override
{
accept(rel_list.value);
accept(rel_list.next);
}
void visit(ast::Relationship& rel) override
{
accept(rel.specs);
}
void visit(ast::RelationshipSpecs& rel_specs) override
{
accept(rel_specs.idn);
accept(rel_specs.types);
accept(rel_specs.props);
}
void visit(ast::LabelList& labels) override
{
accept(labels.value);
accept(labels.next);
}
void visit(ast::ReturnList& return_list) override
{
accept(return_list.value);
accept(return_list.next);
}
void visit(ast::Where& where) override
{
accept(where.expr);
}
void visit(ast::WriteQuery& write_query) override
{
accept(write_query.create);
accept(write_query.return_clause);
}
void visit(ast::Create& create) override
{
accept(create.pattern);
}
protected:
template<class T>
void accept(T* node)
{
if(node != nullptr)
node->accept(*this);
}
};
#endif

View File

@ -0,0 +1,87 @@
#pragma once
#include <atomic>
#include <memory>
namespace lockfree
{
template <class T, size_t N>
class BoundedSpscQueue
{
public:
static constexpr size_t size = N;
BoundedSpscQueue() = default;
BoundedSpscQueue(const BoundedSpscQueue&) = delete;
BoundedSpscQueue(BoundedSpscQueue&&) = delete;
BoundedSpscQueue& operator=(const BoundedSpscQueue&) = delete;
bool push(const T& item)
{
// load the current tail
// [] [] [1] [2] [3] [4] [5] [$] []
// H T
auto t = tail.load(std::memory_order_relaxed);
// what will next tail be after we push
// [] [] [1] [2] [3] [4] [5] [$] [ ]
// H T T'
auto next = increment(t);
// check if queue is full and do nothing if it is
// [3] [4] [5] [6] [7] [8] [$] [ 1 ] [2]
// T T'H
if(next == head.load(std::memory_order_acquire))
return false;
// insert the item into the empty spot
// [] [] [1] [2] [3] [4] [5] [ ] []
// H T T'
items[t] = item;
// release the tail to the consumer (serialization point)
// [] [] [1] [2] [3] [4] [5] [ $ ] []
// H T T'
tail.store(next, std::memory_order_release);
return true;
}
bool pop(T& item)
{
// [] [] [1] [2] [3] [4] [5] [$] []
// H T
auto h = head.load(std::memory_order_relaxed);
// [] [] [] [] [ $ ] [] [] [] []
// H T
if(h == tail.load(std::memory_order_acquire))
return false;
// move an item from the queue
item = std::move(items[h]);
// serialization point wrt producer
// [] [] [] [2] [3] [4] [5] [$] []
// H T
head.store(increment(h), std::memory_order_release);
return true;
}
private:
static constexpr size_t capacity = N + 1;
std::array<T, capacity> items;
std::atomic<size_t> head {0}, tail {0};
size_t increment(size_t idx) const
{
return (idx + 1) % capacity;
}
};
}

View File

@ -35,15 +35,15 @@ private:
return stream << bash_color::green
<< "[" << to_string(message.level) << "] " << bash_color::end
<< message.text << std::endl
<< bash_color::yellow << " on " << bash_color::end
<< time_string.substr(0, time_string.size() - 1)
<< bash_color::yellow << " in file " << bash_color::end
<< message.file
<< bash_color::yellow << " in function " << bash_color::end
<< message.function
<< bash_color::yellow << " at line " << bash_color::end
<< message.line;
<< message.text;
/* << bash_color::yellow << " on " << bash_color::end */
/* << time_string.substr(0, time_string.size() - 1) */
/* << bash_color::yellow << " in file " << bash_color::end */
/* << message.file */
/* << bash_color::yellow << " in function " << bash_color::end */
/* << message.function */
/* << bash_color::yellow << " at line " << bash_color::end */
/* << message.line; */
}
};

46
io/network/addrinfo.hpp Normal file
View File

@ -0,0 +1,46 @@
#pragma once
#include <cstring>
#include <netdb.h>
#include "network_error.hpp"
#include "utils/underlying_cast.hpp"
namespace io
{
class AddrInfo
{
AddrInfo(struct addrinfo* info) : info(info) {}
public:
~AddrInfo()
{
freeaddrinfo(info);
}
static AddrInfo get(const char* port)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; // IPv4 and IPv6
hints.ai_socktype = SOCK_STREAM; // TCP socket
hints.ai_flags = AI_PASSIVE;
struct addrinfo* result;
auto status = getaddrinfo(nullptr, port, &hints, &result);
if(status != 0)
throw NetworkError(gai_strerror(status));
return AddrInfo(result);
}
operator struct addrinfo*() { return info; }
private:
struct addrinfo* info;
};
}

39
io/network/epoll.hpp Normal file
View File

@ -0,0 +1,39 @@
#pragma once
#include <malloc.h>
#include <sys/epoll.h>
#include "socket.hpp"
#include "utils/likely.hpp"
namespace io
{
class Epoll
{
public:
using Event = struct epoll_event;
Epoll(int flags)
{
epoll_fd = epoll_create1(flags);
}
void add(Socket& socket, Event* event)
{
auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event);
if(UNLIKELY(status))
throw NetworkError("Can't add connection to epoll listener.");
}
int wait(Event* events, int max_events, int timeout)
{
return epoll_wait(epoll_fd, events, max_events, timeout);
}
private:
int epoll_fd;
};
}

83
io/network/listener.hpp Normal file
View File

@ -0,0 +1,83 @@
#pragma once
#include <thread>
#include <atomic>
#include "socket.hpp"
#include "epoll.hpp"
#include "tcp_stream.hpp"
#include "utils/crtp.hpp"
namespace io
{
template <class Derived>
class Listener : public Crtp<Derived>
{
public:
Listener() : listener(0)
{
thread = std::thread([this]() { loop(); });
}
~Listener()
{
alive.store(false, std::memory_order_release);
thread.join();
}
void add(Socket& socket, Epoll::Event& event)
{
listener.add(socket, &event);
}
private:
void loop()
{
constexpr size_t MAX_EVENTS = 64;
Epoll::Event events[MAX_EVENTS];
while(alive.load(std::memory_order_acquire))
{
auto n = listener.wait(events, MAX_EVENTS, -1);
for(int i = 0; i < n; ++i)
{
auto& event = events[i];
auto stream = reinterpret_cast<TcpStream*>(event.data.ptr);
if(!(event.events & EPOLLIN))
{
LOG_DEBUG("error !EPOLLIN");
this->derived().on_error(stream);
continue;
}
if(event.events & EPOLLHUP)
{
LOG_DEBUG("error EPOLLHUP");
this->derived().on_error(stream);
continue;
}
if(event.events & EPOLLERR)
{
LOG_DEBUG("error EPOLLERR");
this->derived().on_error(stream);
continue;
}
this->derived().on_read(stream);
}
}
}
std::atomic<bool> alive {true};
std::thread thread;
Epoll listener;
};
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <stdexcept>
class NetworkError : public std::runtime_error
{
public:
using std::runtime_error::runtime_error;
};

123
io/network/socket.hpp Normal file
View File

@ -0,0 +1,123 @@
#pragma once
#include <stdexcept>
#include <cstring>
#include <cstdio>
#include <cassert>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include "addrinfo.hpp"
#include "utils/likely.hpp"
namespace io
{
class Socket
{
Socket(int socket) : socket(socket) {}
Socket(int family, int socket_type, int protocol)
{
socket = ::socket(family, socket_type, protocol);
}
public:
Socket(const Socket&) = delete;
Socket(Socket&& other)
{
this->socket = other.socket;
other.socket = -1;
}
~Socket()
{
if(socket == -1)
return;
LOG_DEBUG("CLosing Socket " << socket);
close(socket);
}
bool is_open()
{
return socket != -1;
}
static Socket create(const char* port)
{
auto info = AddrInfo::get(port);
for(struct addrinfo* it = info; it != nullptr; it = it->ai_next)
{
LOG_DEBUG("Trying socket...");
auto s = Socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if(!s.is_open())
continue;
/* int on = 1; */
/* if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) */
/* continue; */
if(s.bind(it->ai_addr, it->ai_addrlen))
return std::move(s);
}
throw NetworkError("Unable to bind to socket");
}
bool bind(struct sockaddr* addr, socklen_t len)
{
assert(socket != -1);
return ::bind(socket, addr, len) == 0;
}
void set_non_blocking()
{
auto flags = fcntl(socket, F_GETFL, 0);
if(UNLIKELY(flags == -1))
throw NetworkError("Cannot read flags from socket");
flags |= O_NONBLOCK;
auto status = fcntl(socket, F_SETFL, flags);
if(UNLIKELY(status == -1))
throw NetworkError("Cannot set NON_BLOCK flag to socket");
}
void listen(int backlog)
{
auto status = ::listen(socket, backlog);
if(UNLIKELY(status == -1))
throw NetworkError("Cannot listen on socket");
}
Socket accept(struct sockaddr* addr, socklen_t* len)
{
return Socket(::accept(socket, addr, len));
}
operator int() { return socket; }
int id() const
{
return socket;
}
private:
int socket;
};
}

29
io/network/tcp_stream.hpp Normal file
View File

@ -0,0 +1,29 @@
#pragma once
#include "epoll.hpp"
#include "socket.hpp"
namespace io
{
class TcpStream
{
public:
TcpStream(Socket&& socket, uint32_t events)
: socket(std::move(socket))
{
event.events = events;
event.data.ptr = this;
}
void close()
{
LOG_DEBUG("CLOSE");
delete reinterpret_cast<TcpStream*>(event.data.ptr);
}
Socket socket;
Epoll::Event event;
};
}

207
io/network/test.cpp Normal file
View File

@ -0,0 +1,207 @@
#include <iostream>
#include <array>
#include "debug/log.hpp"
#include "socket.hpp"
#include "worker.hpp"
#define MAXEVENTS 64
static int
make_socket_non_blocking (int sfd)
{
int flags, s;
flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}
return 0;
}
int main(void)
{
std::array<io::Worker, 8> workers;
int idx = 0;
auto socket = io::Socket::create("7474");
socket.set_non_blocking();
socket.listen(1024);
int efd, s;
struct epoll_event event;
struct epoll_event *events;
efd = epoll_create1 (0);
if (efd == -1)
{
perror ("epoll_create");
abort ();
}
event.data.fd = socket;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, socket, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
/* Buffer where events are returned */
events = static_cast<struct epoll_event*>(calloc (MAXEVENTS, sizeof event));
/* The event loop */
while (1)
{
int n, i;
n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}
else if (socket == events[i].data.fd)
{
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (true)
{
idx = (idx + 1) % workers.size();
auto& worker = workers[i];
if(!worker.accept(socket))
break;
}
/* struct sockaddr in_addr; */
/* socklen_t in_len; */
/* int infd; */
/* char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; */
/* in_len = sizeof in_addr; */
/* infd = accept (socket, &in_addr, &in_len); */
/* if (infd == -1) */
/* { */
/* if ((errno == EAGAIN) || */
/* (errno == EWOULDBLOCK)) */
/* { */
/* /1* We have processed all incoming */
/* connections. *1/ */
/* break; */
/* } */
/* else */
/* { */
/* perror ("accept"); */
/* break; */
/* } */
/* } */
/* s = getnameinfo (&in_addr, in_len, */
/* hbuf, sizeof hbuf, */
/* sbuf, sizeof sbuf, */
/* NI_NUMERICHOST | NI_NUMERICSERV); */
/* if (s == 0) */
/* { */
/* printf("Accepted connection on descriptor %d " */
/* "(host=%s, port=%s)\n", infd, hbuf, sbuf); */
/* } */
/* /1* Make the incoming socket non-blocking and add it to the */
/* list of fds to monitor. *1/ */
/* s = make_socket_non_blocking (infd); */
/* if (s == -1) */
/* abort (); */
/* event.data.fd = infd; */
/* event.events = EPOLLIN | EPOLLET; */
/* s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event); */
/* if (s == -1) */
/* { */
/* perror ("epoll_ctl"); */
/* abort (); */
/* } */
}
/* else */
/* { */
/* /1* We have data on the fd waiting to be read. Read and */
/* display it. We must read whatever data is available */
/* completely, as we are running in edge-triggered mode */
/* and won't get a notification again for the same */
/* data. *1/ */
/* int done = 0; */
/* while (1) */
/* { */
/* ssize_t count; */
/* char buf[512]; */
/* count = read (events[i].data.fd, buf, sizeof buf); */
/* if (count == -1) */
/* { */
/* /1* If errno == EAGAIN, that means we have read all */
/* data. So go back to the main loop. *1/ */
/* if (errno != EAGAIN) */
/* { */
/* perror ("read"); */
/* done = 1; */
/* } */
/* break; */
/* } */
/* else if (count == 0) */
/* { */
/* /1* End of file. The remote has closed the */
/* connection. *1/ */
/* done = 1; */
/* break; */
/* } */
/* /1* Write the buffer to standard output *1/ */
/* s = write (1, buf, count); */
/* if (s == -1) */
/* { */
/* perror ("write"); */
/* abort (); */
/* } */
/* } */
/* if (done) */
/* { */
/* printf ("Closed connection on descriptor %d\n", */
/* events[i].data.fd); */
/* /1* Closing the descriptor will make epoll remove it */
/* from the set of descriptors which are monitored. *1/ */
/* close (events[i].data.fd); */
/* } */
/* } */
}
}
free (events);
return 0;
}

92
io/network/worker.hpp Normal file
View File

@ -0,0 +1,92 @@
#pragma once
#include "listener.hpp"
#include "tcp_stream.hpp"
namespace io
{
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
size_t len = strlen(response);
class Worker : public Listener<Worker>
{
char buf[512];
public:
Worker() = default;
bool accept(Socket& socket)
{
auto s = socket.accept(nullptr, nullptr);
if(!s.is_open())
return false;
s.set_non_blocking();
auto stream = new TcpStream(std::move(s), EPOLLIN | EPOLLET);
this->add(stream->socket, stream->event);
LOG_DEBUG("Listening to TCP stream at" << stream->socket.id())
return true;
}
void on_error(TcpStream* stream)
{
delete stream;
}
void on_read(TcpStream* stream)
{
int done = 0;
while (1)
{
ssize_t count;
count = read(stream->socket, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN)
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}
size_t sum = 0;
char* resp = (char*)response;
while(sum < len)
{
int k = write(stream->socket, resp, len - sum);
sum += k;
resp += k;
}
}
if (done)
{
LOG_DEBUG("Closing TCP stream at " << stream->socket.id())
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
delete stream;
}
}
};
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <cstdlib>
class RingCounter
{
public:
RingCounter(size_t n, size_t initial = 0)
: n(n), counter(initial) {}
size_t operator++()
{
counter = (counter + 1) % n;
return counter;
}
size_t operator++(int)
{
auto value = counter;
++counter;
return value;
}
operator size_t() const { return counter; }
private:
size_t n, counter;
};

10
utils/likely.hpp Normal file
View File

@ -0,0 +1,10 @@
#pragma once
#if __GNUC__ >= 3
// make a hint for the branch predictor in a conditional or a loop
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
#else
#define LIKELY(x) (x)
#define UNLIKELY(x) (x)
#endif