checking in all work because i won't be able to work on it for some time

This commit is contained in:
Dominik Tomičević 2015-11-22 21:35:40 +01:00
parent 101b0ec12f
commit 7c8d15b949
23 changed files with 1019 additions and 482 deletions

View File

@ -0,0 +1,257 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP
#define MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP
#include <algorithm>
#include <cstdlib>
#include <array>
#include "new_height.hpp"
#include "skipnode.hpp"
// concurrent skiplist based on the implementation described in
// "A Provably Correct Scalable Concurrent Skip List"
// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
template <class K,
class T,
size_t MAX_HEIGHT = 24,
class compare=std::less<K>,
class lock_type=SpinLock>
class SkipList
{
using Node = SkipNode<K, T, lock_type>;
public:
SkipList()
: size_(0),
header(Node::create(MAX_HEIGHT, nullptr, nullptr)) {}
~SkipList()
{
for(Node* current = header.load(); current;)
{
Node* next = current->forward(0);
Node::destroy(current);
current = next;
}
}
size_t size() const
{
return size_.load();
}
uint8_t height() const
{
return MAX_HEIGHT;
}
//private:
bool greater(const K* const key, const Node* node)
{
return node && compare()(*node->key, *key);
}
bool less(const K* const key, const Node* node)
{
return (node == nullptr) || compare()(*key, *node->key);
}
size_t increment_size(size_t delta)
{
return size_.fetch_add(delta) + delta;
}
int find_path(Node* from,
int start_level,
const K* const key,
Node* preds[],
Node* succs[])
{
int lfound = -1;
Node* pred = from;
for(int level = start_level; level >= 0; --level)
{
Node* node = pred->forward(level);
while(greater(key, node))
pred = node, node = pred->forward(level);
if(lfound == -1 && !less(key, node))
lfound = level;
preds[level] = pred;
succs[level] = node;
}
return lfound;
}
Node* find(const K* const key)
{
Node* pred = header.load();
Node* node = nullptr;
uint8_t level = pred->height;
bool found = false;
while(!found)
{
// descend down first, facebook says it works better xD but make
// some tests when you have time to determine the best strategy
for(; level > 0 &&
less(key, node = pred->forward(level - 1)); --level) {}
if(level == 0)
return nullptr;
--level;
while(greater(key, node))
pred = node, node = node->forward(level);
found = !less(key, node);
}
return node;
}
template <bool ADDING>
bool lock_nodes(uint8_t height,
std::unique_lock<lock_type> guards[MAX_HEIGHT],
Node* preds[MAX_HEIGHT],
Node* succs[MAX_HEIGHT])
{
Node *prepred, *pred, *succ = nullptr;
bool valid = true;
for(int level = 0; valid && level < height; ++level)
{
pred = preds[level], succ = succs[level];
if(pred != prepred)
guards[level] = pred->guard(), prepred = pred;
valid = !pred->marked() && pred->forward(level) == succ;
if(ADDING)
valid = valid && (succ == nullptr || !succ->marked());
}
return valid;
}
bool insert(K* key, T* item)
{
Node *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(lfound != -1)
{
auto found = succs[lfound];
if(!found->marked())
{
while(!found->fully_linked()) {}
return false;
}
continue;
}
auto node_height = new_height(MAX_HEIGHT);
std::unique_lock<lock_type> guards[MAX_HEIGHT];
// try to acquire the locks for predecessors up to the height of
// the new node. release the locks and try again if someone else
// has the locks
if(!lock_nodes<true>(node_height, guards, preds, succs))
continue;
// you have the locks, create a new node
auto new_node = Node::create(node_height, key, item);
// link the predecessors and successors, e.g.
//
// 4 HEAD ... P ------------------------> S ... NULL
// 3 HEAD ... ... P -----> NEW ---------> S ... NULL
// 2 HEAD ... ... P -----> NEW -----> S ... ... NULL
// 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL
for(uint8_t level = 0; level < node_height; ++level)
{
new_node->forward(level, succs[level]);
preds[level]->forward(level, new_node);
}
new_node->set_fully_linked();
increment_size(1);
return true;
}
}
bool ok_delete(Node* node, int level)
{
return node->fully_linked()
&& node->height - 1 == level
&& !node->marked();
}
bool remove(const K* const key)
{
Node* node = nullptr;
std::unique_lock<lock_type> node_guard;
bool marked = false;
int node_height = 0;
Node* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound)))
return false;
if(!marked)
{
node = succs[lfound];
node_height = node->height;
node_guard = node->guard();
if(node->marked())
return false;
node->set_marked();
}
std::unique_lock<lock_type> guards[MAX_HEIGHT];
if(!lock_nodes<false>(node_height, guards, preds, succs))
continue;
for(int level = node_height - 1; level >= 0; --level)
preds[level]->forward(level, node->forward(level));
increment_size(-1);
break;
}
// TODO recyclee(node);
return true;
}
std::atomic<size_t> size_;
std::atomic<Node*> header;
};
#endif

