benchmarking and fixes on the server

This commit is contained in:
Dominik Tomičević 2015-10-29 00:34:43 +01:00
parent bce9762add
commit 57c624698f
6 changed files with 80 additions and 31 deletions

View File

@ -19,7 +19,7 @@ public:
freeaddrinfo(info);
}
static AddrInfo get(const char* port)
static AddrInfo get(const char* addr, const char* port)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
@ -29,7 +29,7 @@ public:
hints.ai_flags = AI_PASSIVE;
struct addrinfo* result;
auto status = getaddrinfo("0.0.0.0", port, &hints, &result);
auto status = getaddrinfo(addr, port, &hints, &result);
if(status != 0)
throw NetworkError(gai_strerror(status));

View File

@ -32,6 +32,11 @@ public:
return epoll_wait(epoll_fd, events, max_events, timeout);
}
int id() const
{
return epoll_fd;
}
private:
int epoll_fd;
};

View File

@ -18,10 +18,9 @@ class Listener : public Crtp<Derived>
public:
Listener() : listener(0)
{
thread = std::thread([this]() { loop(); });
}
Listener(Listener&& other)
Listener(Listener&& other) : listener(0)
{
this->thread = std::move(other.thread);
this->listener = std::move(other.listener);
@ -29,8 +28,16 @@ public:
~Listener()
{
LOG_DEBUG("JOIN THREAD");
alive.store(false, std::memory_order_release);
thread.join();
if(thread.joinable())
thread.join();
}
void start()
{
thread = std::thread([this]() { loop(); });
}
void add(Socket& socket, Epoll::Event& event)
@ -38,16 +45,18 @@ public:
listener.add(socket, &event);
}
private:
//protected:
void loop()
{
LOG_DEBUG("Thread starting " << listener.id() << " " << hash(std::this_thread::get_id()));
constexpr size_t MAX_EVENTS = 64;
Epoll::Event events[MAX_EVENTS];
while(alive.load(std::memory_order_acquire))
{
auto n = listener.wait(events, MAX_EVENTS, -1);
auto n = listener.wait(events, MAX_EVENTS, 1000);
for(int i = 0; i < n; ++i)
{
@ -78,6 +87,8 @@ private:
this->derived().on_read(stream);
}
}
LOG_DEBUG("Thread exiting");
}
std::atomic<bool> alive {true};

View File

@ -51,9 +51,9 @@ public:
return socket != -1;
}
static Socket create(const char* port)
static Socket create(const char* addr, const char* port)
{
auto info = AddrInfo::get(port);
auto info = AddrInfo::get(addr, port);
for(struct addrinfo* it = info; it != nullptr; it = it->ai_next)
{
@ -64,9 +64,9 @@ public:
if(!s.is_open())
continue;
/* int on = 1; */
/* if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) */
/* continue; */
int on = 1;
if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
continue;
if(s.bind(it->ai_addr, it->ai_addrlen))
return std::move(s);

View File

@ -1,11 +1,32 @@
#include <iostream>
#include <vector>
#include <thread>
#include <signal.h>
std::hash<std::thread::id> hash;
#include "debug/log.hpp"
#include "socket.hpp"
#include "worker.hpp"
std::array<io::Worker, 16> workers;
void exiting()
{
for(size_t i = 0; i < workers.size(); ++i)
{
auto n = workers[i].requests.load();
std::cout << "worker " << i << " responded to " << n << " requests" << std::endl;
}
}
void sigint_handler(int)
{
exiting();
std::abort();
}
#define MAXEVENTS 64
static int
@ -33,14 +54,29 @@ make_socket_non_blocking (int sfd)
int main(void)
{
std::vector<io::Worker> workers;
std::atexit(exiting);
signal(SIGINT, sigint_handler);
for(size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
workers.emplace_back();
for(size_t i = 0; i < workers.size(); ++i)
{
auto& w = workers[i];
w.start();
}
/* size_t WORKERS = std::thread::hardware_concurrency(); */
/* std::vector<io::Worker> workers; */
/* workers.resize(WORKERS); */
/* for(size_t i = 0; i < WORKERS; ++i) */
/* { */
/* workers.push_back(std::move(io::Worker())); */
/* workers.back().start(); */
/* } */
int idx = 0;
auto socket = io::Socket::create("7474");
auto socket = io::Socket::create("0.0.0.0", "7474");
socket.set_non_blocking();
socket.listen(1024);
@ -72,8 +108,9 @@ int main(void)
{
int n, i;
LOG_DEBUG("MASTER WAITING = " << idx);
n = epoll_wait (efd, events, MAXEVENTS, -1);
LOG_DEBUG("MASTER WOKEN UP");
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
@ -93,13 +130,10 @@ int main(void)
means one or more incoming connections. */
while (true)
{
LOG_DEBUG("IDX = " << idx);
idx = (idx + 1) % workers.size();
auto& worker = workers[i];
if(!worker.accept(socket))
if(!workers[idx].accept(socket))
break;
idx = (idx + 1) % workers.size();
}
/* struct sockaddr in_addr; */
@ -208,6 +242,5 @@ int main(void)
free (events);
return 0;
}

View File

@ -5,7 +5,7 @@
namespace io
{
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\nHTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
size_t len = strlen(response);
@ -14,11 +14,7 @@ class Worker : public Listener<Worker>
char buf[512];
public:
Worker() = default;
Worker(Worker&& other)
{
}
using Listener::Listener;
bool accept(Socket& socket)
{
@ -42,6 +38,8 @@ public:
delete stream;
}
std::atomic<int> requests {0};
void on_read(TcpStream* stream)
{
int done = 0;
@ -78,8 +76,10 @@ public:
int k = write(stream->socket, resp, len - sum);
sum += k;
resp += k;
}
requests.fetch_add(1, std::memory_order_relaxed);
}
if (done)