Add std::thread::hardware_concurrency() as a default number of workers to communication and rpc
Reviewers: mferencevic

Reviewed By: mferencevic

Subscribers: pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1185
parent 0639c44ed7
commit 4a4784188e
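A note on the new default (not part of the diff itself): the standard treats std::thread::hardware_concurrency() only as a hint, and it may return 0 when the core count cannot be detected. A minimal sketch of guarding against that case, using a hypothetical ChooseWorkerCount() helper that this codebase does not contain:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <thread>

// Hypothetical helper: use the detected hardware thread count, but fall
// back to a single worker when hardware_concurrency() reports 0.
size_t ChooseWorkerCount() {
  const size_t detected = std::thread::hardware_concurrency();
  return std::max<size_t>(detected, 1);
}

int main() {
  std::cout << "Using " << ChooseWorkerCount() << " workers" << std::endl;
}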
@@ -10,8 +10,9 @@
 
 namespace communication::rpc {
 
-System::System(const io::network::Endpoint &endpoint, const size_t worker_count)
-    : server_(endpoint, *this, worker_count) {}
+System::System(const io::network::Endpoint &endpoint,
+               const size_t workers_count)
+    : server_(endpoint, *this, workers_count) {}
 
 System::~System() {}
 
@@ -18,7 +18,8 @@ class Server;
 
 class System {
  public:
-  System(const io::network::Endpoint &endpoint, const size_t worker_count = 4);
+  System(const io::network::Endpoint &endpoint,
+         const size_t workers_count = std::thread::hardware_concurrency());
   System(const System &) = delete;
   System(System &&) = delete;
   System &operator=(const System &) = delete;
@@ -49,7 +50,8 @@ class System {
 
 class Server {
  public:
-  Server(System &system, const std::string &name, int workers_count = 4);
+  Server(System &system, const std::string &name,
+         int workers_count = std::thread::hardware_concurrency());
   ~Server();
 
   template <typename TRequestResponse>
@@ -65,9 +67,8 @@ class Server {
         "TRequestResponse::Response must be derived from Message");
     auto callbacks_accessor = callbacks_.access();
     auto got = callbacks_accessor.insert(
-        typeid(typename TRequestResponse::Request), [callback = callback](
-                                                        const Message
-                                                            &base_message) {
+        typeid(typename TRequestResponse::Request),
+        [callback = callback](const Message &base_message) {
           const auto &message =
               dynamic_cast<const typename TRequestResponse::Request &>(
                   base_message);
@@ -43,10 +43,10 @@ class Server {
 
   /**
    * Constructs and binds server to endpoint, operates on session data and
-   * invokes n workers
+   * invokes workers_count workers
    */
   Server(const io::network::Endpoint &endpoint, TSessionData &session_data,
-         size_t n)
+         size_t workers_count = std::thread::hardware_concurrency())
       : session_data_(session_data) {
     // Without server we can't continue with application so we can just
     // terminate here.
@@ -58,10 +58,11 @@ class Server {
     if (!socket_.Listen(1024)) {
       LOG(FATAL) << "Cannot listen on socket!";
     }
-    working_thread_ = std::thread([this, n]() {
-      std::cout << fmt::format("Starting {} workers", n) << std::endl;
-      workers_.reserve(n);
-      for (size_t i = 0; i < n; ++i) {
+    working_thread_ = std::thread([this, workers_count]() {
+      std::cout << fmt::format("Starting {} workers", workers_count)
+                << std::endl;
+      workers_.reserve(workers_count);
+      for (size_t i = 0; i < workers_count; ++i) {
         workers_.push_back(std::make_unique<WorkerT>(session_data_));
         worker_threads_.emplace_back(
             [this](WorkerT &worker) -> void { worker.Start(alive_); },
@@ -74,9 +75,11 @@ class Server {
                 << std::endl;
 
       std::vector<std::unique_ptr<ConnectionAcceptor>> acceptors;
-      acceptors.emplace_back(std::make_unique<ConnectionAcceptor>(socket_, *this));
+      acceptors.emplace_back(
+          std::make_unique<ConnectionAcceptor>(socket_, *this));
       auto &acceptor = *acceptors.back().get();
-      io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher{acceptors};
+      io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher{
+          acceptors};
       dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN);
       while (alive_) {
         dispatcher.WaitAndProcessEvents();
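For illustration, call sites after this change can rely on the new default or still pass an explicit count. A hedged sketch, assuming the headers from this diff are included; the endpoint address and port are made up:

// Hypothetical call sites; the endpoint values are illustrative only.
io::network::Endpoint endpoint{"127.0.0.1", 10000};

// Previously defaulted to 4 workers; now defaults to
// std::thread::hardware_concurrency() workers.
communication::rpc::System system{endpoint};

// An explicit worker count still overrides the default.
communication::rpc::System small_system{endpoint, 2};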