From 007a7f1a6d5b718438fd79ad6cf57a7544d4779d Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Wed, 10 Jan 2018 13:56:12 +0100 Subject: [PATCH] Change network design from Start/Shutdown to constructor/destructor Summary: Make ServerT start on constructor Remove shutdown from MasterCoordinator Distributed system remove Shutdown RPC server start and shutdown removed Reviewers: florijan, mferencevic Reviewed By: mferencevic Subscribers: mferencevic, pullbot Differential Revision: https://phabricator.memgraph.io/D1097 --- src/communication/messaging/distributed.cpp | 17 +--- src/communication/messaging/distributed.hpp | 2 - src/communication/raft/raft-inl.hpp | 2 - src/communication/raft/raft.hpp | 6 +- src/communication/raft/rpc.hpp | 51 +++++------- src/communication/raft/test_utils.hpp | 4 - src/communication/rpc/rpc.cpp | 13 ++-- src/communication/rpc/rpc.hpp | 20 ++--- src/communication/server.hpp | 78 +++++++++++-------- src/database/counters.cpp | 3 - src/database/counters.hpp | 2 - src/database/graph_db.cpp | 1 - src/distributed/coordination_master.cpp | 8 +- src/distributed/coordination_master.hpp | 2 +- src/distributed/coordination_worker.cpp | 3 - src/distributed/coordination_worker.hpp | 3 - src/memgraph_bolt.cpp | 17 ++-- src/storage/concurrent_id_mapper_master.cpp | 8 +- src/storage/concurrent_id_mapper_master.hpp | 1 - src/transactions/engine_master.cpp | 11 --- src/transactions/engine_master.hpp | 6 -- tests/concurrent/network_read_hang.cpp | 9 +-- tests/concurrent/network_server.cpp | 9 +-- tests/concurrent/network_session_leak.cpp | 9 +-- tests/distributed/raft/example_client.cpp | 4 +- tests/distributed/raft/example_server.cpp | 7 +- tests/manual/raft_rpc.cpp | 2 - .../unit/concurrent_id_mapper_distributed.cpp | 4 +- tests/unit/counters.cpp | 6 -- tests/unit/distributed_coordination.cpp | 63 ++++++++------- tests/unit/messaging_distributed.cpp | 12 +-- tests/unit/network_timeouts.cpp | 12 +-- tests/unit/raft.cpp | 2 +- tests/unit/rpc.cpp | 10 --- 
tests/unit/transaction_engine_worker.cpp | 5 -- 35 files changed, 140 insertions(+), 272 deletions(-) diff --git a/src/communication/messaging/distributed.cpp b/src/communication/messaging/distributed.cpp index 9d42ec2f3..d944b2d06 100644 --- a/src/communication/messaging/distributed.cpp +++ b/src/communication/messaging/distributed.cpp @@ -4,7 +4,7 @@ namespace communication::messaging { System::System(const std::string &address, uint16_t port) : endpoint_(address, port) { - // Numbers of worker are quite arbitrary at the point. + // Numbers of workers is quite arbitrary at this point. StartClient(4); StartServer(4); } @@ -13,15 +13,10 @@ System::System(const io::network::NetworkEndpoint &endpoint) : System(endpoint.address(), endpoint.port()) {} System::~System() { + queue_.Shutdown(); for (size_t i = 0; i < pool_.size(); ++i) { pool_[i].join(); } - thread_.join(); -} - -void System::Shutdown() { - queue_.Shutdown(); - server_->Shutdown(); } void System::StartClient(int worker_count) { @@ -44,12 +39,8 @@ void System::StartServer(int worker_count) { } // Initialize server. - server_ = std::make_unique(endpoint_, protocol_data_); + server_ = std::make_unique(endpoint_, protocol_data_, worker_count); endpoint_ = server_->endpoint(); - - // Start server. 
- thread_ = std::thread( - [worker_count, this]() { this->server_->Start(worker_count); }); } std::shared_ptr System::Open(const std::string &name) { @@ -63,4 +54,4 @@ Writer::Writer(System &system, const std::string &address, uint16_t port, void Writer::Send(std::unique_ptr message) { system_.queue_.Emplace(address_, port_, name_, std::move(message)); } -} +} // namespace communication::messaging diff --git a/src/communication/messaging/distributed.hpp b/src/communication/messaging/distributed.hpp index c838450c6..49dd80b50 100644 --- a/src/communication/messaging/distributed.hpp +++ b/src/communication/messaging/distributed.hpp @@ -66,7 +66,6 @@ class System { ~System(); std::shared_ptr Open(const std::string &name); - void Shutdown(); const io::network::NetworkEndpoint &endpoint() const { return endpoint_; } @@ -107,7 +106,6 @@ class System { Queue queue_; // Server variables. - std::thread thread_; SessionData protocol_data_; std::unique_ptr server_{nullptr}; io::network::NetworkEndpoint endpoint_; diff --git a/src/communication/raft/raft-inl.hpp b/src/communication/raft/raft-inl.hpp index f9836f9f7..6659bf736 100644 --- a/src/communication/raft/raft-inl.hpp +++ b/src/communication/raft/raft-inl.hpp @@ -678,8 +678,6 @@ RaftMember::~RaftMember() { for (auto &peer_thread : peer_threads_) { peer_thread.join(); } - - network_.Shutdown(); } template diff --git a/src/communication/raft/raft.hpp b/src/communication/raft/raft.hpp index 311b6dc23..8b088374d 100644 --- a/src/communication/raft/raft.hpp +++ b/src/communication/raft/raft.hpp @@ -130,10 +130,6 @@ class RaftNetworkInterface { /* This will be called once the RaftMember is ready to start receiving RPCs. */ virtual void Start(RaftMember &member) = 0; - - /* This will be called when RaftMember is exiting. RPC handlers should not be - * called anymore. 
*/ - virtual void Shutdown() = 0; }; template @@ -252,7 +248,7 @@ class RaftMemberImpl { std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}()); }; -} // namespace internal +} // namespace impl template class RaftMember final { diff --git a/src/communication/raft/rpc.hpp b/src/communication/raft/rpc.hpp index 201b117be..ec51b1281 100644 --- a/src/communication/raft/rpc.hpp +++ b/src/communication/raft/rpc.hpp @@ -35,33 +35,25 @@ class RpcNetwork : public RaftNetworkInterface { directory_(std::move(directory)), server_(system, kRaftChannelName) {} - ~RpcNetwork() { - DCHECK(!is_running_) - << "`Shutdown()` should be called before destructing `RpcNetwork`"; - /* We don't want to call `Shutdown` here, instead we push that - * responsibility to caller of `Start`, so `server_` doesn't end up holding - * a reference to a destructed `RaftMember`. */ - } - virtual void Start(RaftMember &member) override { - server_.Register>([&member]( - const PeerRpcRequest &request) { - auto reply = std::make_unique(); - reply->type = request.type; - switch (request.type) { - case RpcType::REQUEST_VOTE: - reply->request_vote = member.OnRequestVote(request.request_vote); - break; - case RpcType::APPEND_ENTRIES: - reply->append_entries = - member.OnAppendEntries(request.append_entries); - break; - default: - LOG(ERROR) << "Unknown RPC type: " << static_cast(request.type); - } - return reply; - }); - server_.Start(); + server_.Register>( + [&member](const PeerRpcRequest &request) { + auto reply = std::make_unique(); + reply->type = request.type; + switch (request.type) { + case RpcType::REQUEST_VOTE: + reply->request_vote = member.OnRequestVote(request.request_vote); + break; + case RpcType::APPEND_ENTRIES: + reply->append_entries = + member.OnAppendEntries(request.append_entries); + break; + default: + LOG(ERROR) << "Unknown RPC type: " + << static_cast(request.type); + } + return reply; + }); } virtual bool SendRequestVote(const MemberId &recipient, @@ -126,19 +118,12 @@ class 
RpcNetwork : public RaftNetworkInterface { return it->second; } - virtual void Shutdown() override { - is_running_ = false; - server_.Shutdown(); - } - communication::messaging::System &system_; // TODO(mtomic): how to update and distribute this? std::unordered_map directory_; rpc::Server server_; std::unordered_map clients_; - - bool is_running_ = true; }; } // namespace communication::raft diff --git a/src/communication/raft/test_utils.hpp b/src/communication/raft/test_utils.hpp index 6258f72b9..dea7676b0 100644 --- a/src/communication/raft/test_utils.hpp +++ b/src/communication/raft/test_utils.hpp @@ -67,8 +67,6 @@ class NoOpNetworkInterface : public RaftNetworkInterface { } virtual void Start(RaftMember &) override {} - - virtual void Shutdown() override {} }; /* `NextReplyNetworkInterface` has two fields: `on_request_` and `next_reply_` @@ -116,8 +114,6 @@ class NextReplyNetworkInterface : public RaftNetworkInterface { virtual void Start(RaftMember &) override {} - virtual void Shutdown() override {} - std::function &)> on_request_; std::experimental::optional next_reply_; }; diff --git a/src/communication/rpc/rpc.cpp b/src/communication/rpc/rpc.cpp index e4511c4a1..dc9ec21f3 100644 --- a/src/communication/rpc/rpc.cpp +++ b/src/communication/rpc/rpc.cpp @@ -123,12 +123,8 @@ std::unique_ptr Client::Call( } Server::Server(messaging::System &system, const std::string &name) - : system_(system), stream_(system.Open(kProtocolStreamPrefix + name)) {} - -void Server::Start() { + : system_(system), stream_(system.Open(kProtocolStreamPrefix + name)) { // TODO: Add logging. 
- CHECK(started_ == false) << "Server can't be started multiple times"; - started_ = true; running_thread_ = std::thread([this]() { while (alive_) { auto message = stream_->Await(); @@ -136,8 +132,9 @@ void Server::Start() { auto *request = dynamic_cast(message.get()); if (!request) continue; auto &real_request = request->message(); - auto it = callbacks_.find(real_request.type_index()); - if (it == callbacks_.end()) continue; + auto callbacks_accessor = callbacks_.access(); + auto it = callbacks_accessor.find(real_request.type_index()); + if (it == callbacks_accessor.end()) continue; auto response = it->second(real_request); messaging::Writer writer(system_, request->address(), request->port(), request->stream()); @@ -146,7 +143,7 @@ void Server::Start() { }); } -void Server::Shutdown() { +Server::~Server() { alive_ = false; stream_->Shutdown(); if (running_thread_.joinable()) running_thread_.join(); diff --git a/src/communication/rpc/rpc.hpp b/src/communication/rpc/rpc.hpp index 021e52742..278da664e 100644 --- a/src/communication/rpc/rpc.hpp +++ b/src/communication/rpc/rpc.hpp @@ -3,6 +3,7 @@ #include #include "communication/messaging/distributed.hpp" +#include "data_structures/concurrent/concurrent_map.hpp" #include "io/network/network_endpoint.hpp" namespace communication::rpc { @@ -60,6 +61,7 @@ class Client { class Server { public: Server(messaging::System &system, const std::string &name); + ~Server(); template void Register( @@ -72,9 +74,11 @@ class Server { static_assert(std::is_base_of::value, "TRequestResponse::Response must be derived from Message"); - auto got = callbacks_.emplace( - typeid(typename TRequestResponse::Request), - [callback = callback](const messaging::Message &base_message) { + auto callbacks_accessor = callbacks_.access(); + auto got = callbacks_accessor.insert( + typeid(typename TRequestResponse::Request), [callback = callback]( + const messaging::Message + &base_message) { const auto &message = dynamic_cast( base_message); @@ -83,20 
+87,16 @@ class Server { CHECK(got.second) << "Callback for that message type already registered"; } - void Start(); - void Shutdown(); - private: messaging::System &system_; std::shared_ptr stream_; - std::unordered_map( - const messaging::Message &)>> + ConcurrentMap( + const messaging::Message &)>> callbacks_; std::atomic alive_{true}; std::thread running_thread_; - bool started_{false}; }; } // namespace communication::rpc diff --git a/src/communication/server.hpp b/src/communication/server.hpp index b66445b60..ab5ac2fd0 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -22,8 +22,9 @@ namespace communication { /** * Communication server. - * Listens for incomming connections on the server port and assings them in a - * round-robin manner to it's workers. + * Listens for incomming connections on the server port and assigns them in a + * round-robin manner to it's workers. Started automatically on constructor, and + * stopped at destructor. * * Current Server achitecture: * incomming connection -> server -> worker -> session @@ -40,8 +41,12 @@ class Server { using WorkerT = Worker; using Socket = io::network::Socket; + /** + * Constructs and binds server to endpoint, operates on session data and + * invokes n workers + */ Server(const io::network::NetworkEndpoint &endpoint, - TSessionData &session_data) + TSessionData &session_data, size_t n) : session_data_(session_data) { // Without server we can't continue with application so we can just // terminate here. 
@@ -53,44 +58,54 @@ class Server { if (!socket_.Listen(1024)) { LOG(FATAL) << "Cannot listen on socket!"; } + working_thread_ = std::thread([this, n]() { + std::cout << fmt::format("Starting {} workers", n) << std::endl; + workers_.reserve(n); + for (size_t i = 0; i < n; ++i) { + workers_.push_back(std::make_unique(session_data_)); + worker_threads_.emplace_back( + [this](WorkerT &worker) -> void { worker.Start(alive_); }, + std::ref(*workers_.back())); + } + std::cout << "Server is fully armed and operational" << std::endl; + std::cout << fmt::format("Listening on {} at {}", + socket_.endpoint().address(), + socket_.endpoint().port()) + << std::endl; + + io::network::SocketEventDispatcher dispatcher; + ConnectionAcceptor acceptor(socket_, *this); + dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN); + while (alive_) { + dispatcher.WaitAndProcessEvents(); + } + + std::cout << "Shutting down..." << std::endl; + for (auto &worker_thread : worker_threads_) { + worker_thread.join(); + } + }); + } + + ~Server() { + Shutdown(); + AwaitShutdown(); } const auto &endpoint() const { return socket_.endpoint(); } - void Start(size_t n) { - std::cout << fmt::format("Starting {} workers", n) << std::endl; - workers_.reserve(n); - for (size_t i = 0; i < n; ++i) { - workers_.push_back(std::make_unique(session_data_)); - worker_threads_.emplace_back( - [this](WorkerT &worker) -> void { worker.Start(alive_); }, - std::ref(*workers_.back())); - } - std::cout << "Server is fully armed and operational" << std::endl; - std::cout << fmt::format("Listening on {} at {}", - socket_.endpoint().address(), - socket_.endpoint().port()) - << std::endl; - - io::network::SocketEventDispatcher dispatcher; - ConnectionAcceptor acceptor(socket_, *this); - dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN); - while (alive_) { - dispatcher.WaitAndProcessEvents(); - } - - std::cout << "Shutting down..." 
<< std::endl; - for (auto &worker_thread : worker_threads_) { - worker_thread.join(); - } - } - + /// Stops server manually void Shutdown() { // This should be as simple as possible, so that it can be called inside a // signal handler. alive_.store(false); } + /// Waits for the server to be signaled to shutdown + void AwaitShutdown() { + if (working_thread_.joinable()) working_thread_.join(); + } + private: class ConnectionAcceptor : public io::network::BaseListener { public: @@ -135,6 +150,7 @@ class Server { std::vector> workers_; std::vector worker_threads_; + std::thread working_thread_; std::atomic alive_{true}; int idx_{0}; diff --git a/src/database/counters.cpp b/src/database/counters.cpp index 799634568..e4fbb2cbf 100644 --- a/src/database/counters.cpp +++ b/src/database/counters.cpp @@ -44,9 +44,6 @@ MasterCounters::MasterCounters(communication::messaging::System &system) }); } -void MasterCounters::Start() { rpc_server_.Start(); } -void MasterCounters::Shutdown() { rpc_server_.Shutdown(); } - WorkerCounters::WorkerCounters( communication::messaging::System &system, const io::network::NetworkEndpoint &master_endpoint) diff --git a/src/database/counters.hpp b/src/database/counters.hpp index 82e3533dc..c79e8b358 100644 --- a/src/database/counters.hpp +++ b/src/database/counters.hpp @@ -46,8 +46,6 @@ class SingleNodeCounters : public Counters { class MasterCounters : public SingleNodeCounters { public: MasterCounters(communication::messaging::System &system); - void Start(); - void Shutdown(); private: communication::rpc::Server rpc_server_; diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 1490b05c1..fef508a34 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -35,7 +35,6 @@ GraphDb::GraphDb(communication::messaging::System &system, tx_engine->StartServer(system); tx_engine_ = std::move(tx_engine); auto counters = std::make_unique(system); - counters->Start(); counters_ = std::move(counters); 
INIT_MAPPERS(storage::MasterConcurrentIdMapper, system); get_endpoint_ = [&master](int worker_id) { diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index dd5080388..49dd8c7ff 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -12,7 +12,6 @@ MasterCoordination::MasterCoordination(communication::messaging::System &system) server_.Register([this](const GetEndpointReq &req) { return std::make_unique(GetEndpoint(req.member)); }); - server_.Start(); } int MasterCoordination::RegisterWorker(int desired_worker_id, @@ -33,7 +32,7 @@ int MasterCoordination::RegisterWorker(int desired_worker_id, return worker_id; } -void MasterCoordination::Shutdown() { +MasterCoordination::~MasterCoordination() { std::lock_guard guard(lock_); for (const auto &kv : workers_) { communication::rpc::Client client(system_, kv.second, @@ -41,14 +40,13 @@ void MasterCoordination::Shutdown() { auto result = client.Call(100ms); CHECK(result) << "Failed to shut down worker: " << kv.first; } - server_.Shutdown(); } Endpoint MasterCoordination::GetEndpoint(int worker_id) const { std::lock_guard guard(lock_); auto found = workers_.find(worker_id); - CHECK(found != workers_.end()) << "No endpoint registered for worker id: " - << worker_id; + CHECK(found != workers_.end()) + << "No endpoint registered for worker id: " << worker_id; return found->second; } } // namespace distributed diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index 6f1b17688..bf84b0141 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -29,7 +29,7 @@ class MasterCoordination { MasterCoordination(communication::messaging::System &system); /** Shuts down all the workers and this master server. */ - void Shutdown(); + ~MasterCoordination(); /** Returns the Endpoint for the given worker_id. 
*/ Endpoint GetEndpoint(int worker_id) const; diff --git a/src/distributed/coordination_worker.cpp b/src/distributed/coordination_worker.cpp index 22bc73cb5..892049d45 100644 --- a/src/distributed/coordination_worker.cpp +++ b/src/distributed/coordination_worker.cpp @@ -40,7 +40,6 @@ void WorkerCoordination::WaitForShutdown() { cv.notify_one(); return std::make_unique(); }); - server_.Start(); std::unique_lock lk(mutex); cv.wait(lk, [&shutdown] { return shutdown; }); @@ -50,6 +49,4 @@ void WorkerCoordination::WaitForShutdown() { // convention, but maybe better... std::this_thread::sleep_for(100ms); }; - -void WorkerCoordination::Shutdown() { server_.Shutdown(); } } // namespace distributed diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp index dedaac16d..ad615eb8a 100644 --- a/src/distributed/coordination_worker.hpp +++ b/src/distributed/coordination_worker.hpp @@ -29,9 +29,6 @@ class WorkerCoordination { * Blocks the calling thread until that has finished. */ void WaitForShutdown(); - /** Shuts the RPC server down. */ - void Shutdown(); - private: communication::messaging::System &system_; communication::rpc::Client client_; diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index 1d929bea1..fc9e89eb1 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -124,25 +124,23 @@ void MasterMain() { // Bolt server stuff. SessionData session_data{system, master}; NetworkEndpoint endpoint(FLAGS_interface, FLAGS_port); - ServerT server(endpoint, session_data); + ServerT server(endpoint, session_data, FLAGS_num_workers); // Handler for regular termination signals - auto shutdown = [&server, &session_data, &master, &system] { + auto shutdown = [&server, &session_data] { if (is_shutting_down) return; is_shutting_down = 1; // Server needs to be shutdown first and then the database. This prevents a // race condition when a transaction is accepted during server shutdown. 
server.Shutdown(); session_data.db.Shutdown(); - master.Shutdown(); - system.Shutdown(); - }; + InitSignalHandlers(shutdown); StartMemWarningLogger(); - server.Start(FLAGS_num_workers); + server.AwaitShutdown(); } void WorkerMain() { @@ -162,16 +160,13 @@ void WorkerMain() { // Wait for the shutdown command from the master. worker.WaitForShutdown(); } - - worker.Shutdown(); - system.Shutdown(); } void SingleNodeMain() { google::SetUsageMessage("Memgraph single-node database server"); SessionData session_data; NetworkEndpoint endpoint(FLAGS_interface, FLAGS_port); - ServerT server(endpoint, session_data); + ServerT server(endpoint, session_data, FLAGS_num_workers); // Handler for regular termination signals auto shutdown = [&server, &session_data] { @@ -186,7 +181,7 @@ void SingleNodeMain() { StartMemWarningLogger(); - server.Start(FLAGS_num_workers); + server.AwaitShutdown(); } int main(int argc, char **argv) { diff --git a/src/storage/concurrent_id_mapper_master.cpp b/src/storage/concurrent_id_mapper_master.cpp index ee01b2448..73b067a71 100644 --- a/src/storage/concurrent_id_mapper_master.cpp +++ b/src/storage/concurrent_id_mapper_master.cpp @@ -27,19 +27,13 @@ ID_VALUE_RPC_CALLS(Label) ID_VALUE_RPC_CALLS(EdgeType) ID_VALUE_RPC_CALLS(Property) #undef ID_VALUE_RPC -} +} // namespace template MasterConcurrentIdMapper::MasterConcurrentIdMapper( communication::messaging::System &system) : rpc_server_(system, kConcurrentIdMapperRpc) { RegisterRpc(*this, rpc_server_); - rpc_server_.Start(); -} - -template -MasterConcurrentIdMapper::~MasterConcurrentIdMapper() { - rpc_server_.Shutdown(); } template class MasterConcurrentIdMapper