View File

@ -1,73 +1,265 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP
#define MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP
#pragma once
#include <algorithm>
#include <cstdlib>
#include <array>
#include <memory>
#include "new_height.hpp"
#include "skipnode.hpp"
#include "threading/hazard_ptr.hpp"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/random/fast_binomial.hpp"
// concurrent skiplist based on the implementation described in
// "A Provably Correct Scalable Concurrent Skip List"
// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
template <class K,
class T,
size_t MAX_HEIGHT = 24,
class compare=std::less<K>,
class lock_type=SpinLock>
template <class K, class T, size_t H=32, class lock_t=SpinLock>
class SkipList
{
using Node = SkipNode<K, T, lock_type>;
static thread_local FastBinomial<H> rnd;
struct Flags
{
enum node_flags : uint8_t {
MARKED = 0x01,
FULLY_LINKED = 0x10,
};
bool is_marked() const
{
return flags.load(std::memory_order_acquire) & MARKED;
}
void set_marked()
{
flags.fetch_or(MARKED, std::memory_order_release);
}
bool is_fully_linked() const
{
return flags.load(std::memory_order_acquire) & FULLY_LINKED;
}
void set_fully_linked()
{
flags.fetch_or(FULLY_LINKED, std::memory_order_release);
}
private:
std::atomic<uint8_t> flags {0};
};
class Node : Lockable<lock_t>
{
friend class SkipList;
public:
class reference
{
public:
reference() {}
reference(Node* node, bool store_hazard = true)
: node(node), hazard(store_hazard ? node : nullptr) {}
reference(const reference&) = delete;
reference(reference&& other)
: node(other.node), hazard(std::move(other.hazard))
{
other.node = nullptr;
}
reference& operator=(const reference& other) = delete;
reference& operator=(const reference&& other)
{
node = other.node;
other.node = nullptr;
hazard = std::move(other.hazard);
return *this;
}
T& operator*() { return *node; }
const T& operator*() const { return *node; }
T* operator->() { return node; }
const T* operator->() const { return node; }
operator T*() { return node; }
const operator T*() const { return node; }
const hazard_ptr& get_hazard() const
{
return hazard;
}
private:
hazard_ptr hazard;
Node* node;
};
K key;
T item;
const uint8_t height;
Flags flags;
static Node* create(K key, T item, uint8_t height)
{
// [ Node ][Node*][Node*][Node*]...[Node*]
// | | | | |
// | 0 1 2 height-1
// |----------------||-----------------------------|
// space for Node space for tower pointers
// structure right after the Node
// structure
auto size = sizeof(Node) + height * sizeof(std::atomic<Node*>);
auto node = static_cast<Node*>(std::malloc(size));
// we have raw memory and we need to construct an object
// of type Node on it
return new (node) Node(key, item, height);
}
static void destroy(Node* node)
{
node->~Node();
free(node);
}
Node::reference forward(size_t level) const
{
while(true)
{
auto ref = Node::reference(load_tower(level));
if(ref.get_hazard() == load_tower(level))
return std::move(ref);
}
}
private:
Node(K key, T item, uint8_t height)
: key(key), item(item), height(height)
{
// here we assume, that the memory for N towers (N = height) has
// been allocated right after the Node structure so we need to
// initialize that memory
for(auto i = 0; i < height; ++i)
new (&tower[i]) std::atomic<Node*> {nullptr};
}
~Node()
{
for(auto i = 0; i < height; ++i)
tower[i].~atomic();
}
void forward(size_t level, Node* next)
{
tower[level].store(next, std::memory_order_release);
}
Node* load_tower(size_t level)
{
return tower[level].load(std::memory_order_acquire);
}
// this creates an array of the size zero. we can't put any sensible
// value here since we don't know what size it will be untill the
// node is allocated. we could make it a Node** but then we would
// have two memory allocations, one for node and one for the forward
// list. this way we avoid expensive malloc/free calls and also cache
// thrashing when following a pointer on the heap
std::atomic<Node*> tower[0];
};
using node_ref_t = typename Node::reference;
public:
SkipList()
: size_(0),
header(Node::create(MAX_HEIGHT, nullptr, nullptr)) {}
~SkipList()
class iterator
{
for(Node* current = header.load(); current;)
public:
iterator() = default;
iterator(node_ref_t&& node) : node(std::move(node)) {}
iterator(const iterator&) = delete;
T& operator*() { return node; }
T* operator->() { return node; }
operator T*() { return node; }
iterator& operator++()
{
Node* next = current->forward(0);
Node::destroy(current);
current = next;
node = node->forward(0);
return *this;
}
}
iterator& operator++(int)
{
return operator++();
}
private:
node_ref_t node;
};
SkipList() : header(Node::create(K(), T(), H)) {}
auto begin() { return iterator(header->forward(0)); }
auto end() { return iterator(); }
size_t size() const
{
return size_.load();
return count.load(std::memory_order_acquire);
}
uint8_t height() const
iterator find(const K& key)
{
return MAX_HEIGHT;
auto pred = node_ref_t(header);
node_ref_t node;
uint8_t h = pred->height - 1;
while(true)
{
// try to descend down first the next key on this layer overshoots
for(; h >= 0 && less(key, node = pred->forward(h)); --h) {}
// if we overshoot at every layer, item doesn't exist
if(h < 0)
return iterator();
// the item is farther to the right, continue going right as long
// as the key is greater than the current node's key
while(greater(key, node))
pred = node, node = node->forward(h);
// check if we have a hit. if not, we need to descend down again
if(!less(key, node))
return iterator(std::move(node));
}
}
//private:
private:
std::atomic<size_t> count {0};
Node* header;
bool greater(const K* const key, const Node* node)
bool greater(const K& key, const Node* const node)
{
return node && compare()(*node->key, *key);
return node && key > node->key;
}
bool less(const K* const key, const Node* node)
bool less(const K& key, const Node* const node)
{
return (node == nullptr) || compare()(*key, *node->key);
return (node == nullptr) || key < node->key;
}
size_t increment_size(size_t delta)
{
return size_.fetch_add(delta) + delta;
}
int find_path(Node* from,
int start_level,
const K* const key,
Node* preds[],
Node* succs[])
int find_path(Node& from, int start, const K& key,
node_ref_t (preds&)[],
node_ref_t (succs&)[])
{
int lfound = -1;
Node* pred = from;
@ -89,169 +281,4 @@ public:
return lfound;
}
Node* find(const K* const key)
{
Node* pred = header.load();
Node* node = nullptr;
uint8_t level = pred->height;
bool found = false;
while(!found)
{
// descend down first, facebook says it works better xD but make
// some tests when you have time to determine the best strategy
for(; level > 0 &&
less(key, node = pred->forward(level - 1)); --level) {}
if(level == 0)
return nullptr;
--level;
while(greater(key, node))
pred = node, node = node->forward(level);
found = !less(key, node);
}
return node;
}
template <bool ADDING>
bool lock_nodes(uint8_t height,
std::unique_lock<lock_type> guards[MAX_HEIGHT],
Node* preds[MAX_HEIGHT],
Node* succs[MAX_HEIGHT])
{
Node *prepred, *pred, *succ = nullptr;
bool valid = true;
for(int level = 0; valid && level < height; ++level)
{
pred = preds[level], succ = succs[level];
if(pred != prepred)
guards[level] = pred->guard(), prepred = pred;
valid = !pred->marked() && pred->forward(level) == succ;
if(ADDING)
valid = valid && (succ == nullptr || !succ->marked());
}
return valid;
}
bool insert(K* key, T* item)
{
Node *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(lfound != -1)
{
auto found = succs[lfound];
if(!found->marked())
{
while(!found->fully_linked()) {}
return false;
}
continue;
}
auto node_height = new_height(MAX_HEIGHT);
std::unique_lock<lock_type> guards[MAX_HEIGHT];
// try to acquire the locks for predecessors up to the height of
// the new node. release the locks and try again if someone else
// has the locks
if(!lock_nodes<true>(node_height, guards, preds, succs))
continue;
// you have the locks, create a new node
auto new_node = Node::create(node_height, key, item);
// link the predecessors and successors, e.g.
//
// 4 HEAD ... P ------------------------> S ... NULL
// 3 HEAD ... ... P -----> NEW ---------> S ... NULL
// 2 HEAD ... ... P -----> NEW -----> S ... ... NULL
// 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL
for(uint8_t level = 0; level < node_height; ++level)
{
new_node->forward(level, succs[level]);
preds[level]->forward(level, new_node);
}
new_node->set_fully_linked();
increment_size(1);
return true;
}
}
bool ok_delete(Node* node, int level)
{
return node->fully_linked()
&& node->height - 1 == level
&& !node->marked();
}
bool remove(const K* const key)
{
Node* node = nullptr;
std::unique_lock<lock_type> node_guard;
bool marked = false;
int node_height = 0;
Node* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound)))
return false;
if(!marked)
{
node = succs[lfound];
node_height = node->height;
node_guard = node->guard();
if(node->marked())
return false;
node->set_marked();
}
std::unique_lock<lock_type> guards[MAX_HEIGHT];
if(!lock_nodes<false>(node_height, guards, preds, succs))
continue;
for(int level = node_height - 1; level >= 0; --level)
preds[level]->forward(level, node->forward(level));
increment_size(-1);
break;
}
// TODO recyclee(node);
return true;
}
std::atomic<size_t> size_;
std::atomic<Node*> header;
};
#endif

