From 7c8d15b949f2798b6b32e2f7dded6ae68c603ffe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= Date: Sun, 22 Nov 2015 21:35:40 +0100 Subject: [PATCH] checking in all work because i won't be able to work on it for some time --- data_structures/skiplist/old_skiplist.hpp | 257 +++++++++++++ data_structures/skiplist/skiplist.hpp | 439 ++++++++++++---------- http/connection.hpp | 36 ++ http/parser.hpp | 12 + http/parser_test.cpp | 24 ++ http/worker.hpp | 65 ++++ io/network/.gitignore | 1 + io/network/Makefile | 17 + io/network/epoll.hpp | 2 +- io/network/listener.hpp | 100 ----- io/network/socket.hpp | 31 +- io/network/tcp_listener.hpp | 78 ++++ io/network/tcp_reader.hpp | 91 +++++ io/network/tcp_server.hpp | 157 +++----- io/network/tcp_stream.hpp | 12 +- io/network/test | Bin 45880 -> 29240 bytes io/network/test.cpp | 57 ++- io/network/worker.hpp | 11 +- memory/lazy_gc.hpp | 34 ++ mvcc/mvcc.hpp | 4 +- mvcc/version.hpp | 2 +- mvcc/version_list.hpp | 67 +++- transactions/engine.hpp | 4 +- 23 files changed, 1019 insertions(+), 482 deletions(-) create mode 100644 data_structures/skiplist/old_skiplist.hpp create mode 100644 http/connection.hpp create mode 100644 http/parser.hpp create mode 100644 http/parser_test.cpp create mode 100644 http/worker.hpp create mode 100644 io/network/.gitignore create mode 100644 io/network/Makefile delete mode 100644 io/network/listener.hpp create mode 100644 io/network/tcp_listener.hpp create mode 100644 io/network/tcp_reader.hpp create mode 100644 memory/lazy_gc.hpp diff --git a/data_structures/skiplist/old_skiplist.hpp b/data_structures/skiplist/old_skiplist.hpp new file mode 100644 index 000000000..39602c3bf --- /dev/null +++ b/data_structures/skiplist/old_skiplist.hpp @@ -0,0 +1,257 @@ +#ifndef MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP +#define MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP + +#include +#include +#include + +#include "new_height.hpp" +#include "skipnode.hpp" + +// concurrent skiplist based on the implementation described in +// "A Provably Correct Scalable Concurrent Skip List" +// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf + +template , + class lock_type=SpinLock> +class SkipList +{ + using Node = SkipNode; + +public: + SkipList() + : size_(0), + header(Node::create(MAX_HEIGHT, nullptr, nullptr)) {} + + ~SkipList() + { + for(Node* current = header.load(); current;) + { + Node* next = current->forward(0); + Node::destroy(current); + current = next; + } + } + + size_t size() const + { + return size_.load(); + } + + uint8_t height() const + { + return MAX_HEIGHT; + } + +//private: + + bool greater(const K* const key, const Node* node) + { + return node && compare()(*node->key, *key); + } + + bool less(const K* const key, const Node* node) + { + return (node == nullptr) || compare()(*key, *node->key); + } + + size_t increment_size(size_t delta) + { + return size_.fetch_add(delta) + delta; + } + + int find_path(Node* from, + int start_level, + const K* const key, + Node* preds[], + Node* succs[]) + { + int lfound = -1; + Node* pred = from; + + for(int level = start_level; level >= 0; --level) + { + Node* node = pred->forward(level); + + while(greater(key, node)) + pred = node, node = pred->forward(level); + + + if(lfound == -1 && !less(key, node)) + lfound = level; + + preds[level] = pred; + succs[level] = node; + } + + return lfound; + } + + Node* find(const K* const key) + { + Node* pred = header.load(); + Node* node = nullptr; + + uint8_t level = pred->height; + bool found = false; + + while(!found) + { + // descend down first, facebook says it works better xD but make + // some tests when you have time to determine the best strategy + for(; level > 0 && + less(key, node = pred->forward(level - 1)); --level) {} + + if(level == 0) + return nullptr; + + --level; + + while(greater(key, node)) + pred = node, node = node->forward(level); + + found = !less(key, node); + } + + return node; + } + + template + bool lock_nodes(uint8_t height, + std::unique_lock guards[MAX_HEIGHT], + Node* preds[MAX_HEIGHT], + Node* succs[MAX_HEIGHT]) + { + Node *prepred, *pred, *succ = nullptr; + bool valid = true; + + for(int level = 0; valid && level < height; ++level) + { + pred = preds[level], succ = succs[level]; + + if(pred != prepred) + guards[level] = pred->guard(), prepred = pred; + + valid = !pred->marked() && pred->forward(level) == succ; + + if(ADDING) + valid = valid && (succ == nullptr || !succ->marked()); + } + + return valid; + } + + bool insert(K* key, T* item) + { + Node *preds[MAX_HEIGHT], *succs[MAX_HEIGHT]; + + while(true) + { + auto head = header.load(); + auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs); + + if(lfound != -1) + { + auto found = succs[lfound]; + + if(!found->marked()) + { + while(!found->fully_linked()) {} + return false; + } + + continue; + } + + auto node_height = new_height(MAX_HEIGHT); + std::unique_lock guards[MAX_HEIGHT]; + + // try to acquire the locks for predecessors up to the height of + // the new node. release the locks and try again if someone else + // has the locks + if(!lock_nodes(node_height, guards, preds, succs)) + continue; + + // you have the locks, create a new node + auto new_node = Node::create(node_height, key, item); + + // link the predecessors and successors, e.g. + // + // 4 HEAD ... P ------------------------> S ... NULL + // 3 HEAD ... ... P -----> NEW ---------> S ... NULL + // 2 HEAD ... ... P -----> NEW -----> S ... ... NULL + // 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL + for(uint8_t level = 0; level < node_height; ++level) + { + new_node->forward(level, succs[level]); + preds[level]->forward(level, new_node); + } + + new_node->set_fully_linked(); + increment_size(1); + + return true; + } + } + + bool ok_delete(Node* node, int level) + { + return node->fully_linked() + && node->height - 1 == level + && !node->marked(); + } + + bool remove(const K* const key) + { + Node* node = nullptr; + std::unique_lock node_guard; + bool marked = false; + int node_height = 0; + + Node* preds[MAX_HEIGHT], *succs[MAX_HEIGHT]; + + while(true) + { + auto head = header.load(); + auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs); + + if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound))) + return false; + + if(!marked) + { + node = succs[lfound]; + node_height = node->height; + node_guard = node->guard(); + + if(node->marked()) + return false; + + node->set_marked(); + } + + std::unique_lock guards[MAX_HEIGHT]; + + if(!lock_nodes(node_height, guards, preds, succs)) + continue; + + for(int level = node_height - 1; level >= 0; --level) + preds[level]->forward(level, node->forward(level)); + + increment_size(-1); + break; + } + + // TODO recyclee(node); + return true; + } + + std::atomic size_; + std::atomic header; +}; + + +#endif diff --git a/data_structures/skiplist/skiplist.hpp b/data_structures/skiplist/skiplist.hpp index 39602c3bf..573a1da99 100644 --- a/data_structures/skiplist/skiplist.hpp +++ b/data_structures/skiplist/skiplist.hpp @@ -1,77 +1,269 @@ -#ifndef MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP -#define MEMGRAPH_DATA_STRUCTURES_SKIPLIST_SKIPLIST_HPP +#pragma once #include -#include -#include +#include -#include "new_height.hpp" -#include "skipnode.hpp" +#include "threading/hazard_ptr.hpp" +#include "threading/sync/lockable.hpp" +#include "threading/sync/spinlock.hpp" + +#include "utils/random/fast_binomial.hpp" // concurrent skiplist based on the implementation described in // "A Provably Correct Scalable Concurrent Skip List" // https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf -template , - class lock_type=SpinLock> +template class SkipList { - using Node = SkipNode; + static thread_local FastBinomial rnd; + + struct Flags + { + enum node_flags : uint8_t { + MARKED = 0x01, + FULLY_LINKED = 0x10, + }; + + bool is_marked() const + { + return flags.load(std::memory_order_acquire) & MARKED; + } + + void set_marked() + { + flags.fetch_or(MARKED, std::memory_order_release); + } + + bool is_fully_linked() const + { + return flags.load(std::memory_order_acquire) & FULLY_LINKED; + } + + void set_fully_linked() + { + flags.fetch_or(FULLY_LINKED, std::memory_order_release); + } + + private: + std::atomic flags {0}; + }; + + class Node : Lockable + { + friend class SkipList; + + public: + class reference + { + public: + reference() {} + + reference(Node* node, bool store_hazard = true) + : node(node), hazard(store_hazard ? node : nullptr) {} + + reference(const reference&) = delete; + + reference(reference&& other) + : node(other.node), hazard(std::move(other.hazard)) + { + other.node = nullptr; + } + + reference& operator=(const reference& other) = delete; + + reference& operator=(const reference&& other) + { + node = other.node; + other.node = nullptr; + hazard = std::move(other.hazard); + + return *this; + } + + T& operator*() { return *node; } + const T& operator*() const { return *node; } + + T* operator->() { return node; } + const T* operator->() const { return node; } + + operator T*() { return node; } + const operator T*() const { return node; } + + const hazard_ptr& get_hazard() const + { + return hazard; + } + + private: + hazard_ptr hazard; + Node* node; + }; + + K key; + T item; + + const uint8_t height; + Flags flags; + + static Node* create(K key, T item, uint8_t height) + { + // [ Node ][Node*][Node*][Node*]...[Node*] + // | | | | | + // | 0 1 2 height-1 + // |----------------||-----------------------------| + // space for Node space for tower pointers + // structure right after the Node + // structure + auto size = sizeof(Node) + height * sizeof(std::atomic); + auto node = static_cast(std::malloc(size)); + + // we have raw memory and we need to construct an object + // of type Node on it + return new (node) Node(key, item, height); + } + + static void destroy(Node* node) + { + node->~Node(); + free(node); + } + + Node::reference forward(size_t level) const + { + while(true) + { + auto ref = Node::reference(load_tower(level)); + + if(ref.get_hazard() == load_tower(level)) + return std::move(ref); + } + } + + private: + Node(K key, T item, uint8_t height) + : key(key), item(item), height(height) + { + // here we assume, that the memory for N towers (N = height) has + // been allocated right after the Node structure so we need to + // initialize that memory + for(auto i = 0; i < height; ++i) + new (&tower[i]) std::atomic {nullptr}; + } + + ~Node() + { + for(auto i = 0; i < height; ++i) + tower[i].~atomic(); + } + + void forward(size_t level, Node* next) + { + tower[level].store(next, std::memory_order_release); + } + + Node* load_tower(size_t level) + { + return tower[level].load(std::memory_order_acquire); + } + + // this creates an array of the size zero. we can't put any sensible + // value here since we don't know what size it will be untill the + // node is allocated. we could make it a Node** but then we would + // have two memory allocations, one for node and one for the forward + // list. this way we avoid expensive malloc/free calls and also cache + // thrashing when following a pointer on the heap + std::atomic tower[0]; + }; + + using node_ref_t = typename Node::reference; public: - SkipList() - : size_(0), - header(Node::create(MAX_HEIGHT, nullptr, nullptr)) {} - - ~SkipList() + class iterator { - for(Node* current = header.load(); current;) + public: + iterator() = default; + iterator(node_ref_t&& node) : node(std::move(node)) {} + iterator(const iterator&) = delete; + + T& operator*() { return node; } + T* operator->() { return node; } + + operator T*() { return node; } + + iterator& operator++() { - Node* next = current->forward(0); - Node::destroy(current); - current = next; + node = node->forward(0); + return *this; } - } + + iterator& operator++(int) + { + return operator++(); + } + + private: + node_ref_t node; + }; + + SkipList() : header(Node::create(K(), T(), H)) {} + + auto begin() { return iterator(header->forward(0)); } + auto end() { return iterator(); } size_t size() const { - return size_.load(); + return count.load(std::memory_order_acquire); } - uint8_t height() const + iterator find(const K& key) { - return MAX_HEIGHT; + auto pred = node_ref_t(header); + node_ref_t node; + + uint8_t h = pred->height - 1; + + while(true) + { + // try to descend down first the next key on this layer overshoots + for(; h >= 0 && less(key, node = pred->forward(h)); --h) {} + + // if we overshoot at every layer, item doesn't exist + if(h < 0) + return iterator(); + + // the item is farther to the right, continue going right as long + // as the key is greater than the current node's key + while(greater(key, node)) + pred = node, node = node->forward(h); + + // check if we have a hit. if not, we need to descend down again + if(!less(key, node)) + return iterator(std::move(node)); + } } -//private: +private: + std::atomic count {0}; + Node* header; - bool greater(const K* const key, const Node* node) + bool greater(const K& key, const Node* const node) { - return node && compare()(*node->key, *key); + return node && key > node->key; } - bool less(const K* const key, const Node* node) + bool less(const K& key, const Node* const node) { - return (node == nullptr) || compare()(*key, *node->key); + return (node == nullptr) || key < node->key; } - size_t increment_size(size_t delta) - { - return size_.fetch_add(delta) + delta; - } - - int find_path(Node* from, - int start_level, - const K* const key, - Node* preds[], - Node* succs[]) + int find_path(Node& from, int start, const K& key, + node_ref_t (preds&)[], + node_ref_t (succs&)[]) { int lfound = -1; Node* pred = from; - + for(int level = start_level; level >= 0; --level) { Node* node = pred->forward(level); @@ -89,169 +281,4 @@ public: return lfound; } - - Node* find(const K* const key) - { - Node* pred = header.load(); - Node* node = nullptr; - - uint8_t level = pred->height; - bool found = false; - - while(!found) - { - // descend down first, facebook says it works better xD but make - // some tests when you have time to determine the best strategy - for(; level > 0 && - less(key, node = pred->forward(level - 1)); --level) {} - - if(level == 0) - return nullptr; - - --level; - - while(greater(key, node)) - pred = node, node = node->forward(level); - - found = !less(key, node); - } - - return node; - } - - template - bool lock_nodes(uint8_t height, - std::unique_lock guards[MAX_HEIGHT], - Node* preds[MAX_HEIGHT], - Node* succs[MAX_HEIGHT]) - { - Node *prepred, *pred, *succ = nullptr; - bool valid = true; - - for(int level = 0; valid && level < height; ++level) - { - pred = preds[level], succ = succs[level]; - - if(pred != prepred) - guards[level] = pred->guard(), prepred = pred; - - valid = !pred->marked() && pred->forward(level) == succ; - - if(ADDING) - valid = valid && (succ == nullptr || !succ->marked()); - } - - return valid; - } - - bool insert(K* key, T* item) - { - Node *preds[MAX_HEIGHT], *succs[MAX_HEIGHT]; - - while(true) - { - auto head = header.load(); - auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs); - - if(lfound != -1) - { - auto found = succs[lfound]; - - if(!found->marked()) - { - while(!found->fully_linked()) {} - return false; - } - - continue; - } - - auto node_height = new_height(MAX_HEIGHT); - std::unique_lock guards[MAX_HEIGHT]; - - // try to acquire the locks for predecessors up to the height of - // the new node. release the locks and try again if someone else - // has the locks - if(!lock_nodes(node_height, guards, preds, succs)) - continue; - - // you have the locks, create a new node - auto new_node = Node::create(node_height, key, item); - - // link the predecessors and successors, e.g. - // - // 4 HEAD ... P ------------------------> S ... NULL - // 3 HEAD ... ... P -----> NEW ---------> S ... NULL - // 2 HEAD ... ... P -----> NEW -----> S ... ... NULL - // 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL - for(uint8_t level = 0; level < node_height; ++level) - { - new_node->forward(level, succs[level]); - preds[level]->forward(level, new_node); - } - - new_node->set_fully_linked(); - increment_size(1); - - return true; - } - } - - bool ok_delete(Node* node, int level) - { - return node->fully_linked() - && node->height - 1 == level - && !node->marked(); - } - - bool remove(const K* const key) - { - Node* node = nullptr; - std::unique_lock node_guard; - bool marked = false; - int node_height = 0; - - Node* preds[MAX_HEIGHT], *succs[MAX_HEIGHT]; - - while(true) - { - auto head = header.load(); - auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs); - - if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound))) - return false; - - if(!marked) - { - node = succs[lfound]; - node_height = node->height; - node_guard = node->guard(); - - if(node->marked()) - return false; - - node->set_marked(); - } - - std::unique_lock guards[MAX_HEIGHT]; - - if(!lock_nodes(node_height, guards, preds, succs)) - continue; - - for(int level = node_height - 1; level >= 0; --level) - preds[level]->forward(level, node->forward(level)); - - increment_size(-1); - break; - } - - // TODO recyclee(node); - return true; - } - - std::atomic size_; - std::atomic header; }; - - -#endif diff --git a/http/connection.hpp b/http/connection.hpp new file mode 100644 index 000000000..ea72e1044 --- /dev/null +++ b/http/connection.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "io/network/tcp_stream.hpp" + +namespace htpp +{ +using memory::literals::operator "" _kB; + +class Connection +{ + Connection(io::Socket&& socket) : stream(std::move(socket)) + { + stream.data = this; + } + + void close() + { + delete reinterpret_cast(stream.data); + } + + struct Buffers + { + char headers[8_kB]; + char body[64_kB]; + + static constexpr size_t size = sizeof headers + sizeof body; + }; + + // tcp stream reads into these buffers + Buffers buffers; + + // tcp stream this connection reads from + io::TcpStream stream; +}; + +} diff --git a/http/parser.hpp b/http/parser.hpp new file mode 100644 index 000000000..33c84dec0 --- /dev/null +++ b/http/parser.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace htpp +{ + +class Parser +{ + + +}; + +} diff --git a/http/parser_test.cpp b/http/parser_test.cpp new file mode 100644 index 000000000..41b951fce --- /dev/null +++ b/http/parser_test.cpp @@ -0,0 +1,24 @@ +#include + +#include "io/network/tcp_server.hpp" +#include "http/worker.hpp" + +int main(void) +{ + const char* req = "GET /foo/bar/7/comments HTTP/1.1\r\n" + "Content-Type : application/javascript\r\n" + "Content-Length: 3872\r\n" + "Connection:keep-alive\r\n" + "Accept-Ranges: bytes\r\n" + "Last-modified: Sun, 8 october 2015 GMT\r\n" + "\r\n" + "{alo:'bre', bre: 'alo sta ti je'}\r\n"; + + io::TcpServer server; + + server.bind("0.0.0.0", "7474").listen(8, 128, []() { + std::cout << "response!" << std::endl; + }); + + return 0; +}; diff --git a/http/worker.hpp b/http/worker.hpp new file mode 100644 index 000000000..029375105 --- /dev/null +++ b/http/worker.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include "io/network/tcp_reader.hpp" +#include "debug/log.hpp" + +namespace http +{ + +const char* body = "Now that the internet can be accessed on any mobile device, whether a laptop, desktop, tablets, smartphone, websites are designed in responsive web version. It is the ability to change the page and font size according to the screen size of the user. desktop, tablets, smartphone, websites are designed in responsive web version. It is the ability to change the page and font size according to the screen size of the user. Thus a website is accessible anytime on any instrument. CSS3 frameworks were widely accepted in 2014 and is growing in 2015. It reduces time and money by helping not creating different sites for different users"; + +std::string response = "HTTP/1.1 200 OK\r\nContent-Length:" + + std::to_string(strlen(body)) + "\r\n\r\n" + body; + +class Worker : public io::TcpReader +{ +public: + char buf[65536]; + + Worker() = default; + + io::TcpStream& on_connect(io::Socket&& socket) + { + auto stream = new io::TcpStream(std::move(socket)); + LOG_DEBUG("on_connect socket " << stream->id()); + + return *stream; + } + + void on_error(io::TcpStream& stream) + { + LOG_DEBUG("on_error: " << stream.id()); + } + + void on_wait_timeout() + { + LOG_DEBUG("Worker on_wait_timeout"); + } + + Buffer on_alloc(size_t suggested_size) + { + LOG_DEBUG("Allocating buffer"); + + return Buffer { buf, sizeof buf }; + } + + void on_read(io::TcpStream& stream, Buffer& buf) + { + LOG_DEBUG("on_read (socket: " << stream.id() << "): '" + << std::string(buf.ptr, buf.len) << "'"); + + auto& socket = stream.socket; + + auto n = write(socket, response.c_str(), response.size()); + + LOG_DEBUG("Responded with " << n << " characters"); + } + + void on_close(io::TcpStream& stream) + { + LOG_DEBUG("on_close: " << stream.id()); + stream.close(); + } +}; + +} diff --git a/io/network/.gitignore b/io/network/.gitignore new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/io/network/.gitignore @@ -0,0 +1 @@ +test diff --git a/io/network/Makefile b/io/network/Makefile new file mode 100644 index 000000000..245054369 --- /dev/null +++ b/io/network/Makefile @@ -0,0 +1,17 @@ +CXX=clang++ +CFLAGS=-std=c++1y -O2 -Wall -Wno-unknown-pragmas +LDFLAGS=-lhttp_parser -pthread + +INC=-I../../ +SOURCES=test.cpp +EXECUTABLE=test + +all: $(EXECUTABLE) + +$(EXECUTABLE): $(SOURCES) + $(CXX) $(CFLAGS) $(SOURCES) -o $(EXECUTABLE) $(INC) $(LDFLAGS) + +.PHONY: +clean: + rm -f test + rm -f *.o diff --git a/io/network/epoll.hpp b/io/network/epoll.hpp index 7dd5af63c..11c3764de 100644 --- a/io/network/epoll.hpp +++ b/io/network/epoll.hpp @@ -24,7 +24,7 @@ public: auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event); if(UNLIKELY(status)) - throw NetworkError("Can't add connection to epoll listener."); + throw NetworkError("Can't add an event to epoll listener."); } int wait(Event* events, int max_events, int timeout) diff --git a/io/network/listener.hpp b/io/network/listener.hpp deleted file mode 100644 index c77722cf0..000000000 --- a/io/network/listener.hpp +++ /dev/null @@ -1,100 +0,0 @@ -#pragma once - -#include -#include - -#include "socket.hpp" -#include "epoll.hpp" -#include "tcp_stream.hpp" - -#include "utils/crtp.hpp" - -namespace io -{ - -template -class Listener : public Crtp -{ -public: - Listener() : listener(0) - { - } - - Listener(Listener&& other) : listener(0) - { - this->thread = std::move(other.thread); - this->listener = std::move(other.listener); - } - - ~Listener() - { - LOG_DEBUG("JOIN THREAD"); - alive.store(false, std::memory_order_release); - - if(thread.joinable()) - thread.join(); - } - - void start() - { - thread = std::thread([this]() { loop(); }); - } - - void add(Socket& socket, Epoll::Event& event) - { - listener.add(socket, &event); - } - -//protected: - void loop() - { - LOG_DEBUG("Thread starting " << listener.id() << " " << hash(std::this_thread::get_id())); - - constexpr size_t MAX_EVENTS = 64; - - Epoll::Event events[MAX_EVENTS]; - - while(alive.load(std::memory_order_acquire)) - { - auto n = listener.wait(events, MAX_EVENTS, 1000); - - for(int i = 0; i < n; ++i) - { - auto& event = events[i]; - auto stream = reinterpret_cast(event.data.ptr); - - if(!(event.events & EPOLLIN)) - { - LOG_DEBUG("error !EPOLLIN"); - this->derived().on_error(stream); - continue; - } - - if(event.events & EPOLLHUP) - { - LOG_DEBUG("error EPOLLHUP"); - this->derived().on_error(stream); - continue; - } - - if(event.events & EPOLLERR) - { - LOG_DEBUG("error EPOLLERR"); - this->derived().on_error(stream); - continue; - } - - this->derived().on_read(stream); - } - } - - LOG_DEBUG("Thread exiting"); - } - - std::atomic alive {true}; - std::thread thread; - - Epoll listener; -}; - -} diff --git a/io/network/socket.hpp b/io/network/socket.hpp index c60a8b658..47725d007 100644 --- a/io/network/socket.hpp +++ b/io/network/socket.hpp @@ -21,44 +21,47 @@ namespace io class Socket { - Socket(int socket) : socket(socket) {} - Socket(int family, int socket_type, int protocol) { socket = ::socket(family, socket_type, protocol); } public: + Socket(int socket = -1) : socket(socket) {} + Socket(const Socket&) = delete; - + Socket(Socket&& other) { - this->socket = other.socket; - other.socket = -1; + *this = std::forward(other); } ~Socket() { if(socket == -1) return; - - LOG_DEBUG("CLosing Socket " << socket); + close(socket); } + Socket& operator=(Socket&& other) + { + this->socket = other.socket; + other.socket = -1; + return *this; + } + bool is_open() { return socket != -1; } - static Socket create(const char* addr, const char* port) + static Socket bind(const char* addr, const char* port) { auto info = AddrInfo::get(addr, port); for(struct addrinfo* it = info; it != nullptr; it = it->ai_next) { - LOG_DEBUG("Trying socket..."); - auto s = Socket(it->ai_family, it->ai_socktype, it->ai_protocol); if(!s.is_open()) @@ -68,19 +71,13 @@ public: if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) continue; - if(s.bind(it->ai_addr, it->ai_addrlen)) + if(::bind(s, it->ai_addr, it->ai_addrlen) == 0) return std::move(s); } throw NetworkError("Unable to bind to socket"); } - bool bind(struct sockaddr* addr, socklen_t len) - { - assert(socket != -1); - return ::bind(socket, addr, len) == 0; - } - void set_non_blocking() { auto flags = fcntl(socket, F_GETFL, 0); diff --git a/io/network/tcp_listener.hpp b/io/network/tcp_listener.hpp new file mode 100644 index 000000000..ea8b3c256 --- /dev/null +++ b/io/network/tcp_listener.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include "socket.hpp" +#include "epoll.hpp" +#include "tcp_stream.hpp" + +#include "utils/crtp.hpp" + +namespace io +{ + +template +class TcpListener : public Crtp +{ +public: + using Crtp::derived; + + TcpListener(uint32_t flags = 0) : listener(flags) {} + + void add(TcpStream& stream) + { + // add the stream to the event listener + listener.add(stream.socket, &stream.event); + } + + void wait_and_process_events() + { + // waits for an event/multiple events and returns a maximum of + // max_events and stores them in the events array. it waits for + // wait_timeout milliseconds. if wait_timeout is achieved, returns 0 + auto n = listener.wait(events, max_events, wait_timeout); + + // go through all events and process them in order + for(int i = 0; i < n; ++i) + { + auto& event = events[i]; + auto& stream = *reinterpret_cast(event.data.ptr); + + // a tcp stream was closed + if(UNLIKELY(event.events & EPOLLRDHUP)) + { + this->derived().on_close(stream); + continue; + } + + // there was an error on the server side + if(UNLIKELY(!(event.events & EPOLLIN) || + event.events & (EPOLLHUP | EPOLLERR))) + { + this->derived().on_error(stream); + continue; + } + + // we have some data waiting to be read + this->derived().on_data(stream); + } + + // this will be optimized out :D + if(wait_timeout < 0) + return; + + // if there was events, continue to wait on new events + if(n != 0) + return; + + // wait timeout occurred and there were no events. if wait_timeout + // is -1 there will never be any timeouts so client should provide + // an empty function. in that case the conditional above and the + // function call will be optimized out by the compiler + this->derived().on_wait_timeout(); + } + +protected: + Epoll listener; + Epoll::Event events[max_events]; +}; + +} diff --git a/io/network/tcp_reader.hpp b/io/network/tcp_reader.hpp new file mode 100644 index 000000000..fec242509 --- /dev/null +++ b/io/network/tcp_reader.hpp @@ -0,0 +1,91 @@ +#pragma once + +#include "tcp_listener.hpp" +#include "tcp_stream.hpp" + +namespace io +{ + +template +class TcpReader : public TcpListener>, public Crtp +{ + using listener_t = TcpListener>; + using Crtp::derived; + +public: + TcpReader() : listener_t(0) {} + + struct Buffer + { + char* ptr; + size_t len; + }; + + bool accept(Socket& socket) + { + // accept a connection from a socket + auto s = socket.accept(nullptr, nullptr); + + if(!s.is_open()) + return false; + + // make the recieved socket non blocking + s.set_non_blocking(); + + auto& stream = derived().on_connect(std::move(s)); + + // we want to listen to an incoming event whish is edge triggered and + // we also want to listen on the hangup event + stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; + + // add the connection to the event listener + this->add(stream); + + return true; + } + + void on_close(TcpStream& stream) + { + derived().on_close(stream); + } + + void on_error(TcpStream& stream) + { + derived().on_error(stream); + } + + void on_wait_timeout() + { + derived().on_wait_timeout(); + } + + void on_data(TcpStream& stream) + { + constexpr size_t suggested_size = 64_kB; + + while(true) + { + Buffer buf = derived().on_alloc(suggested_size); + + buf.len = read(stream.socket, buf.ptr, buf.len); + + if(buf.len == -1) + { + if(UNLIKELY(errno != EAGAIN)) + derived().on_error(stream); + + break; + } + + if(UNLIKELY(buf.len == 0)) + { + stream.close(); + break; + } + + derived().on_read(stream, buf); + } + } +}; + +} diff --git a/io/network/tcp_server.hpp b/io/network/tcp_server.hpp index 95daffbeb..c3965298a 100644 --- a/io/network/tcp_server.hpp +++ b/io/network/tcp_server.hpp @@ -1,50 +1,49 @@ #pragma once #include - -#include -#include -#include -#include - +#include #include -#include "epoll.hpp" +#include "debug/log.hpp" +#include "tcp_listener.hpp" namespace io { -class TcpConnection -{ - -}; - -class TcpServer +template +class TcpServer : TcpListener, 64, -1> { public: - TcpServer(const char* port) - : socket(Socket::create(port)), epoll(0) + TcpServer(const char* addr, const char* port) + : stream(std::move(Socket::bind(addr, port))) {} + + ~TcpServer() { - socket.set_non_blocking(); - socket.listen(128); + stop(); - listener.data.fd = socket; - listener.events = EPOLLIN | EPOLLET; - - epoll.add(socket, &listener); + for(auto& worker : workers) + worker.join(); } - void start(int max_events) + template + void listen(size_t n, size_t backlog, F&& f) { - Epoll::EventBuffer events(max_events); - - while(alive) + for(size_t i = 0; i < n; ++i) { - auto n = epoll.wait(events, events.size(), -1); - - for(int i = 0; i < n; ++i) - process_event(events[i]); + workers.emplace_back(); + auto& w = workers.back(); + + threads[i] = std::thread([this, &w]() { + while(alive) + { + LOG_DEBUG("Worker " << hash(std::this_thread::get_id()) + << " waiting... "); + w.wait_and_process_events(); + } + }); } + + stream.socket.listen(backlog); } void stop() @@ -53,96 +52,42 @@ public: } private: - Socket socket; - Epoll epoll; - Epoll::event listener; + std::list threads; + std::list workers; std::atomic alive { true }; - std::list sockets; - void process_event(Epoll::event& event) + TcpStream stream; + size_t idx = 0; + + void on_close(TcpStream& stream) { - //if(UNLIKELY(event.events & (EPOLLERR | EPOLLHUP | EPOLLIN))) - - if ((event.events & EPOLLERR) || - (event.events & EPOLLHUP) || - (!(event.events & EPOLLIN))) - { -#ifndef NDEBUG - LOG_DEBUG("Epoll error!"); -#endif - // close the connection - close(event.data.fd); - return; - } - - if(event.data.fd == socket) - { - while(accept_connection()) {} - return; - } - - while(read_data(event)) {} + LOG_DEBUG("server on_close!!!!"); } - bool accept_connection() + void on_error(TcpStream& stream) { -#ifndef NDEBUG - struct sockaddr addr; - socklen_t len; - - auto clientt = socket.accept(&addr, &len); -#else - auto clientt = socket.accept(nullptr, nullptr); -#endif - if(UNLIKELY(!clientt.is_open())) - return false; - -#ifndef NDEBUG - char host[NI_MAXHOST], port[NI_MAXSERV]; - auto status = getnameinfo(&addr, len, - host, sizeof host, - port, sizeof port, - NI_NUMERICHOST | NI_NUMERICSERV); - - if(status) - { - LOG_DEBUG("Accepted connection on descriptor " << clientt - << " (host: " << std::string(host) - << ", port: " << std::string(port) << ")"); - } -#endif - sockets.emplace_back(std::move(clientt)); - auto& client = sockets.back(); - - client.set_non_blocking(); - - auto ev = new Epoll::event; - ev->events = EPOLLIN | EPOLLET; - ev->data.fd = client; - - epoll.add(client, ev); - return true; + LOG_DEBUG("server on_error!!!!"); } - bool read_data(Epoll::event& event) + void on_data(TcpStream&) { - char buf[512]; - - auto count = read(event.data.fd, buf, sizeof buf); - - if(count == -1) - return false; - - if(!count) + while (true) { - close(event.data.fd); - return false; + LOG_DEBUG("Trying to accept... "); + if(!workers[idx].accept(socket)) + { + LOG_DEBUG("Did not accept!"); + break; + } + + idx = (idx + 1) % workers.size(); + LOG_DEBUG("Accepted a new connection!"); } + } - const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n"; + void on_wait_timeout() + { - write(event.data.fd, response, sizeof response); - return true; } }; diff --git a/io/network/tcp_stream.hpp b/io/network/tcp_stream.hpp index 2bd60e841..3e7e5368c 100644 --- a/io/network/tcp_stream.hpp +++ b/io/network/tcp_stream.hpp @@ -3,27 +3,31 @@ #include "epoll.hpp" #include "socket.hpp" +#include "memory/literals.hpp" + namespace io { class TcpStream { public: - TcpStream(Socket&& socket, uint32_t events) - : socket(std::move(socket)) + TcpStream(Socket&& socket) : socket(std::move(socket)) { - event.events = events; event.data.ptr = this; } void close() { - LOG_DEBUG("CLOSE"); delete reinterpret_cast(event.data.ptr); } + int id() const { return socket.id(); } + Socket socket; Epoll::Event event; + + // custom data we can pass on + void* data; }; } diff --git a/io/network/test b/io/network/test index 3ab776a5c728de3c59051c9fd7a93b68b2d2da99..0e8abce12909a4727380102d7aff70a2ab097908 100755 GIT binary patch literal 29240 zcmeHw4R~9{neO;EP6)Pu1PtNlBsjqdiIE(~v4IvjvJx4T#KcKR3901RaxAbdttB}L z(5=CQMi>{D-EHY^e?srJOS^k-(_1bZvNSOy#3^lgQo2ji(o0PXR3iMO{Dko9`@VB# zZv^YVmt`Qp=p-0&=?gv`4Tdv27h z!sVJ&K-hazazBAu02la8Bhl z>GSm_tEmu%1Q?IHPAKXp2i`BqY^jV8yJ~Hc*YT(x;~7p9Hk7=kHl@F&x_nHI*ZC^^ z{Cp=k@=g+^)dW@9i{=-i?evd0q!_4EdNeI$LEuuc53GRahZs0`+UUV*Cu*X6jD;<^Brc4sS3 z4d?<~2Chn6O!rb;Y_7|2RpUAX*Lk?;>TpeyJ|PwnP>Qa~kNGxX#5j1=nm`=is8#eVhJSyuqE0 z>q1;rxHv}WF2Xej7yHFxTO`o9=gE_VQp$NV$h0VJLcpMp%X;cPD>GZFCA zEgSy59QoGf$hR^_zICYNZ2J5lM>_Z9NM}|K{OTO|yGsAvGo2{mgzLzm&jE-~&m$S~ zY%c>>rmIouk7dZ;l0%;vIq=m;*vL#Ad=MA)v7PP2-QsejGX#;@=%vm^zZBk}qIBnw zZ$>>(ewUKh{k<)R{4+=|TRZ+R2mTdRKj94h-^tPLzsP~tD}8olq|<}+ssEmg^6t!` z|8q(oRV^tpJ4e2+=D<(Sk^c8{^s9Gs;Omgi8PMNw`=u`0{%iv`2bXb*d*aFXE!sXX4|$o9FA(_8f-)2y}_O>O>Icm(S~$=(caczG}6@O zZfV8M-B>ME$|ztp3L6P#DI*eZ^@U@-0bgIRHyUg0Layyy%_x-&*^J8X@wK%^+d5@^ zFdY;uwAI%Y=-CqO^aXl*L%q${v?=-4uC5TI1HQnvwm>)<4E30$a4hbP^+bc+2w()I z~>iu910t(~n=SpsU&ZN^TobeCD_YnvuTHly3H{M-rM7TC= zXjsSRky8TtSH zm%fZC{50`hgfmX>xj{_jOT>eUKQ|b}RD6mUQGDz?h8Kz_Qt9Q1`z`sI;RTlP0_16? z4I+N={hOd_v3Odg_ctfMf+tIncuw)r`S;=xo-dB8^zF<1jD{HIyk9c7>G>JUt>;_} z)AKOm`W=e6p5G9kLqIt_Zy?V1GVPjGzI<1q{@QJ@;7~l}{1zP5k#e0DTop-%g)KPC zrrkCR&b+njx8Qn?!?2weT)%@6AGYB7{gL>67M$;7+U>I7`n``~4_I)%lWDixf?MAY z_gHYge`+^k!P(EWJ8Z$N^Wq~GobPzr9kt+m@6~S1g6nr}h9xYxe%~a1+=BC+RJ)TF zT)&ITFqdBvwu$w9wZwv3-}h}6obR#PmAm}X%XIk8sGZ^ROD}Ol`pjFU%P&2|XG@=X zv%CD#L;PatGjDY+zw{8VkUsO~bor%+_+05TZyuLldWg@HKJ(V>@=Fi#OQg@dZE*Re zhj^v*nK!@7FFnNPOP_h`bor%+IM+F}3%mT%OPp&L+HG_BrI+|3=`(NrF2D2;=bC_a zJ6(S1CC>E|?S@@`=_PKLKJ#{;%P&2|)zXsWce(tM60ep%^Y(ztFFnNh4y)a6mtT5` z*GQjv+vD;}4{@$bXgA{WOE2-I(r4ZdyZq8ayiWSe+Yy&vdWfr~Jjow*`6VTOx%8Pg z&%nzip24EPB13p~j6{nP*8ukn94t9thL10~9zo-CZ@|6mA}4(0I~kZ98;6^_hI}6R zBa*+Gd_MUR$uA>M)0W&V`OC@Eq$PJrzM4GEXmVKc3&_*7CHo~mhdk9zh9!RDK2}$}Te;h&TRrQ~YS69)rbk!tvC@uP<_nhCLKONaA)xWxP6X zJX%rd8MJxgk9r0kwR!5Fh{C^?ShMuh9h*{S%Wo(*l|6)Cj7Tu0IJB*4< z&vk|;KC=rUX1+832Hs=SP~x&PiUKB#qRw1HZ)Rap0&o2|%=SOO%SJ&-z4dQ{{RhRC zc!nA)%02a?z-|M!pX;Uj_N3bKXT*+zOZ|v+NRNwu?0>HpGU)z zjP5=k8jK>&n;m5rHDZ2$S)*U9^DL?a@eG_S^~7K3o3)3%iI2lGHtlD?_!xxmASgB+ z;8U=pO7r^|zRq*ot1^K{KP*53p7?Rkqlv3Ld51hd|1df;mD*`KHR!qNfSG^)r=F`c zgovH74iOVQ=-QhO6+HnCiFSbAzezp z=ERLBt>NpWfm&ypsJi{btd2@N?e_DDy>A!U@1k}Z9$}|&yutx$aI{o5{D@1U!=BN` z3h0XnG=Yh)vzd2{>?QIrZ_#QDM$QICozqD?qoV&ZCpxYi+p>c>%Q0YW8|+S)0e>Y% zhbQjEVCP`o%jI?$2XFkt#Ou)8Gk60=^Nwet8$3hTl&swGO0;YbhyU@iNAr%I#+De& z8)_bppD}PUFIM9jMD71}9D{erHJ!qzJ*=sg#~$Nj1O#Ox4+50I&?!t`Mz>m76Ui-!cW0& zlCCQj0=k#)qjd)V*4yLb11HDJ?%0hS9OIIYKR)mQkhg%uZ>X4#ie<(1dxpHQrSOUP zYw=$rq%iSI9s4L{7gp?dB6Qz$+!9@=rKb2n$1|Rxt_qZ=aj>QWJs{Qodnqcbp^?@4 zV&YOn*fH|AfAF>?+>R5&tB0=aa}5?gGB9>7>(Gh97rL%!e55dT{jn1CxA6l5zc2H& z9qIKttLC2?7xF8yNFdFGRN~i4!h)w=w_z<@C>f7c?L#pK|Sg=}p2Z1Nopp_D!>&#|*6 z`nPBBmh!|FRL5OKUz&p{O-17LX~DjBR=$ znUhE>J}{%Kh|Dp`l%H8l=2^*{KXVG1pG(FlFClYKG8MC?lKFSZR9-lZ%=aX-aQ10r zzAhPi#dI>mlBv1mbTR{ysasG=rcW|gE-Dk&%A!4O_6&S}n~2>A5*D#GkWLZ10mLt2 z>p(V$*eZ}_5nBP`5iuu-Q^e{)>O`y>#4ci&f>erF1&ASH7l4$D*x4X95i0{J5wQ~a zP|M$NAaO2~Nwsg<^Qjy?kJC3>!i^U8!c5p!qJQ7&9cr(b?v1|(`!sk<;b5d>uw{B8 zAIUVtA8&wBjb005Djz>TLf;Y6nC|5Ru3-=P#`tUA_;ZQp{zyw0o9P*hln=JpfF^p7 zhT|1H_&-C=fWW}d3I{$YjCnnS8!OTCWA~Vb;PKZ@L-4FJ1aBc?RzvWz6qJVGet3^n zy5m*e_;}($r0vB#Vc=jnr%0PPN!o;2(rDs5C<;4Ap?e@iBeVjkD|^84h#@d|#l8ZR z#PO74B=H%9#vdJcXFPiGZ9m2mtUw@(=@^MWk+>3pc<@B$snK-6)W1mS|23um5{Ml; z|JY(P^ba!M1N-)>&~p%)wRELOrYU}iX6}G2(A$ZB0%RF~>!2`RhHPlb!B0D$;d?-0 zCq&|p$Ga*@*jB69!X=4|X%Wk9c_Puo;0FY}d?W%!9rTxg^$hJM${E9Lj}Sf~u0f0WTHpgRE(+~b;FM1$O zGo?3<`^z^TNKGG`;^R$2TMHWoubP-*B%Xv?m|V1a+s2xPd?jwjE61w5gAcLsR_-_v z{Zzx?KDHp9^>`n7^c9qvEzTO5fo48-0p??aiw9lrpEzj5M%9$0gd<@t>ZED>aTw(3 zE9+ml^;fAeBZtf%Fd4IqkEZwuKJ{)pP>3;d>;jdkXK=}s3k_7%%o32qFJui06^IEe zJJz#_uR$ri2aHFTU|JhH*fjVJ+4Jzss~G%`3Ehjd?r1U9Et1!AN~pRB2QvBLkK_4S zj>@t;KOe>mZ>m47<7mWN>3f`t&&RtmZw7BRjJ9%4)Y5pN^{7ui$4jXV!+X1+@66me^|4P@joiWs!u0U3b*)&1aGFygQJ#-@l zd+^a6TMT-lRfemjWwFuG+u9x28tT0%f&@_WTZ8R^uI(~~aMWyoYP+LGRwPp1g6$@( zKeP+rC9)N;ohF8^V90z};n++R*lujzZgd8^!j$d_q1SK+0Gak+M+b6}ZNm2M2=!Vp zR)0iz)~;>7%u(e)CU#@>HK$K=p$!pbkyq|?xm=t+?euBFUM2s;(we0;Y5NQ`qqiRz zawo(z;cD%^j~4g|Qz!V)mID<&M(s{&Crk6Z5L zFh})aEQis(hglJw{;n<=+-!K|q|^ICo%2jy?+?RXoqAtO7<(;l3i#B1sA%gJ8OM6F zWBG{AZM4@nLIAux-1$ z+U?ePlj{1}t?Ho}b2qEot;*G}>SavnAD(T>eFPn?$*1xUt9*85xn{4n02jwHv&4e&?sIOEFO80c}(FcQ@Wt?gRDX zwQB^_hQ)&fXg{b8@ppk%f{uXh0kvZVr-b@~)`9x5;^YTC4%!d83u{UbfbPa3+(}Ra z3%pLeP}@N_fYQw{iO9QojmX<(%bPWQO385E6dQ4VEPNm_KE6Q7kYtxP_YP40@N?ti zcIYNbZOco`uP!@nYe~Pj>bxr!EUuVijbD(KpP4S>dr)_)DA-VH8$j8aUk$E);7?`X zUhp2&=~ji4P`(}4ap;YzO*!JX;z|I=0;Wu#gn7~x+}!MyH87=sTW$-?dNA{@K==sa zeHY~_tXYbWS*5mRrRB>?jb){k%S!DHrFD&^;ejc47Y`MEvGAUPJE1pqz5!Py^g9=d z6prPH*PI@YdHp>u8~Vt%K?|>}bBbl9wuaL3hEijo;EPC`j&hISx({;qgBIQaIa4pl zHI~{33cpB2DEB-ru4&vzxg*)rWBq&$dUitY9QdvE{~+)%aMfiPD(m^Dz&BvZ3zBk7 z=Xn6lz^A29Q~rJ6o-Ftbl*tL4AM3SaItzi<0bfec(jOXG2hb;M!P$;C0B8DNAf8cY zOz&2N??L!nhO6`z!dnQO_48Hw-#6BqSH9Q)@Traq?r4W*ThrNemxMRzOpMI+@Qw;MJl06**Lo_t7uvA|tw3>BhB z-(BP>wclCnDXklrvbxk6F0DfyASKz~(d*IoOJw_FRZ|>AI>Q z`1Wo6{i1;VOo(sjAbPk6vXANf4_yBXmvn6qPAVtH3!bK`(iH8~LG&=!4{_xU z<8~n~>8Li-{!em`;bPjwOp1sbmvlPq14z1oh)iF5n7}478*y=GiF94KGvr=!+i|hU zr5i73GE=!&BlO6gP9TVj+J2GD=W$KJ#Xg>QCvLNFNvCz1UvLR|?r4yX9_D);Io6IP z-|s_zDSb5q>F8k_d2|C5d{EPIjzTBjK|pObC)pH! zUzy2ky|>BGiQkj!{P7J&&L9q#n(|oYm%J_~UYjKERpBa`|K~)Xe~_8c3DvH}GfjQZ zR&<`CwTdoRbiJY-iuNgbr=nk0^gD_^r05|^x&5Cv^x?Ryc{9f#GqjKG5EH}oCB~|#r*rMv{ zm}INxn}A%dcGw+up~UbLQ!GZSuL;KDs@kgRMGm79>!6X#*0{ayh8DMPac%Ww9r(p* z*LJV*-h+rHjrNKvtbJA~ebX2p!g>Q;t&E_2;jXBt3gSnx zq6({oxT6$7goWB$qphMU(CL%w{=Uw3#MK-U#V-=gAB+0rHySeyUu$n~>vl5@?Z2@N z5s_bOcd!ldLdXKQDw(gD$L2^xRJDb=`K=+dfwS=PmuUIUNg@0#-T7JZ0^!Vx7pCl{ zHL(J`F=_jnju(q;cBueXP}{R~`4XY+VmdxGWk1vLX{qN(I)0iMQ+6mFpDy%sE*(ET z_54Z4ONCvHgLJ$sbuLRfZWD*AomNQt3-A)m<+e;$z_)Al9J7WM;B*iDd`rh?2yUm% zbOm@yWwXZx;%s48&nau10z8%U^D7-M7h^8J6_Wk}%efHNz=G7%cmm~opb(2%6Nu^K zb3YHSv6=ewd7qEp?~lxKnu6+HrErlkA8Al{<~-#(;Caxq((Q+DVqU^`^t>$fIRu>f z>iG}j(8+a$EbX!fVcE*{sEl=n;ButZ{injY3}I{Wv_N!`zbD7ra8WqlESgTvF)6c*=fwO+}c;T8HUBANh_bEMJD#k>J^7{Lgo>$$i zaQz)?shNzp80k@7fA`Yz-&DBYa7sz39(+8%Mas`M@wqY->t6~VQ}SGbp}QA2`vZTp zLZ{QYU*e@gf2Y#(tY0bIZg)y~jjzE3GEbBW{k^G1hH|^6iRd&J_X>$m7rT^QmCcPft8>V= z=D_=bGhhAP^D>q14|B->EC>Dq@HzRg1M{3R1#M5>SMvHhyGmZnfuYD&u3Hq&U#eN% zF5re0Q2s}NQ-A&aT95ZPf%9A){z`z(X*b_I-D((4du6zCxP3tjK^nyvmPpy zU8+=(UIk7)`GXd#n}!EDQ~ew`^VPqB>6g+%{7T~G;<(E%5nVs8sr31qKsx>5qW<(8dID#8 z_4oXg{g?RU^RNE9lGpo0biWz_enx(|s8n`FW{%rWfwR2FRsZMwgKlnVc73kQfe%Z3 zy5J9@=v1|dujP<`7&!Gwxcn4PxkJFIPruT~e0mD;dzC(ahDH~VcOl*YF2~VjPKoI9 zmcbCQ-Ss{NB`&Ifv)_sgd%Il8YdKA?&w=+Sd{;*Q+yR{WJC*+DsD%CrxDEYec1HdG z3-D~`fc-||r9$t&(EanI!u384UEUcGV7baO%5|~AZ5enS@NDD4E9K7>b~TQ4J6@YZ zzAp#7-lf}d4RG1d)wp{~1#DHg zG1n=P`xX9gz&Wn4{G}Xt{#mB}el=b& zt(9&Xa73BpF3f>Da^Q^$*Zbpif;~CpzbbLe-;^EWUKP4;NPP19{yzg}{SRN_lydjT zJ8o^g(MU9gUlQ`jsjFRUeBP#(wK#!o;-EEOv>OM7VLxRAhd`M}vRNRXuRY}3(iPg= z+U0AHhI%8u*4Q?IMHlR&!^XR+rAun-ka89w9~Xqo#T*|NXnMEfWF~9`^|i;kySF2X zg~QgpU^Gn>yEda$ZQ-yyr^&irgw-`3-DHKTaa>a-gaeu~p(T@{rIVoANyucGx=Awi z!neY^dRc=Pr>?~|`~NT83CfN8f?pZTGXoI~Rgvp3Os7*G7igXah}`8_gKd$Rd;$uU zf;>)9N#eXg9w%t(QrCb35SwrS;$j#no(gLjRXtU$c+lp=372UJ^6W*+8Il<_CO=7Wz-Ii$K|YlypE8=3 zGOE^5gP#cD^O+C7_-XG7^akZAnHhOxo-COWjLc7QFTHtyC=OM{`Iza6O&o=pjw1%Y zG~(P=oWhwp!IpgkS*M@3O3SYz>+bqdT13t-($7#$OO?%%rILAmZkpVmKJ-_b_*4HK z1AoiHU-;s;jY`c*ZZ(P+3%{n{%aN6)GE#z|Gk!dedZ2tA=Tlh z=|RxQI--|s;V;FY6%RtzQ}Og;kbP+;Y3=%qeyz_wHU~>)W;i1_^}tX6)ddt;>q4c`eni~r6xT`V@CP-g^6L;J51XruYz=>cq|F@0=AS>A+3B>LyJ{Y@jK)o` zyPC~iG2Ng`NVY5+5Rb3~u=uWJyG^_D&yRl=#ExV9;yMSVVW z0R1GC)#z}nZ3}zl@wx$dzH~kKHTZ29IoZK$uag4+KWp-K*Veeb!5)X(UA>sc^ZH=O zeBs^@e^%*}54i~Da;mJ+u8DH;3|v(4M1T2c2VJL?&ol~4Y1h!bc#wA5f!k@ac)qkp zLdx)_YhpQ0+nmQvJa6@kk|sCn&(BQTYCZott#f5`s@m-VoNv9=tqwJws2x7M&7an# zW2wW7(_to?_9HuUBJ9Kdh1B#SV;tl-z&k@*>(n`OL|bR6nPFx>so5NA-!6@y)e0@< zdDb*nI5#=H7;?*qdOA%y_d8uPc?Nd+;nryaW{+c!oXBEQm$NoBUzYfI3$;x|P*bQ= zx8NBEcdHa94s$lkD39 zEoLLW9-QgF8I5ui1~iIo9tWNgDCewudjxbKWO*W;*l1!?1%cD^~n z)B)e-`S#uFZ1ePF%p=ApFh`Af_W1;&`dH3`>y@O|A{$hBhOo*rU zQR)BpWN1M}3p02Jc&@eT{GItuaQNP+fSqcgTF2At-m>x0PaQA@%270FTMxS^_c{F5=^J%G+hgr zds{WH_k$f)@sFr@)Sqc+{S_?Q6t3gzePj3G`wZPSlce(3@pbvT5zcg%DLK7A*M{#M zbb3E8WmwOd@%ep$^;(RJPVZ+PnPKwhnIv`B^`L=Xc&zdDezh?bU;g$~3xg`PQuK2f z@%26(n=0T`tczQnj<2x+c&z#B{nzJTYD%bXpv6=5sDZeOuj^m$le4S%!@6Qrgv|WE zlo5Z#;gpj4|DN$T%~qFLf8RtPMRfjp-`(MAGgend1l@i*&F^N!*ZZhyII!S$>x4~q zQu+S_IJ>GEZX&f`#%V^#EN^E1qYz_P)$#Q{9Gi-tDWA#z0&!V>9bfOOEB_1QA4Oa? zk&dtT4;m`I{(pALjHLR%F4uF2Yt3KpGqf*509>ZNn%DG2$XMg+{fKoc{sRoOI2}*Z z*AQlnKj!jF=`oiiqQNAoyN;)Ux8NbB%kNa*2WK~$68>skQj<4KlDfYOAM>}T5YwaM zeD^qz}Mfa=xaUfrs%V1^R`Dqd+bJ%TC8#92r02d)$er^9azIEB*O}V4% NP4EBCupgQE{~y^cD|Y|@ literal 45880 zcmeHw4R}=5we|@K5K)s*QKO=c7&WNGnIwb+6=p~VW@G}9grABy36lwlB$?^_1hBS7 z1Ijd{aJ^nj`+0Az?X|Y`xuw>tNPQrR68oc4y|q+Zp{=!Nj40NM*sAlr?>>9aoSDoh z*50r8`JTJyIkWd#d+oK?UVH7|v(GshJ0jnn360-9zQDC1w#c79KQ5?8Ag zYIf}$?JR8qFynFAMGSo-r2JvYTKR>NE+2jY!{a1koZSdWf2sj#%2yK7XMQPXmqEk7 zuiF5XuW1&6U^>dG1qBGd)d)y`hY&{7HY)*TI*P}17Mac>>6Kq1UuLg0{UOrM@PSo=48C0ilpkS1;FPb@U5#|2d`9%g{+CFK%euQ(*HmBD z-9Dqct2erS#`@Zt88y`v;Xp;Dke_tSa>1gdB1KXkGJQS#t8sa7O~X}-YZk5sTy#v^ zh>LWKab1avI+}s23D;a)mADq+x*XSZTy&@6a^Wh+H5nJ}faNR0MVU^)Wp-{8P$$Dx zCY%M~t8h_%C$5Wdor8<+0$dYCNYfU=FOp$;SKv}Qo+KA-u2WkaIx}Jp06KeC`20 zTe-R<-g=H5p_Hr3!QU?JykC}gg-r5b4!ixQ9DF(?pF@(5>hA_~=UoFq$D>& z94E7t_c25hzayi(|0f5ZQ*z|{QVzRn%faU=(9=$|b*(}`rMK=Jdb>YIzBlEdAClz~ z%uRn4Irt=U$ZyX8p{ z`D}pv#9Jg@&7Ypi!GAr{O~bYST)X68csJ&dzc+_|`g7nf%7H&K2VR%?S}p1Fypluy zn{)8#$U*;bj(Wc#NBzE@gMNDs{%_}?pCR$X8RgP2UkpdueV1IqIbx0G^{(j&^m@aQ zwot_D)x1~rd6)RtbcG}SPIP4E=UUy5Qx7{C-=$6L%?m(}+1kMVx(tWu(Rs z$TkzPMsW>%U{-nQ=U zfG?eI_4R?SUYS>5W=D54+$kU=tuZI{wS^-{DN3;>+7<#c-{+!TA*hJifHr^v``daT zeApj~xP8Fde2~u@3AJ@a!YHA032;@AuFI#W-7Vf0tJm3F<84~lT;+vQL=Hk^qx`n2 zP_#GF)dO9JLV-|2CB+j&A-^JPj?|&zd%Aj2Wh`Dx3o@!pWz^NX#t<%>RHA;Utwqg| z+O9xYsIS_MDrm4t#YAeLx3+G75)rPqIhz*xl9kJ}8G_bVvhHDqjFYl}do~H2Gkx9u zwh(Ky((8rErNYOaqHVwclFt57(TWXFU=2?))Al%THIp_BDK95?Wou5Ym4|g zMVSm+ue5rbz5bqHWP>;455hnLp|&-Cp-tGfZKhENP(G?)b+jW}=?(o?frfH=*Q{Uf z4f;djKyO=j7ufb$Db})2#B6K|7-o*_jhYX1hn-zW*1N7plrCinCW8~r2lbNC>H}55 zXvFE0dh++8^DwkzYAhi?YB=ndG_nD$4um4etvOQdgKZo6enPW_9kRlN)hrJBdwT-y z$V7B38TMGcuCvYPR5C09Rp7s zZot;WN!pW`zo__qrugWYUD#qM(4J1lPXGraeGq=^rs)v=MD16IC(kvX+f@q&Y(P#v`ZxY z8C_l2COJWyC1FF}aY=ctp;jh?E}1Ij)cyc6N9j^dt$&H<{*-cRT}iy!D`A{ke-VE^ z5z?vk6!8}-Bz-yGCf;hot9dx_+%HQyIe#Xe`!C9=c{TBK6%u}vEHBl^dQxtM2~TTN zuGNG$EX0WEG~t|oyzr_@M^!6ahpwen<9q4&4jNs;kTRc=Kb%xP55dP{e33< zOcQ>G3D5mq<#w9z+~-$r*o3cDNcc~g@M7?jar;epwGYgw113E8-jzFK!t)-2a=Hm` zS4j9rOn8S0f6RoRXToc8ADH%EZ^9Rv@C_z>i3#sC;Y&^U`6j%@gkNC7mz(e|6W(gV zyG{676aFd_-fqG#G~rz)yvKxZGT|Fd_!TDnA``yVgy){6a-Al8lS0CG4ZL3H8XW)h zQj6x=G#nYPe^VfzDBNozLNGCN7f2EpJc|Ef?CB&fAlyl!_@M;c1rHGB5Eb7q;O`RV z&=emQ@I8b%B*k|K_-lkYRLAcY@a=>-G{v_G_)CO2B*ps$d<$V#dps!M8whgUDm_tom6YxC39Ae@} zJ_O=|*@R079};jS;mL&e3;1Hf9Ae_b0-i>gLrZ*zfX^g6h49@1{tRIbCGl+notI7VtfU&nLV? zz+WTGAtQdbfNv+vp(4Ibz+WQFsaCvSz_$?QkP#0G_y)opD&nmI4iV-M5pNRk^@KSz z#O(rJO_)PM+$!K}2y-ZiTLip>Fo%G6iGZ&n%&tGK33wi1cKPum?~D2;Y$be1z?FpA z^~d)M_+rBB^5eq-o<^8meSC+2&m>$;_-+AzhVV?n+XOs`FuU}4zktUPo<%q);J-8j zW)~iB74V-3v+IsG3HUX_?6Tu_0soFLr=oGIfL|oct~+iK@N!aI2g~f5z6KRRT*;nV;vOchS>g)!*ok|TxVZv1 zrEHIjsKUPV!3k zc)9AHjI4ESIviPSJG!eB!;;Y zdsW|EiUb2+DAbBK9m5btrp(cfoSh@%hMEVf3RI&lIqm>t4MYBN)V*0kIsh5ywCxQM~a>5Ml5R!6SA@ z+3R2hh3W7xTKBt#wuta<09r#Vp??NR2aX}xY9!lZ2X;3{N&Q9O-LWTJLwA)O0Ikz@ zbT>yoX5SB3{~C-h30x_NpWJPM$h$ZwdZ4~zC>Q*O*irrIcR@TbT*w56gm(Ao-$5E% zVpk=SJaRey96LEda-jxEpe*p!gm28H;pDhA>_7ev_t~#*jAqz>cZ5;?8*Dy;}z*5By>Ni66U3SRx2x=q& zJ+Yd<4J2&s%WtExy0*Jw4P}M4VR|L5p{aJqV8LAjht3!{mRG##`{0E2D)Cz1CV~ zhNRFVv`~~>kf+sJ9!)#eT~=yTQg=bAYowvfVpLB_PKrgT!UMT-eB-Sqa{y&#>IYX*ILiFCSJVe;Uli%@Bhj6oa^1AuC5!)N<<^f zd)6I$ffmM!(Ot0fjl;#84k5$Q3tWRIf4j+|?U@f{h!(I5<TEFFD7<;fx)bCl)y-~lDfi~5z(@gaaWQtb- zgVAZ!j=%UfAT)fVH`;hNi_ipCjQVF8WolGvI4a)sWt0{RgI5|YsOF8LQ9&bz2@cwI zS8U5iHmVdK=9sl*eV1JT>}wj0N_KJjzl%nGztLw8J}N5GQ};pS5_G$0l&cDvqMwRq z?;4#YdsBDao6+B}3m3xydTr3Xsq1gEzj;@;GN*ydwdl#N*ogi+bVqt4Sm-YNQBU5q zS~?}w*ovtot}aaK=Hh`CBP!+AKSSN;07DI%tUOixhgtlOv2sNnt;sO++k|RvEDOe8 zU>i6tTe>tQez{3(8yP6EpM_%L=K_-?xM@=|@&QJMSxR$$nOeEZoVs}H8D^~n$zdWe z`l|?0hf_gK|2+76PC$dwuV4jC{1d>_2+kmQ>@AE|az4V@Ax71|Vy;x0Hcz@77<8|u znbSOA(DqCiw`(vz4RJN*bR{eIrj=+}N>NMFB37m%mZwFurXsFQi}0r++L9_Q-nv96 zB~n(XUx$Lm>j9c#=O<(BjGdJpJ3ARWjj@-c$IeK`7BKc4fT^jBnE$2Cm$tJl$F3-2 zGvg4b{|DAhp22SkpmEbH#T)C<|9OVG%Pj1}I9fW4zUE(GzJi}IKuo}Tx^b|*tQ?7= zH^xuqsA;y6oIP9)tNb^%4p`+j3RQ>dk8c6Sq@)MU1f?XnM@VFhUz}8&4-=#RQ$)IA zPwrm$S&Us`Vglr?L=HU3Uc(i8Ltg-8VXSIhff>fNpmXfx`o|S}S$`UHCpKU)QGh9V zI4L-fS{@w#TbBhi7=PIRF)aE&Pl1K4wOox?K^mM$md|92QsY&tz8e`+gX{HbNQB{| ztiUk{!*C3?l%1p>IU*)&4Ka+t#*FRDY%d@vB|!;iWv?S0rW%_fh;a>0Eo+Efjx0)% zm0(<|ujRZDJ*KGNXWj%lcEEM>6Ri4`j%yvO9M?I#tM+1aM~R9}B|7V)OQiay;*C#H zRi?@=-pJZZT7LX1fc8-FdJyG7Rj|NlsV!xNySGBw`gb6d^#d_RKPO}nv%6Gh6G`tpav1t;tTtunP2d}k0+UU=DnvRr9xdJ& zW%;0qN{AHi1YmCA+EQZyZABM3c=>Kj05#ilwqbn}M2$T*@LnQPcJq&UfE$r8ZLD`< zKhZl8iJ5(*4ZWciE8JCkjrtl!mK?yK0knTCxp$wy`g-qhB4PheQC}7gqZ?ia*~kP| zm+kQGWNk6(WpEf>Ce>pZz0TmeaR>okK|{niBS)O0~9^#rnbe*(KKpBj=LnKdXXv#xlRUGmdFidCD-$& z{(s>0+Q!&EckE~G*e~3%cVi#u-^VuL9xKd3A8#mFpFyEx`S#6gF_L~B<+!m7TKZ7G zfcT-c7=mFIutpDh(ccQ%gRX%QVJ)b`SM{fX^TfjCuGqC@7NhpROLdJP6LcG#y^BC> z1^VL0T(_OP#c4r1hjGE?^fw_7lwE7;s5o2r29i)+D}=h77%5E6u0E~aEy;F2I+fId z?RK!GVyuuP)zpMBSoNR22Juh_*!@mteB9MguHOb8yQxQ3!*?+@GPlQ3HyzuAJ>C2| z#}jw#!@)VWmuyFE&*@JhGWH}EsX;^xe1KK91p$uBMCcb{$zW)&8-%d;*dsF9YU&u`W#W`kZx=x6>oI02&REEd#72v zmA6Hb+PUkGD0~qx#pm`T6OF^q&975Dn73nK4lMZ~Gf!FIm*R$qg}Rj@Vq2k!7RWm& zY%Zf<=oqYFaQF{GAoM(s8a4$Jyh{`-X@X+P)+0=C1{mmPb6X}$>7t#e5$oJkjaIouS~9r1Y)&Eke*a`c;(j<|kNW?hdDE@RJ(ea;aL;=pC;@wN7A6hSzH*u#n(d zV#Mt2>N%<6tz!7&ix;U&6W6?$Ow|mxz2o!#ywuJI47C zLCg-^u?fk%J+asHT1HTwsb!5Uxu@>!h|Lqbv<&l?efe0wt;LxArrw4uTmzrS_Q-Xv zoA*)*ciqd;mzm5ncxjp1fZp^HcS#QFkN+-_Xao~Cb}0(oT_a53$~g z1c7B?qm@jq(aa!tEhN!>{M!x2`gdfKD8wAmdFM{Q6Da+kh>z{nVz}`|IjwE(q2ZX08Be-$ znKl|>Vc&q*qo;J3?oYUl+XkMx!;woI+zfNe&9E?JadR^)@@st^!l{j}_prjyr~DRw zUgDv#byt{gM>(5UF2@zf^03JsHajO!@wbxhzVf?c|e8p1~Sp-uN^)VJ~&l z@FR?iRF2pS*}z8ZC(*;S?*L}{oaqfbR>C_qlkM2uwU?||2_CUvxhwX#E9PO_#qySu zb76@G4nfSpYk`4nUh5jFFZ`&xY6I`(VgC;sJ+Xs{CNQZ#>EozRP}HS{>cuU-|2G+O zV)z~ESlQQJv$h+sAJ1uIoZ~l00J{bklwgKklCT(X{H%lK8rs$@_Qt+gxN`>;T{jY$ zTKrJ4ZFu0liz8?3CQg45+KfGB8{YKe=tSIYgsmP}$(wTB|}XMkbbGW#=07 z3iYo#9`)BMg&vpsCmGZ$6Ro8y_rkuKFmu3qlDWDEulw(>^xYPeo3rJCCkiow$JsuT z`fv^2R2rWM0y)1ijW;*H&SNKB5|w^w;(d9=8}}Ee{W0E7+y;y9*fw}mN$U0xXHBgj z(r-nj@dnXW0LBd>i)ZNmGCn!(v?b!}*r5-CFE^*YhD_ouBs7ngId>_qORadWNlVd@ ziddZ%(VL36J~`SKZ@q;=P+;V*|L1Qo^m6Ps5q!%`a0dx)Pbat-BSw-#m;~(MOdLFB zf+ZxFO^msmpL7QG12fCSufg)`>3NQr30@|_e*%-NZ}(u2<`{Io&-y%qYQL|fc;f*S zORSzeu|Mi;AM{xstSEQ?Cl*1wcpQjLxq`@nePYIX1Z+LHIrbWGSS)pMv9un4j-B&B zO}&spU&~D-%#jFi@ndYTiVedPVThQvZm71v$~Fpv!dBxg1jW>DO95?+H&M33H=?_3v7%O&P zgCdCX5;=#+fxV)_{Cr|QRj_sD>EeSGZza#pyO?ED(ZhnHjoN2NLBY?cG*eGw( zC=U}n4x>zM)6S*NOf9{5OdkFttXR*T1EbwOHI=OnW# z5?L9waAq|A1{gO%0B;#Ipe+Lq!ENaTzc&+nj|4vl22HF1<2{$3;_wkS0I-e9$;-gLDWfhU9%rl)k!IK6so%7s zg=~5*V)YDO@aCl!&GwR!esHd9`itBuy@fTp-Icf3o%aOj!=$f!aox*~!MVHeppO_! zzW58!;GEW(qu`o_!WS+iot?=42s z{*AiMRyV~Pmr`0&f0JvIZ-%*?4?mA`o&`+DwtvL@9Eax2^YhQ*&TeviQ}<_FvHgQ{ z2Tv2H)v&(Q+adC%;b`W)n!`Y|vW)$e7POQnuz9Fo2(iV?6B2tCu914fhdv7RekA7I;oHVf5SD!QM0oqg7vdjB z8w<)>!PhlZUAAXEGvJ$5_8e-&At>UFle6bQP$T;X5!xvd7$+=npAM&N_U<7{=laGV z`v6K{`Trf|H~K?y{udj0AG!z6;d;Lwr#LK}_9XikJ22^IKNbLkrKX8yQS$c7C2yg= zMtEqAx00~2lCed5Wpcn9P2UW4q#~|NYccav5%WwvLx*TF)n$4Nnvb)eFh#Buk=)N) z%UDl(>|z;PqBkC+WMV_-LKqE-ia@NrjSf7`{y%2FX%)=uBlnxOaqnWxepBZvJ9Lw4zp3*X2=xE&ep4N6 zA=!Uq-)}k!o00RE%>AbC{mk4$9mjst_0VSS{ic7!cw!MPBe~ymf>GFwG+m6@>H9tA z+Y1)lUhwdg3b%y|M^EC(c(yPX)=7ooV!7Yg-@Kp-2^ zoBG#y2X-)=Y)Gycw>dW*ja<2h4<+kjznHuExUr21#8rcLl^>yC;M>>#`C2V9_e=Hl zqs?-?p6tpUU&4*R`|=_y#r!}N1-*P}|DD@4+fg>uX-}hTX=G1>4&6hFVBL-T z(`X_G6Z?yIAK`rhH_acL&Hsd&j!#7beL1K{2S(B9a+vrm){SVdI07M$4~f%5B5Dq! z!2Si~W!R7Czi?*HihXtTIj)^QyyJxn^%E;5yWg$Am@S?gc7@d_b*lTZq4V{n9#I zNPHEL5aoQcD7PYIN|bVna;6l8=)94nFprYuU|)&K;(g}H*&{~ev=PigRQfP?AqQvU zh|Y1(bEl;St8wbIFp$OEgzYOVeex3Y(?UYPBq0c=DwBKrdc55}D!g_pXU_&bA6nuOIIc3G~ih zh!>d5aCCR|`A?p7@+56m^{nc&_lPI}uNP@_G`BdHSe857E$&4NELU2z2wo^v;R^<} zuHJ}+uM;VsZqQrAJ5MHQ?uHel6OiRRXVYSj$Gu3az?)Y21{8m&q7#Yx0$uHvuE6Zs z9*Mnz!16$7tv@tp_U!IJAXq-#va-P+f|Tvxmx^8`SqsjtrA=lK+Av<`f_H3%@v0w7 zOMR1tZ)Iugv9v`jkYb(4B3%X}+u5`6A}_uPrkr`T_=3$wx{If4=C=pUo{cvR;mt?j z8udjiz0vM&OK%`z>BDP++O>L*k#DnjDGW0-UJoNOBwr(9_Utbj0VvPV0ELYN(&g1w z2Lj!ulFy!Pypl!;D08@Qx>n!Tdr`!K*S=UFmaJp)60e4_B-O61)K*zYOlb|QCD3az zlX?#<|8?@gf_4c22@c|fmhbxJvu%i zzZBN20OQ4G4quYBuRq-_ma zj^V7h6H2%LO2a=S{Ug%X<{0=A={H9ztH{q6z{|aiH<;laRKe~xpC4~BQ}4W*Dc)zs zS5j4a=Rq*MrwH#i^SkF?v#7EwP^)YhDXP3RhBY~zd@qr6X?LH~+vkMC>!q3U}l>z#otE z(((K?-JykspZQ)v`zix(Z!-KQ>9`(-|={_@Vsznt#~p8>-Ab|(_U2p@YK zdn&jYQ~pFE!8?EDPhu*7X=y2D+uR{ufri_O z#HMg7{PLF)iOujke*^yTx4)c79D{%CU?O3`y}Cl26sUz?3cm?{IefmEp%#8W{Ppm+ z!`}>_?_WO#pYQ$L%(Er%x5LLPpS1hp-~)d%<>P zBnq)(vjV;qej&DSR>1GX##cZ5_3*dD-wb~T{O$0cf`1?UL-2L@e7tu3+lfRe^tBnj z75-CyM*h^_yU-u}+GC)He;@pEu@9zcQ)I>D-LOQCF^(?(h9-}YbYebVVVZBqt7+AUE5mgN^6DsK!PE}y+!vM1D=M8=WSLh~ zKCj5?D5_mhbY#N7#4Qts#(%kBYyPd{M7nE{?f~+A9=1?WxF&nL{rPF>nAaV+`k%#G z81fdZPo{GeSsIF(1`57R{-k>p*L~Q87=d4KAf3)G=_vmzxSFuR^F{KT(wR+;hN57a z7=r&)&~JY(k@yvqRnVDC2RRlL-JNe_Ksg#f*Mv6VC*2Ov%_}N#6qPP0+Lk|%|7And zl%t#ca5Lf}(j7_iGwYEBG^(MCP0&di*eJ^EfaS;{liwAf zyAO22zBU^CVE4&9nVjXh3v`D@$J!U)J$?ZAO#Av7@cXmi`7Y(BfX}qA_kkY<-fUmg z#}p9iz-Oxu%2kc{6&Oo0?S<)@fag8)f*nac8SNB0U?ZKdW&BW5Y2a0c%R_z`la|w7 zew|Dw>R0qzuoTioK(`H>)sx7-v|HJupzF^|^(%LQuJ%WXL>cK?K&R@|o@&1h;CBVa zs3weYeuPQ0bsTE-%lXbC%TNJ&=q=-2Mb=v!6KS9kL#>L*@ zbOL7})tT^rOrQ!tlVYbjIf}1+4bd0FUrK;niWONRe^vfUAhl-x(J?^le2u{MxXN+e zLEv^=XD~qbJzOMvlfdJ+NcLv}Oz2~PE|LEb%TvuTB@m7QrtKo+z{Rw^1cc{NzJ=;NgzG(G=@R*mqEzf5X&XNVI`2XRXd6)i?FcX>ziW~A z76LQ}L1u7BR|#B%KNfovfy?kWBog57Q5o|tW1hfY znTpW?Fws!Ml+%kg##5_BL4_1kXFI=bq}sQYBzvOxL@F((!_D) z{RV%uK;hoSKRM@-(fhbaCR~A(W-|P|6Y!TzLsI5BJ^vyG*7A>z0aoa%1lHp^$N*g; zf78u0DxT~_ekg!&4A>AFM+s^GQ2(jwI_L*@b8jBzu5lP5?A(;f??x zw*w?QufQO?cp?BhfS({%IJ13!Q{prRuD~?`SqZV31>fSLeTeIbQDT-O)i!DnAshp2 z8&N`yxXN+eK!6xi+kn2#1BNR4HZF~`l_FgC6Zj%7CKfJ{e-G7lCVM_s$P8S0)IZB7 z+}~0M$iEwj{}~sLp=qjyPb9z^PI6b}tV0B4c!LyPTvy|Y6Ls{#OJ4JsQ|Q%`O=4w;28o2@53tRDY1^f1Ta4DW2~NH^Y24xEbcVx#>wzyDu-|1dMBJ7vAzEB%M0zfbxv zNdHyoAC>-uvkm!Alm3O$uaW+I=`WLhhxGfTf2;KGl>WWae@OcKr2m5SUzPq*=}$OE z<}dvVrC%fc`O;q|{SN8(N&i;q-zoijrT>uh_euW+>Ax!dqtc&%vmkV*N&iCW*GPZ9 z^p{D$L;8Kvzg7BoO8;KzKP3Hq(tknvuS)-@^e4#nf130!lzxr$=SzQ?^gE>AC;eNc zf2Z{CmHtE0-zR-hNYmdPvfY18`gcqJUg_U2{SoQo(+#sN^=yq_vdeu37l9B~Fji(d_hA)mSg1;<@1%W9(4PtyU$qPC0Atu4Y+ZGD7Z7_JK@bx|APf9ehM@TmP7kURw35>GXw~>Yvl`CnWpvbo`{`c$SVoQR|oeTRQ$EO^vtd_>+_4 zU^>1?+a~Ql9bcUMK5RO^L~HWe%_tGh$CoO&ji2fAIjNW9sW~nmb8$7!rsJn*e9da6 z%g1P$ZCuFLPSdP%yfmlD$LOfW(R6&N7L2r-Q6ij=U6A3dcr`0XCEtUXv-^gm=eDldJt*Xd z%*Llr(2s4uTY%>#L1w@4uM(fx&+tBTUY3625#XnxU1$sKWT+W-tZ9D$-h$bAX8*v) zU%54r>GCmQ&xSuU2fjk!Pty1nY&x(NZVvF2N8PtSLE>+g^x8DLz+5cxyf4o0W~%f}O5lawmKuC`uYqoS}0<%p_79@@6lwF=N;fJGOgJwzyd5Az7%+V_p?dbt;)9ycp?8ByI=!QxF;ok zzsD|Majy%Zaaea{ljpP?_yxcVJy#lhcyE}lD+m280zY33Aw1qa@Hr}O8azbOa) z8S)v2c67O2q*v?tKS+9Y4oO7f@5D()J8kW26)21N$GIxtv&lIi@kivkQIc!-0k6_3 z9Q`K*{~}GD3sUv?KJYB>-An9(e7P*|6bL*S`c(5hG%MlG2cGoXYU~2`OZu6>XOnYr z4!lqBFV!|@wD%i;w+PPZe|G?n(PB)0`;8p@p9Y@xu;N;~U{o&I9Ri;6tMgm4B>qNp z@svOFd)C{5C!fL$J`Vy<`{${9I*a(ny>rP&f=Ta?tdRJJfzPJ@UkdsnO`XfClk`*2gjg;j|e>w5x!M)EwL zvfF1Qy*k&Y@V^Cq^0-oMhm>Ek*A4@pO`elc5!vu%Iq)vvsfX4kLrydw;hqPcWLt)vR05AAW8Bsp_-_JF zIn{a0(`Cv>a?t-0_!6|c%=-Pk;?rms92QDGZ|2}LX$tjrM)LcHD7|3!9D9%WaHfzQS#DCt{m2K|UgrD?b2p#ORf{6mtD zI@hb@c|HgIA9LWXFqmv|x`3yhY?l4A6jN)=L4P~&Y)972_6vm*ug=wqJF#H)ErB2V zJ^Gh{&nC}ba^Nj6LD8Q}ISZwrmjlo7MU!}o#9uG*mJIqY0-r74yMUjDw3*+j-<^a0 z*E#TS!&2GwBQXbhK&XSS)fX^n+)Tu^0y)V)#C{?}806rVNJqNxy2YxN^w5xtO zuc(#s#DN$6;gxoQQRVGF+rT%?G4N`<*bIC&dG5`D-zE8|=P(rimFHyV69Jy`*qe-e zWi@K=NPO$n2A*ft=nBrw&gXQAAI_*32k_+IzrZfi^QuF z=k$dl;Ybv3MB;b1uc}|-^|+f`yj~4oYsQDl@x&9pJ1;(g-s=zJ(WEgbueUwmUDF*{ z-PY}G$5+e4-nQs^4cp0hz61}XRLrWZ!{@8B5qY`QZERC}vGp958P0CZFQsx1h~C;WqJg=rl|g6u#)3kuD1+3kqL|&P-P`7F9b2H8x!pK3|=| zs%mV8)nkaN$EK?uTfpiptj5ZKuV!b6fKO#-V8)iEPV+ACES~4^;B)8E)n2zZSr_~S zyQkLc?FxuH8eZ@GC5}d?*SV+x24z^63`+)!=g)6;ws>0{^E^&cq*XJ&Xl_E_X)J!_ zz2UQq9F6XJew!VSF3oiE3*}XKcFehCQM0$kcuvdNWV0FH75De?rUvhG7~c;!K1Oc0 ztM7U9%i-YEEIwN=(Z)x_*UU?&(p};ykIo0my7wro9?RL}0 z+eNmvD(ID8ocD?cIO{8&OBObVGZf)n(}QOvh309QSM_<9$eyjfyDc0>PiGXZ3Lm*g zzAVoFsK@N^tXQMB9S^dG0vl*U6cD#jd%Hdt^?QR6C_SxC5n0{t?`Vs5N05sBvQZ79 zs=e2=cQ=i-l!eWanZC$|pdZF&t3r0`y4wA{qCq;-sVi*g;I9!mR&@u~IMEJgdeHE? zh2~)o&DGu>4Yq++8tQ_bWEhdS<7w1w$S({yvsN>nQA|BfSIy^ag#ucdGoEY{I;rgn zgvE1&)ou(Rd`v9E5U#;f!O17^KH(EwY;)B+?fAK9rhKXrm)B zERYW&XGDtzSbu27lMs9yDowwwY*UhDD$iz+{|k)bbf& z(WslfX%C5}j{%|?dc~OubpGAl7?wjlI1&-@%l;v~V`QN+4Y#EQIo26NPyn3^9*4$1 zo;^SQVJOv_Dyi7?!~Bw_O4!U6eme2ABQkwY9VZa18DZn z4C=y=F{Uw_$FiAgkeZ_kOO(AlUekny%^_Tk@hOB9^;4H#2{%d&NYpK|OPJADxk zgMaKJwzg^*H2Qrx=7iXGB6S>7FpP8ruqQ3d+-p2ZpQ*H#Whq-qGkaDDMY3UZ#wdfT z5{By3jCTTz;X~W_B+rzm)pv~{oM~EP2>lUbMaF2A^XYHX91xQ~%y&2ju5Vl2)n~KK zLYOlVZ&F*RYV%abG!?>$B0r8irN?6|NU855gw0a7(C=|3KHidwDU`VlRIl!i`sKu` z=3^B)R>lUan)OsdYnb<7bnn9ZD{jCW5PX5)2DfLm6|>AL6=xt}2UR_RK3E^R#x~AP zo6M6_XY))7Y-T0Uez@6Cv zc+pp?B^dh>jA59QW$izVHIpyWohoZpMWxX}%WA5=9&5WXow%3)e(Jf$*m^$Ben!lO zyTZXhn6oVo2ghampTvCGajEe%eF!tu^N;FcGZq5g_He-4+1A^RBY_+_)Z(i#765bN?5|ug zmK?z-CMbPfp-8k1W^e9d)0dz%8~okfn0JFE7-Fd%Fs4Qr%EJCoB-;$hxrEI%HJ#!_ z3Y#5Pud}(v+q4kxT2T$n*m9_-(~OtTq?(`HpGa@hM(@YSwT8PsW*nl8%@oH@-OXW7Pqo}XNN&@jihTOqr(tVa(!fGJC0U3zZ;OJqyK4EGak;#I+Pv$VrcZ@C$E_38WEzr)r-y4@t- zDfO|-2+^9-)QXNY$KGh>f=HMnss=0Kh_5rf#@LTzo-OK9Y?%tX88f&qSZ4H}rVSk@ z_qkZxa%F~|vYFeNkqubT1<~yWLflGCw;Ze}Q+q|;$_in}rfIy~Up2R8Y~>j%q_zGH z*;h#-{c7x8_GT#|UqP1M3_8|@F!xQzd9TS`zEo4S)!^M}-d^mQuI3gpb_Ouh3W_b( z40e9;RDl?!|3Tfa=mplab#Y~wO+xRQ_3OQmJsiLTBVDMpK5K?$U=aG0vj2S>;K!|` zg|zj9V4Sk6Pc8Z1Y6of-%=FdbJ$QXuhp*SXXNVEbZXq<*Tl^e<97U~{>SkPjYkLMZGQypfZ!wo$05?3I)9*$ z=Oe(Q0V@5}adtp>ACg2^1eE$y=U5fSh5*k}sPy)6c0d}Q_Y$7ALi#G5ItQX)o+H7L z(qww|JTKoTN4H&`TVNiPU8ToyAmQfXpGQ7b{_43gD~>+Uspo+WDD_;0f)^mbXK__} z^?X?f=icbiOhjGQE19nKaaMX8Lu&QLi|rE}VLvdxbO{90y8sN)Ck# zAz)6go|EgB>G{4uvs35(l>dc{^y+!K{T3tD12fI3dB!Q-4FELfzj2ve5Nup#(5Z93 z24v_;h3Us|F}>2iIZvAj1)4(S}5dV5{)N>rfENEN^E1 zBcQ?2;ADFByiiHdV3SFo2|o`qmR}y*PCn09dYtJGAT5ii(yQkdEi!$HtXOs4OXaW1 z_1ld6)$@&38aAAwH=q=(@UJ4mEWdgVvR0PI)Sw zg5N=aI8}c2oYD@N{wWn96WFD%yuTno`K<~eeU<)SGw`IPQ$CJWC-W8&@~<&!1o*LH xv&#Slp9~cV@}@o`)e+f`;n-dBOV98{|kSgl$!tm diff --git a/io/network/test.cpp b/io/network/test.cpp index 16c6d2444..73a96090c 100644 --- a/io/network/test.cpp +++ b/io/network/test.cpp @@ -8,50 +8,27 @@ std::hash hash; #include "debug/log.hpp" #include "socket.hpp" -#include "worker.hpp" +#include "http/worker.hpp" -std::array workers; +std::array workers; +std::array threads; + +std::atomic alive { true }; void exiting() { - for(size_t i = 0; i < workers.size(); ++i) - { - auto n = workers[i].requests.load(); - std::cout << "worker " << i << " responded to " << n << " requests" << std::endl; - } + LOG_DEBUG("Exiting..."); } void sigint_handler(int) { + exiting(); std::abort(); } #define MAXEVENTS 64 -static int -make_socket_non_blocking (int sfd) -{ - int flags, s; - - flags = fcntl (sfd, F_GETFL, 0); - if (flags == -1) - { - perror ("fcntl"); - return -1; - } - - flags |= O_NONBLOCK; - s = fcntl (sfd, F_SETFL, flags); - if (s == -1) - { - perror ("fcntl"); - return -1; - } - - return 0; -} - int main(void) { std::atexit(exiting); @@ -60,7 +37,15 @@ int main(void) for(size_t i = 0; i < workers.size(); ++i) { auto& w = workers[i]; - w.start(); + + threads[i] = std::thread([&w]() { + while(alive) + { + LOG_DEBUG("Worker " << hash(std::this_thread::get_id()) + << " waiting... "); + w.wait_and_process_events(); + } + }); } /* size_t WORKERS = std::thread::hardware_concurrency(); */ @@ -76,7 +61,7 @@ int main(void) int idx = 0; - auto socket = io::Socket::create("0.0.0.0", "7474"); + auto socket = io::Socket::bind("0.0.0.0", "7474"); socket.set_non_blocking(); socket.listen(1024); @@ -109,7 +94,6 @@ int main(void) int n, i; n = epoll_wait (efd, events, MAXEVENTS, -1); - LOG_DEBUG("MASTER WOKEN UP"); for (i = 0; i < n; i++) { @@ -130,12 +114,17 @@ int main(void) means one or more incoming connections. */ while (true) { + LOG_DEBUG("Trying to accept... "); if(!workers[idx].accept(socket)) + { + LOG_DEBUG("Did not accept!"); break; + } idx = (idx + 1) % workers.size(); + LOG_DEBUG("Accepted a new connection!"); } - + /* struct sockaddr in_addr; */ /* socklen_t in_len; */ /* int infd; */ diff --git a/io/network/worker.hpp b/io/network/worker.hpp index 8a8487aa6..9e7692bb2 100644 --- a/io/network/worker.hpp +++ b/io/network/worker.hpp @@ -5,7 +5,7 @@ namespace io { - const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\nHTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n"; + const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n"; size_t len = strlen(response); @@ -23,13 +23,8 @@ public: if(!s.is_open()) return false; - s.set_non_blocking(); - - auto stream = new TcpStream(std::move(s), EPOLLIN | EPOLLET); - - this->add(stream->socket, stream->event); - - LOG_DEBUG("Listening to TCP stream at" << stream->socket.id()) + this->add(s); + return true; } diff --git a/memory/lazy_gc.hpp b/memory/lazy_gc.hpp new file mode 100644 index 000000000..00d8b63fa --- /dev/null +++ b/memory/lazy_gc.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include + +#include "utils/crtp.hpp" +#include "threading/sync/lockable.hpp" + +template +class LazyGC : Crtp, Lockable +{ +public: + void add_ref() + { + ref_count.fetch_add(1, std::memory_order_relaxed); + } + + void release_ref() + { + // get refcount and subtract atomically + auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel); + + // fetch_sub first returns and then subtrarcts so the refcount is + // zero when fetch_sub returns 1 + if(count != 1) + return; + + auto guard = this->aacquire(); + + this->derived().vacuum(); + } + +private: + std::atomic ref_count {0}; +}; diff --git a/mvcc/mvcc.hpp b/mvcc/mvcc.hpp index c35248ad0..4f95f5554 100644 --- a/mvcc/mvcc.hpp +++ b/mvcc/mvcc.hpp @@ -37,7 +37,7 @@ public: bool visible(const tx::Transaction& t) { // TODO check if the record was created by a transaction that has been - // aborted. one might implement this by checking the hints in mvcc + // aborted. one might implement this by checking the hints in mvcc // anc/or consulting the commit log // Mike Olson says 17 march 1993: the tests in this routine are correct; @@ -68,7 +68,7 @@ public: T* latest_visible(const tx::Transaction& t) { T* record = static_cast(this), *newer = this->newer(); - + // move down through the versions of the nodes until you find the first // one visible to this transaction. if no visible records are found, // the function returns a nullptr diff --git a/mvcc/version.hpp b/mvcc/version.hpp index 924e84eff..87fcb13bb 100644 --- a/mvcc/version.hpp +++ b/mvcc/version.hpp @@ -36,7 +36,7 @@ private: // points to a newer version of this record. the newer version also has // this pointer which points to an even more recent version. if no newer // version is present, this value points to a nullptr - std::atomic versions; + std::atomic newer_version; }; } diff --git a/mvcc/version_list.hpp b/mvcc/version_list.hpp index a2e63b170..b92ea5458 100644 --- a/mvcc/version_list.hpp +++ b/mvcc/version_list.hpp @@ -4,6 +4,7 @@ #include "transactions/transaction.hpp" #include "memory/lazy_gc.hpp" +#include "mvcc/mvcc_error.hpp" namespace mvcc { @@ -17,11 +18,75 @@ public: }; - VersionList() = default; + VersionList(const tx::Transaction& t) + { + // create a first version of the record + auto v1 = new T(); + + // mark the record as created by the transaction t + v1->mark_created(t); + + head.store(v1, std::memory_order_release); + } private: std::atomic head; + T* find(const tx::Transaction& t) + { + auto r = head.load(std::memory_order_acquire); + + // nullptr + // | + // [v1] ... + // | + // [v2] <------+ + // | | + // [v3] <------+ + // | | Jump backwards until you find a first visible + // [VerList] ----+ version, or you reach the end of the list + // + while(r != nullptr && !r->visible(t)) + r = r->next.load(std::memory_order_acquire); + + return r; + } + + T* update(const tx::Transaction& t) + { + + + + } + + bool remove(const tx::Transaction& t) + { + // take a lock on this node + auto guard = this->acquire(); + + // find the visible record version to delete + auto r = find(t); + + // exit if we haven't found any visible records + if(r == nullptr) + return false; + + // if the record hasn't been deleted yet, it's ok to delete it + if(!r->tx.max()) + { + // mark the record as deleted + r->mark_deleted(t); + return true; + } + + auto hints = r->hints.load(std::memory_order_acquire); + + if(hints.max.is_committed() || hints.max.is_unknown()) + throw MvccError("Can't serialize"); + + //... + } + void vacuum() { diff --git a/transactions/engine.hpp b/transactions/engine.hpp index efe8be1ad..9ac69ad56 100644 --- a/transactions/engine.hpp +++ b/transactions/engine.hpp @@ -27,7 +27,7 @@ public: using sptr = std::shared_ptr; Engine() : counter(0) {} - + const Transaction& begin() { auto guard = this->acquire_unique(); @@ -68,7 +68,7 @@ public: { auto guard = this->acquire_unique(); CommitLog::get().set_aborted(t.id); - + finalize(t); }