From 4a4784188eddf474a3b0bac2b7f66bf4c7626d6f Mon Sep 17 00:00:00 2001 From: Marko Budiselic Date: Thu, 8 Feb 2018 16:30:01 +0100 Subject: [PATCH] Add std::thread::hardware_concurrency() as a default number of workers to communication and rpc Reviewers: mferencevic Reviewed By: mferencevic Subscribers: pullbot, buda Differential Revision: https://phabricator.memgraph.io/D1185 --- src/communication/rpc/server.cpp | 5 +++-- src/communication/rpc/server.hpp | 11 ++++++----- src/communication/server.hpp | 19 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/communication/rpc/server.cpp b/src/communication/rpc/server.cpp index 752830a32..daec28e52 100644 --- a/src/communication/rpc/server.cpp +++ b/src/communication/rpc/server.cpp @@ -10,8 +10,9 @@ namespace communication::rpc { -System::System(const io::network::Endpoint &endpoint, const size_t worker_count) - : server_(endpoint, *this, worker_count) {} +System::System(const io::network::Endpoint &endpoint, + const size_t workers_count) + : server_(endpoint, *this, workers_count) {} System::~System() {} diff --git a/src/communication/rpc/server.hpp b/src/communication/rpc/server.hpp index 19e518988..32535ae4b 100644 --- a/src/communication/rpc/server.hpp +++ b/src/communication/rpc/server.hpp @@ -18,7 +18,8 @@ class Server; class System { public: - System(const io::network::Endpoint &endpoint, const size_t worker_count = 4); + System(const io::network::Endpoint &endpoint, + const size_t workers_count = std::thread::hardware_concurrency()); System(const System &) = delete; System(System &&) = delete; System &operator=(const System &) = delete; @@ -49,7 +50,8 @@ class System { class Server { public: - Server(System &system, const std::string &name, int workers_count = 4); + Server(System &system, const std::string &name, + int workers_count = std::thread::hardware_concurrency()); ~Server(); template @@ -65,9 +67,8 @@ class Server { "TRequestResponse::Response must be derived from Message"); auto callbacks_accessor = callbacks_.access(); auto got = callbacks_accessor.insert( - typeid(typename TRequestResponse::Request), [callback = callback]( - const Message - &base_message) { + typeid(typename TRequestResponse::Request), + [callback = callback](const Message &base_message) { const auto &message = dynamic_cast( base_message); diff --git a/src/communication/server.hpp b/src/communication/server.hpp index 0eae5cc0a..051bf4494 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -43,10 +43,10 @@ class Server { /** * Constructs and binds server to endpoint, operates on session data and - * invokes n workers + * invokes workers_count workers */ Server(const io::network::Endpoint &endpoint, TSessionData &session_data, - size_t n) + size_t workers_count = std::thread::hardware_concurrency()) : session_data_(session_data) { // Without server we can't continue with application so we can just // terminate here. @@ -58,10 +58,11 @@ class Server { if (!socket_.Listen(1024)) { LOG(FATAL) << "Cannot listen on socket!"; } - working_thread_ = std::thread([this, n]() { - std::cout << fmt::format("Starting {} workers", n) << std::endl; - workers_.reserve(n); - for (size_t i = 0; i < n; ++i) { + working_thread_ = std::thread([this, workers_count]() { + std::cout << fmt::format("Starting {} workers", workers_count) + << std::endl; + workers_.reserve(workers_count); + for (size_t i = 0; i < workers_count; ++i) { workers_.push_back(std::make_unique(session_data_)); worker_threads_.emplace_back( [this](WorkerT &worker) -> void { worker.Start(alive_); }, @@ -74,9 +75,11 @@ class Server { << std::endl; std::vector> acceptors; - acceptors.emplace_back(std::make_unique(socket_, *this)); + acceptors.emplace_back( + std::make_unique(socket_, *this)); auto &acceptor = *acceptors.back().get(); - io::network::SocketEventDispatcher dispatcher{acceptors}; + io::network::SocketEventDispatcher dispatcher{ + acceptors}; dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN); while (alive_) { dispatcher.WaitAndProcessEvents();