36
http/connection.hpp Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include "io/network/tcp_stream.hpp"
namespace htpp
{
using memory::literals::operator "" _kB;
class Connection
{
Connection(io::Socket&& socket) : stream(std::move(socket))
{
stream.data = this;
}
void close()
{
delete reinterpret_cast<Connection*>(stream.data);
}
struct Buffers
{
char headers[8_kB];
char body[64_kB];
static constexpr size_t size = sizeof headers + sizeof body;
};
// tcp stream reads into these buffers
Buffers buffers;
// tcp stream this connection reads from
io::TcpStream stream;
};
}

12
http/parser.hpp Normal file
View File

@ -0,0 +1,12 @@
#pragma once
namespace htpp
{
class Parser
{
};
}

24
http/parser_test.cpp Normal file
View File

@ -0,0 +1,24 @@
#include <iostream>
#include "io/network/tcp_server.hpp"
#include "http/worker.hpp"
int main(void)
{
const char* req = "GET /foo/bar/7/comments HTTP/1.1\r\n"
"Content-Type : application/javascript\r\n"
"Content-Length: 3872\r\n"
"Connection:keep-alive\r\n"
"Accept-Ranges: bytes\r\n"
"Last-modified: Sun, 8 october 2015 GMT\r\n"
"\r\n"
"{alo:'bre', bre: 'alo sta ti je'}\r\n";
io::TcpServer<http::Worker> server;
server.bind("0.0.0.0", "7474").listen(8, 128, []() {
std::cout << "response!" << std::endl;
});
return 0;
};

