#pragma once #include #include #include #include #include "dbms/dbms.hpp" #include "query/engine.hpp" #include "communication/worker.hpp" #include "io/network/event_listener.hpp" #include "logging/default.hpp" #include "utils/assert.hpp" namespace communication { /** * Communication server. * Listens for incomming connections on the server port and assings them in a * round-robin manner to it's workers. * * Current Server achitecture: * incomming connection -> server -> worker -> session * * @tparam Session the server can handle different Sessions, each session * represents a different protocol so the same network infrastructure * can be used for handling different protocols * @tparam OutputStream the server has to get the output stream as a template parameter because the output stream is templated * @tparam Socket the input/output socket that should be used */ template class Server : public io::network::EventListener> { using Event = io::network::Epoll::Event; public: Server(Socket &&socket, Dbms &dbms, QueryEngine &query_engine) : dbms_(dbms), query_engine_(query_engine), socket_(std::forward(socket)), logger_(logging::log->logger("communication::Server")) { event_.data.fd = socket_; // TODO: EPOLLET is hard to use -> figure out how should EPOLLET be used // event.events = EPOLLIN | EPOLLET; event_.events = EPOLLIN; this->listener_.Add(socket_, &event_); } void Start(size_t n) { workers_.reserve(n); for (size_t i = 0; i < n; ++i) { workers_.push_back( std::make_shared>( dbms_, query_engine_)); workers_.back()->Start(alive_); } while (alive_) { this->WaitAndProcessEvents(); } } void Shutdown() { alive_.store(false); for (auto &worker : workers_) worker->thread_.join(); } void OnConnect() { debug_assert(idx_ < workers_.size(), "Invalid worker id."); logger_.trace("on connect"); if (UNLIKELY(!workers_[idx_]->Accept(socket_))) return; idx_ = idx_ == (int)workers_.size() - 1 ? 0 : idx_ + 1; } void OnWaitTimeout() {} void OnDataEvent(Event &event) { if (UNLIKELY(socket_ != event.data.fd)) return; this->derived().OnConnect(); } template void OnExceptionEvent(Event &event, Args &&... args) { // TODO: Do something about it logger_.warn("epoll exception"); } void OnCloseEvent(Event &event) { close(event.data.fd); } void OnErrorEvent(Event &event) { close(event.data.fd); } private: std::vector::sptr> workers_; std::atomic alive_{true}; int idx_{0}; Dbms &dbms_; QueryEngine &query_engine_; Event event_; Socket socket_; Logger logger_; }; }