65
http/worker.hpp Normal file
View File

@ -0,0 +1,65 @@
#pragma once
#include "io/network/tcp_reader.hpp"
#include "debug/log.hpp"
namespace http
{
const char* body = "Now that the internet can be accessed on any mobile device, whether a laptop, desktop, tablets, smartphone, websites are designed in responsive web version. It is the ability to change the page and font size according to the screen size of the user. desktop, tablets, smartphone, websites are designed in responsive web version. It is the ability to change the page and font size according to the screen size of the user. Thus a website is accessible anytime on any instrument. CSS3 frameworks were widely accepted in 2014 and is growing in 2015. It reduces time and money by helping not creating different sites for different users";
std::string response = "HTTP/1.1 200 OK\r\nContent-Length:"
+ std::to_string(strlen(body)) + "\r\n\r\n" + body;
class Worker : public io::TcpReader<Worker>
{
public:
char buf[65536];
Worker() = default;
io::TcpStream& on_connect(io::Socket&& socket)
{
auto stream = new io::TcpStream(std::move(socket));
LOG_DEBUG("on_connect socket " << stream->id());
return *stream;
}
void on_error(io::TcpStream& stream)
{
LOG_DEBUG("on_error: " << stream.id());
}
void on_wait_timeout()
{
LOG_DEBUG("Worker on_wait_timeout");
}
Buffer on_alloc(size_t suggested_size)
{
LOG_DEBUG("Allocating buffer");
return Buffer { buf, sizeof buf };
}
void on_read(io::TcpStream& stream, Buffer& buf)
{
LOG_DEBUG("on_read (socket: " << stream.id() << "): '"
<< std::string(buf.ptr, buf.len) << "'");
auto& socket = stream.socket;
auto n = write(socket, response.c_str(), response.size());
LOG_DEBUG("Responded with " << n << " characters");
}
void on_close(io::TcpStream& stream)
{
LOG_DEBUG("on_close: " << stream.id());
stream.close();
}
};
}

1
io/network/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
test

17
io/network/Makefile Normal file
View File

@ -0,0 +1,17 @@
CXX=clang++
CFLAGS=-std=c++1y -O2 -Wall -Wno-unknown-pragmas
LDFLAGS=-lhttp_parser -pthread
INC=-I../../
SOURCES=test.cpp
EXECUTABLE=test
all: $(EXECUTABLE)
$(EXECUTABLE): $(SOURCES)
$(CXX) $(CFLAGS) $(SOURCES) -o $(EXECUTABLE) $(INC) $(LDFLAGS)
.PHONY:
clean:
rm -f test
rm -f *.o

View File

@ -24,7 +24,7 @@ public:
auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event);
if(UNLIKELY(status))
throw NetworkError("Can't add connection to epoll listener.");
throw NetworkError("Can't add an event to epoll listener.");
}
int wait(Event* events, int max_events, int timeout)

View File

@ -1,100 +0,0 @@
#pragma once
#include <thread>
#include <atomic>
#include "socket.hpp"
#include "epoll.hpp"
#include "tcp_stream.hpp"
#include "utils/crtp.hpp"
namespace io
{
template <class Derived>
class Listener : public Crtp<Derived>
{
public:
Listener() : listener(0)
{
}
Listener(Listener&& other) : listener(0)
{
this->thread = std::move(other.thread);
this->listener = std::move(other.listener);
}
~Listener()
{
LOG_DEBUG("JOIN THREAD");
alive.store(false, std::memory_order_release);
if(thread.joinable())
thread.join();
}
void start()
{
thread = std::thread([this]() { loop(); });
}
void add(Socket& socket, Epoll::Event& event)
{
listener.add(socket, &event);
}
//protected:
void loop()
{
LOG_DEBUG("Thread starting " << listener.id() << " " << hash(std::this_thread::get_id()));
constexpr size_t MAX_EVENTS = 64;
Epoll::Event events[MAX_EVENTS];
while(alive.load(std::memory_order_acquire))
{
auto n = listener.wait(events, MAX_EVENTS, 1000);
for(int i = 0; i < n; ++i)
{
auto& event = events[i];
auto stream = reinterpret_cast<TcpStream*>(event.data.ptr);
if(!(event.events & EPOLLIN))
{
LOG_DEBUG("error !EPOLLIN");
this->derived().on_error(stream);
continue;
}
if(event.events & EPOLLHUP)
{
LOG_DEBUG("error EPOLLHUP");
this->derived().on_error(stream);
continue;
}
if(event.events & EPOLLERR)
{
LOG_DEBUG("error EPOLLERR");
this->derived().on_error(stream);
continue;
}
this->derived().on_read(stream);
}
}
LOG_DEBUG("Thread exiting");
}
std::atomic<bool> alive {true};
std::thread thread;
Epoll listener;
};
}

View File

@ -21,20 +21,19 @@ namespace io
class Socket
{
Socket(int socket) : socket(socket) {}
Socket(int family, int socket_type, int protocol)
{
socket = ::socket(family, socket_type, protocol);
}
public:
Socket(int socket = -1) : socket(socket) {}
Socket(const Socket&) = delete;
Socket(Socket&& other)
{
this->socket = other.socket;
other.socket = -1;
*this = std::forward<Socket>(other);
}
~Socket()
@ -42,23 +41,27 @@ public:
if(socket == -1)
return;
LOG_DEBUG("CLosing Socket " << socket);
close(socket);
}
Socket& operator=(Socket&& other)
{
this->socket = other.socket;
other.socket = -1;
return *this;
}
bool is_open()
{
return socket != -1;
}
static Socket create(const char* addr, const char* port)
static Socket bind(const char* addr, const char* port)
{
auto info = AddrInfo::get(addr, port);
for(struct addrinfo* it = info; it != nullptr; it = it->ai_next)
{
LOG_DEBUG("Trying socket...");
auto s = Socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if(!s.is_open())
@ -68,19 +71,13 @@ public:
if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
continue;
if(s.bind(it->ai_addr, it->ai_addrlen))
if(::bind(s, it->ai_addr, it->ai_addrlen) == 0)
return std::move(s);
}
throw NetworkError("Unable to bind to socket");
}
bool bind(struct sockaddr* addr, socklen_t len)
{
assert(socket != -1);
return ::bind(socket, addr, len) == 0;
}
void set_non_blocking()
{
auto flags = fcntl(socket, F_GETFL, 0);

View File

@ -0,0 +1,78 @@
#pragma once
#include "socket.hpp"
#include "epoll.hpp"
#include "tcp_stream.hpp"
#include "utils/crtp.hpp"
namespace io
{
template <class Derived, size_t max_events = 64, int wait_timeout = -1>
class TcpListener : public Crtp<Derived>
{
public:
using Crtp<Derived>::derived;
TcpListener(uint32_t flags = 0) : listener(flags) {}
void add(TcpStream& stream)
{
// add the stream to the event listener
listener.add(stream.socket, &stream.event);
}
void wait_and_process_events()
{
// waits for an event/multiple events and returns a maximum of
// max_events and stores them in the events array. it waits for
// wait_timeout milliseconds. if wait_timeout is achieved, returns 0
auto n = listener.wait(events, max_events, wait_timeout);
// go through all events and process them in order
for(int i = 0; i < n; ++i)
{
auto& event = events[i];
auto& stream = *reinterpret_cast<TcpStream*>(event.data.ptr);
// a tcp stream was closed
if(UNLIKELY(event.events & EPOLLRDHUP))
{
this->derived().on_close(stream);
continue;
}
// there was an error on the server side
if(UNLIKELY(!(event.events & EPOLLIN) ||
event.events & (EPOLLHUP | EPOLLERR)))
{
this->derived().on_error(stream);
continue;
}
// we have some data waiting to be read
this->derived().on_data(stream);
}
// this will be optimized out :D
if(wait_timeout < 0)
return;
// if there was events, continue to wait on new events
if(n != 0)
return;
// wait timeout occurred and there were no events. if wait_timeout
// is -1 there will never be any timeouts so client should provide
// an empty function. in that case the conditional above and the
// function call will be optimized out by the compiler
this->derived().on_wait_timeout();
}
protected:
Epoll listener;
Epoll::Event events[max_events];
};
}

91
io/network/tcp_reader.hpp Normal file
View File

@ -0,0 +1,91 @@
#pragma once
#include "tcp_listener.hpp"
#include "tcp_stream.hpp"
namespace io
{
template <class Derived>
class TcpReader : public TcpListener<TcpReader<Derived>>, public Crtp<Derived>
{
using listener_t = TcpListener<TcpReader<Derived>>;
using Crtp<Derived>::derived;
public:
TcpReader() : listener_t(0) {}
struct Buffer
{
char* ptr;
size_t len;
};
bool accept(Socket& socket)
{
// accept a connection from a socket
auto s = socket.accept(nullptr, nullptr);
if(!s.is_open())
return false;
// make the recieved socket non blocking
s.set_non_blocking();
auto& stream = derived().on_connect(std::move(s));
// we want to listen to an incoming event whish is edge triggered and
// we also want to listen on the hangup event
stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
// add the connection to the event listener
this->add(stream);
return true;
}
void on_close(TcpStream& stream)
{
derived().on_close(stream);
}
void on_error(TcpStream& stream)
{
derived().on_error(stream);
}
void on_wait_timeout()
{
derived().on_wait_timeout();
}
void on_data(TcpStream& stream)
{
constexpr size_t suggested_size = 64_kB;
while(true)
{
Buffer buf = derived().on_alloc(suggested_size);
buf.len = read(stream.socket, buf.ptr, buf.len);
if(buf.len == -1)
{
if(UNLIKELY(errno != EAGAIN))
derived().on_error(stream);
break;
}
if(UNLIKELY(buf.len == 0))
{
stream.close();
break;
}
derived().on_read(stream, buf);
}
}
};
}

View File

@ -1,50 +1,49 @@
#pragma once
#include <list>
#include <cstdlib>
#include <cstdio>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <atomic>
#include "epoll.hpp"
#include "debug/log.hpp"
#include "tcp_listener.hpp"
namespace io
{
class TcpConnection
{
};
class TcpServer
template <class T>
class TcpServer : TcpListener<TcpServer<T>, 64, -1>
{
public:
TcpServer(const char* port)
: socket(Socket::create(port)), epoll(0)
TcpServer(const char* addr, const char* port)
: stream(std::move(Socket::bind(addr, port))) {}
~TcpServer()
{
socket.set_non_blocking();
socket.listen(128);
stop();
listener.data.fd = socket;
listener.events = EPOLLIN | EPOLLET;
epoll.add(socket, &listener);
for(auto& worker : workers)
worker.join();
}
void start(int max_events)
template <class F>
void listen(size_t n, size_t backlog, F&& f)
{
Epoll::EventBuffer events(max_events);
while(alive)
for(size_t i = 0; i < n; ++i)
{
auto n = epoll.wait(events, events.size(), -1);
workers.emplace_back();
auto& w = workers.back();
for(int i = 0; i < n; ++i)
process_event(events[i]);
threads[i] = std::thread([this, &w]() {
while(alive)
{
LOG_DEBUG("Worker " << hash(std::this_thread::get_id())
<< " waiting... ");
w.wait_and_process_events();
}
});
}
stream.socket.listen(backlog);
}
void stop()
@ -53,96 +52,42 @@ public:
}
private:
Socket socket;
Epoll epoll;
Epoll::event listener;
std::list<std::thread> threads;
std::list<T> workers;
std::atomic<bool> alive { true };
std::list<Socket> sockets;
void process_event(Epoll::event& event)
TcpStream stream;
size_t idx = 0;
void on_close(TcpStream& stream)
{
//if(UNLIKELY(event.events & (EPOLLERR | EPOLLHUP | EPOLLIN)))
if ((event.events & EPOLLERR) ||
(event.events & EPOLLHUP) ||
(!(event.events & EPOLLIN)))
{
#ifndef NDEBUG
LOG_DEBUG("Epoll error!");
#endif
// close the connection
close(event.data.fd);
return;
}
if(event.data.fd == socket)
{
while(accept_connection()) {}
return;
}
while(read_data(event)) {}
LOG_DEBUG("server on_close!!!!");
}
bool accept_connection()
void on_error(TcpStream& stream)
{
#ifndef NDEBUG
struct sockaddr addr;
socklen_t len;
auto clientt = socket.accept(&addr, &len);
#else
auto clientt = socket.accept(nullptr, nullptr);
#endif
if(UNLIKELY(!clientt.is_open()))
return false;
#ifndef NDEBUG
char host[NI_MAXHOST], port[NI_MAXSERV];
auto status = getnameinfo(&addr, len,
host, sizeof host,
port, sizeof port,
NI_NUMERICHOST | NI_NUMERICSERV);
if(status)
{
LOG_DEBUG("Accepted connection on descriptor " << clientt
<< " (host: " << std::string(host)
<< ", port: " << std::string(port) << ")");
}
#endif
sockets.emplace_back(std::move(clientt));
auto& client = sockets.back();
client.set_non_blocking();
auto ev = new Epoll::event;
ev->events = EPOLLIN | EPOLLET;
ev->data.fd = client;
epoll.add(client, ev);
return true;
LOG_DEBUG("server on_error!!!!");
}
bool read_data(Epoll::event& event)
void on_data(TcpStream&)
{
char buf[512];
auto count = read(event.data.fd, buf, sizeof buf);
if(count == -1)
return false;
if(!count)
while (true)
{
close(event.data.fd);
return false;
LOG_DEBUG("Trying to accept... ");
if(!workers[idx].accept(socket))
{
LOG_DEBUG("Did not accept!");
break;
}
idx = (idx + 1) % workers.size();
LOG_DEBUG("Accepted a new connection!");
}
}
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
void on_wait_timeout()
{
write(event.data.fd, response, sizeof response);
return true;
}
};

View File

@ -3,27 +3,31 @@
#include "epoll.hpp"
#include "socket.hpp"
#include "memory/literals.hpp"
namespace io
{
class TcpStream
{
public:
TcpStream(Socket&& socket, uint32_t events)
: socket(std::move(socket))
TcpStream(Socket&& socket) : socket(std::move(socket))
{
event.events = events;
event.data.ptr = this;
}
void close()
{
LOG_DEBUG("CLOSE");
delete reinterpret_cast<TcpStream*>(event.data.ptr);
}
int id() const { return socket.id(); }
Socket socket;
Epoll::Event event;
// custom data we can pass on
void* data;
};
}

Binary file not shown.

View File

@ -8,50 +8,27 @@ std::hash<std::thread::id> hash;
#include "debug/log.hpp"
#include "socket.hpp"
#include "worker.hpp"
#include "http/worker.hpp"
std::array<io::Worker, 16> workers;
std::array<http::Worker, 16> workers;
std::array<std::thread, 16> threads;
std::atomic<bool> alive { true };
void exiting()
{
for(size_t i = 0; i < workers.size(); ++i)
{
auto n = workers[i].requests.load();
std::cout << "worker " << i << " responded to " << n << " requests" << std::endl;
}
LOG_DEBUG("Exiting...");
}
void sigint_handler(int)
{
exiting();
std::abort();
}
#define MAXEVENTS 64
static int
make_socket_non_blocking (int sfd)
{
int flags, s;
flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}
return 0;
}
int main(void)
{
std::atexit(exiting);
@ -60,7 +37,15 @@ int main(void)
for(size_t i = 0; i < workers.size(); ++i)
{
auto& w = workers[i];
w.start();
threads[i] = std::thread([&w]() {
while(alive)
{
LOG_DEBUG("Worker " << hash(std::this_thread::get_id())
<< " waiting... ");
w.wait_and_process_events();
}
});
}
/* size_t WORKERS = std::thread::hardware_concurrency(); */
@ -76,7 +61,7 @@ int main(void)
int idx = 0;
auto socket = io::Socket::create("0.0.0.0", "7474");
auto socket = io::Socket::bind("0.0.0.0", "7474");
socket.set_non_blocking();
socket.listen(1024);
@ -109,7 +94,6 @@ int main(void)
int n, i;
n = epoll_wait (efd, events, MAXEVENTS, -1);
LOG_DEBUG("MASTER WOKEN UP");
for (i = 0; i < n; i++)
{
@ -130,10 +114,15 @@ int main(void)
means one or more incoming connections. */
while (true)
{
LOG_DEBUG("Trying to accept... ");
if(!workers[idx].accept(socket))
{
LOG_DEBUG("Did not accept!");
break;
}
idx = (idx + 1) % workers.size();
LOG_DEBUG("Accepted a new connection!");
}
/* struct sockaddr in_addr; */

View File

@ -5,7 +5,7 @@
namespace io
{
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\nHTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
size_t len = strlen(response);
@ -23,13 +23,8 @@ public:
if(!s.is_open())
return false;
s.set_non_blocking();
this->add(s);
auto stream = new TcpStream(std::move(s), EPOLLIN | EPOLLET);
this->add(stream->socket, stream->event);
LOG_DEBUG("Listening to TCP stream at" << stream->socket.id())
return true;
}

34
memory/lazy_gc.hpp Normal file
View File

@ -0,0 +1,34 @@
#pragma once
#include <atomic>
#include "utils/crtp.hpp"
#include "threading/sync/lockable.hpp"
template <class Derived>
class LazyGC : Crtp<Derived>, Lockable<SpinLock>
{
public:
void add_ref()
{
ref_count.fetch_add(1, std::memory_order_relaxed);
}
void release_ref()
{
// get refcount and subtract atomically
auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel);
// fetch_sub first returns and then subtrarcts so the refcount is
// zero when fetch_sub returns 1
if(count != 1)
return;
auto guard = this->aacquire();
this->derived().vacuum();
}
private:
std::atomic<int> ref_count {0};
};

View File

@ -36,7 +36,7 @@ private:
// points to a newer version of this record. the newer version also has
// this pointer which points to an even more recent version. if no newer
// version is present, this value points to a nullptr
std::atomic<T*> versions;
std::atomic<T*> newer_version;
};
}

View File

@ -4,6 +4,7 @@
#include "transactions/transaction.hpp"
#include "memory/lazy_gc.hpp"
#include "mvcc/mvcc_error.hpp"
namespace mvcc
{
@ -17,11 +18,75 @@ public:
};
VersionList() = default;
VersionList(const tx::Transaction& t)
{
// create a first version of the record
auto v1 = new T();
// mark the record as created by the transaction t
v1->mark_created(t);
head.store(v1, std::memory_order_release);
}
private:
std::atomic<T*> head;
T* find(const tx::Transaction& t)
{
auto r = head.load(std::memory_order_acquire);
// nullptr
// |
// [v1] ...
// |
// [v2] <------+
// | |
// [v3] <------+
// | | Jump backwards until you find a first visible
// [VerList] ----+ version, or you reach the end of the list
//
while(r != nullptr && !r->visible(t))
r = r->next.load(std::memory_order_acquire);
return r;
}
T* update(const tx::Transaction& t)
{
}
bool remove(const tx::Transaction& t)
{
// take a lock on this node
auto guard = this->acquire();
// find the visible record version to delete
auto r = find(t);
// exit if we haven't found any visible records
if(r == nullptr)
return false;
// if the record hasn't been deleted yet, it's ok to delete it
if(!r->tx.max())
{
// mark the record as deleted
r->mark_deleted(t);
return true;
}
auto hints = r->hints.load(std::memory_order_acquire);
if(hints.max.is_committed() || hints.max.is_unknown())
throw MvccError("Can't serialize");
//...
}
void vacuum()
{