diff --git a/src/communication/messaging/distributed.cpp b/src/communication/messaging/distributed.cpp index 9d42ec2f3..d944b2d06 100644 --- a/src/communication/messaging/distributed.cpp +++ b/src/communication/messaging/distributed.cpp @@ -4,7 +4,7 @@ namespace communication::messaging { System::System(const std::string &address, uint16_t port) : endpoint_(address, port) { - // Numbers of worker are quite arbitrary at the point. + // Numbers of workers is quite arbitrary at this point. StartClient(4); StartServer(4); } @@ -13,15 +13,10 @@ System::System(const io::network::NetworkEndpoint &endpoint) : System(endpoint.address(), endpoint.port()) {} System::~System() { + queue_.Shutdown(); for (size_t i = 0; i < pool_.size(); ++i) { pool_[i].join(); } - thread_.join(); -} - -void System::Shutdown() { - queue_.Shutdown(); - server_->Shutdown(); } void System::StartClient(int worker_count) { @@ -44,12 +39,8 @@ void System::StartServer(int worker_count) { } // Initialize server. - server_ = std::make_unique<ServerT>(endpoint_, protocol_data_); + server_ = std::make_unique<ServerT>(endpoint_, protocol_data_, worker_count); endpoint_ = server_->endpoint(); - - // Start server. - thread_ = std::thread( - [worker_count, this]() { this->server_->Start(worker_count); }); } std::shared_ptr<EventStream> System::Open(const std::string &name) { @@ -63,4 +54,4 @@ Writer::Writer(System &system, const std::string &address, uint16_t port, void Writer::Send(std::unique_ptr<Message> message) { system_.queue_.Emplace(address_, port_, name_, std::move(message)); } -} +} // namespace communication::messaging diff --git a/src/communication/messaging/distributed.hpp b/src/communication/messaging/distributed.hpp index c838450c6..49dd80b50 100644 --- a/src/communication/messaging/distributed.hpp +++ b/src/communication/messaging/distributed.hpp @@ -66,7 +66,6 @@ class System { ~System(); std::shared_ptr<EventStream> Open(const std::string &name); - void Shutdown(); const io::network::NetworkEndpoint &endpoint() const { return endpoint_; } @@ -107,7 +106,6 @@ class System { Queue<NetworkMessage> queue_; // Server variables. - std::thread thread_; SessionData protocol_data_; std::unique_ptr<ServerT> server_{nullptr}; io::network::NetworkEndpoint endpoint_; diff --git a/src/communication/raft/raft-inl.hpp b/src/communication/raft/raft-inl.hpp index f9836f9f7..6659bf736 100644 --- a/src/communication/raft/raft-inl.hpp +++ b/src/communication/raft/raft-inl.hpp @@ -678,8 +678,6 @@ RaftMember<State>::~RaftMember() { for (auto &peer_thread : peer_threads_) { peer_thread.join(); } - - network_.Shutdown(); } template <class State> diff --git a/src/communication/raft/raft.hpp b/src/communication/raft/raft.hpp index 311b6dc23..8b088374d 100644 --- a/src/communication/raft/raft.hpp +++ b/src/communication/raft/raft.hpp @@ -130,10 +130,6 @@ class RaftNetworkInterface { /* This will be called once the RaftMember is ready to start receiving RPCs. */ virtual void Start(RaftMember<State> &member) = 0; - - /* This will be called when RaftMember is exiting. RPC handlers should not be - * called anymore. */ - virtual void Shutdown() = 0; }; template <class State> @@ -252,7 +248,7 @@ class RaftMemberImpl { std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}()); }; -} // namespace internal +} // namespace impl template <class State> class RaftMember final { diff --git a/src/communication/raft/rpc.hpp b/src/communication/raft/rpc.hpp index 201b117be..ec51b1281 100644 --- a/src/communication/raft/rpc.hpp +++ b/src/communication/raft/rpc.hpp @@ -35,33 +35,25 @@ class RpcNetwork : public RaftNetworkInterface<State> { directory_(std::move(directory)), server_(system, kRaftChannelName) {} - ~RpcNetwork() { - DCHECK(!is_running_) - << "`Shutdown()` should be called before destructing `RpcNetwork`"; - /* We don't want to call `Shutdown` here, instead we push that - * responsibility to caller of `Start`, so `server_` doesn't end up holding - * a reference to a destructed `RaftMember`. */ - } - virtual void Start(RaftMember<State> &member) override { - server_.Register<PeerProtocol<State>>([&member]( - const PeerRpcRequest<State> &request) { - auto reply = std::make_unique<PeerRpcReply>(); - reply->type = request.type; - switch (request.type) { - case RpcType::REQUEST_VOTE: - reply->request_vote = member.OnRequestVote(request.request_vote); - break; - case RpcType::APPEND_ENTRIES: - reply->append_entries = - member.OnAppendEntries(request.append_entries); - break; - default: - LOG(ERROR) << "Unknown RPC type: " << static_cast<int>(request.type); - } - return reply; - }); - server_.Start(); + server_.Register<PeerProtocol<State>>( + [&member](const PeerRpcRequest<State> &request) { + auto reply = std::make_unique<PeerRpcReply>(); + reply->type = request.type; + switch (request.type) { + case RpcType::REQUEST_VOTE: + reply->request_vote = member.OnRequestVote(request.request_vote); + break; + case RpcType::APPEND_ENTRIES: + reply->append_entries = + member.OnAppendEntries(request.append_entries); + break; + default: + LOG(ERROR) << "Unknown RPC type: " + << static_cast<int>(request.type); + } + return reply; + }); } virtual bool SendRequestVote(const MemberId &recipient, @@ -126,19 +118,12 @@ class RpcNetwork : public RaftNetworkInterface<State> { return it->second; } - virtual void Shutdown() override { - is_running_ = false; - server_.Shutdown(); - } - communication::messaging::System &system_; // TODO(mtomic): how to update and distribute this? std::unordered_map<MemberId, io::network::NetworkEndpoint> directory_; rpc::Server server_; std::unordered_map<MemberId, communication::rpc::Client> clients_; - - bool is_running_ = true; }; } // namespace communication::raft diff --git a/src/communication/raft/test_utils.hpp b/src/communication/raft/test_utils.hpp index 6258f72b9..dea7676b0 100644 --- a/src/communication/raft/test_utils.hpp +++ b/src/communication/raft/test_utils.hpp @@ -67,8 +67,6 @@ class NoOpNetworkInterface : public RaftNetworkInterface<State> { } virtual void Start(RaftMember<State> &) override {} - - virtual void Shutdown() override {} }; /* `NextReplyNetworkInterface` has two fields: `on_request_` and `next_reply_` @@ -116,8 +114,6 @@ class NextReplyNetworkInterface : public RaftNetworkInterface<State> { virtual void Start(RaftMember<State> &) override {} - virtual void Shutdown() override {} - std::function<void(const PeerRpcRequest<State> &)> on_request_; std::experimental::optional<PeerRpcReply> next_reply_; }; diff --git a/src/communication/rpc/rpc.cpp b/src/communication/rpc/rpc.cpp index e4511c4a1..dc9ec21f3 100644 --- a/src/communication/rpc/rpc.cpp +++ b/src/communication/rpc/rpc.cpp @@ -123,12 +123,8 @@ std::unique_ptr<messaging::Message> Client::Call( } Server::Server(messaging::System &system, const std::string &name) - : system_(system), stream_(system.Open(kProtocolStreamPrefix + name)) {} - -void Server::Start() { + : system_(system), stream_(system.Open(kProtocolStreamPrefix + name)) { // TODO: Add logging. - CHECK(started_ == false) << "Server can't be started multiple times"; - started_ = true; running_thread_ = std::thread([this]() { while (alive_) { auto message = stream_->Await(); @@ -136,8 +132,9 @@ void Server::Start() { auto *request = dynamic_cast<Request *>(message.get()); if (!request) continue; auto &real_request = request->message(); - auto it = callbacks_.find(real_request.type_index()); - if (it == callbacks_.end()) continue; + auto callbacks_accessor = callbacks_.access(); + auto it = callbacks_accessor.find(real_request.type_index()); + if (it == callbacks_accessor.end()) continue; auto response = it->second(real_request); messaging::Writer writer(system_, request->address(), request->port(), request->stream()); @@ -146,7 +143,7 @@ void Server::Start() { }); } -void Server::Shutdown() { +Server::~Server() { alive_ = false; stream_->Shutdown(); if (running_thread_.joinable()) running_thread_.join(); diff --git a/src/communication/rpc/rpc.hpp b/src/communication/rpc/rpc.hpp index 021e52742..278da664e 100644 --- a/src/communication/rpc/rpc.hpp +++ b/src/communication/rpc/rpc.hpp @@ -3,6 +3,7 @@ #include <type_traits> #include "communication/messaging/distributed.hpp" +#include "data_structures/concurrent/concurrent_map.hpp" #include "io/network/network_endpoint.hpp" namespace communication::rpc { @@ -60,6 +61,7 @@ class Client { class Server { public: Server(messaging::System &system, const std::string &name); + ~Server(); template <typename TRequestResponse> void Register( @@ -72,9 +74,11 @@ class Server { static_assert(std::is_base_of<messaging::Message, typename TRequestResponse::Response>::value, "TRequestResponse::Response must be derived from Message"); - auto got = callbacks_.emplace( - typeid(typename TRequestResponse::Request), - [callback = callback](const messaging::Message &base_message) { + auto callbacks_accessor = callbacks_.access(); + auto got = callbacks_accessor.insert( + typeid(typename TRequestResponse::Request), [callback = callback]( + const messaging::Message + &base_message) { const auto &message = dynamic_cast<const typename TRequestResponse::Request &>( base_message); @@ -83,20 +87,16 @@ class Server { CHECK(got.second) << "Callback for that message type already registered"; } - void Start(); - void Shutdown(); - private: messaging::System &system_; std::shared_ptr<messaging::EventStream> stream_; - std::unordered_map<std::type_index, - std::function<std::unique_ptr<messaging::Message>( - const messaging::Message &)>> + ConcurrentMap<std::type_index, + std::function<std::unique_ptr<messaging::Message>( + const messaging::Message &)>> callbacks_; std::atomic<bool> alive_{true}; std::thread running_thread_; - bool started_{false}; }; } // namespace communication::rpc diff --git a/src/communication/server.hpp b/src/communication/server.hpp index b66445b60..ab5ac2fd0 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -22,8 +22,9 @@ namespace communication { /** * Communication server. - * Listens for incomming connections on the server port and assings them in a - * round-robin manner to it's workers. + * Listens for incomming connections on the server port and assigns them in a + * round-robin manner to it's workers. Started automatically on constructor, and + * stopped at destructor. * * Current Server achitecture: * incomming connection -> server -> worker -> session @@ -40,8 +41,12 @@ class Server { using WorkerT = Worker<TSession, TSessionData>; using Socket = io::network::Socket; + /** + * Constructs and binds server to endpoint, operates on session data and + * invokes n workers + */ Server(const io::network::NetworkEndpoint &endpoint, - TSessionData &session_data) + TSessionData &session_data, size_t n) : session_data_(session_data) { // Without server we can't continue with application so we can just // terminate here. @@ -53,44 +58,54 @@ class Server { if (!socket_.Listen(1024)) { LOG(FATAL) << "Cannot listen on socket!"; } + working_thread_ = std::thread([this, n]() { + std::cout << fmt::format("Starting {} workers", n) << std::endl; + workers_.reserve(n); + for (size_t i = 0; i < n; ++i) { + workers_.push_back(std::make_unique<WorkerT>(session_data_)); + worker_threads_.emplace_back( + [this](WorkerT &worker) -> void { worker.Start(alive_); }, + std::ref(*workers_.back())); + } + std::cout << "Server is fully armed and operational" << std::endl; + std::cout << fmt::format("Listening on {} at {}", + socket_.endpoint().address(), + socket_.endpoint().port()) + << std::endl; + + io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher; + ConnectionAcceptor acceptor(socket_, *this); + dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN); + while (alive_) { + dispatcher.WaitAndProcessEvents(); + } + + std::cout << "Shutting down..." << std::endl; + for (auto &worker_thread : worker_threads_) { + worker_thread.join(); + } + }); + } + + ~Server() { + Shutdown(); + AwaitShutdown(); } const auto &endpoint() const { return socket_.endpoint(); } - void Start(size_t n) { - std::cout << fmt::format("Starting {} workers", n) << std::endl; - workers_.reserve(n); - for (size_t i = 0; i < n; ++i) { - workers_.push_back(std::make_unique<WorkerT>(session_data_)); - worker_threads_.emplace_back( - [this](WorkerT &worker) -> void { worker.Start(alive_); }, - std::ref(*workers_.back())); - } - std::cout << "Server is fully armed and operational" << std::endl; - std::cout << fmt::format("Listening on {} at {}", - socket_.endpoint().address(), - socket_.endpoint().port()) - << std::endl; - - io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher; - ConnectionAcceptor acceptor(socket_, *this); - dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN); - while (alive_) { - dispatcher.WaitAndProcessEvents(); - } - - std::cout << "Shutting down..." << std::endl; - for (auto &worker_thread : worker_threads_) { - worker_thread.join(); - } - } - + /// Stops server manually void Shutdown() { // This should be as simple as possible, so that it can be called inside a // signal handler. alive_.store(false); } + /// Waits for the server to be signaled to shutdown + void AwaitShutdown() { + if (working_thread_.joinable()) working_thread_.join(); + } + private: class ConnectionAcceptor : public io::network::BaseListener { public: @@ -135,6 +150,7 @@ class Server { std::vector<std::unique_ptr<WorkerT>> workers_; std::vector<std::thread> worker_threads_; + std::thread working_thread_; std::atomic<bool> alive_{true}; int idx_{0}; diff --git a/src/database/counters.cpp b/src/database/counters.cpp index 799634568..e4fbb2cbf 100644 --- a/src/database/counters.cpp +++ b/src/database/counters.cpp @@ -44,9 +44,6 @@ MasterCounters::MasterCounters(communication::messaging::System &system) }); } -void MasterCounters::Start() { rpc_server_.Start(); } -void MasterCounters::Shutdown() { rpc_server_.Shutdown(); } - WorkerCounters::WorkerCounters( communication::messaging::System &system, const io::network::NetworkEndpoint &master_endpoint) diff --git a/src/database/counters.hpp b/src/database/counters.hpp index 82e3533dc..c79e8b358 100644 --- a/src/database/counters.hpp +++ b/src/database/counters.hpp @@ -46,8 +46,6 @@ class SingleNodeCounters : public Counters { class MasterCounters : public SingleNodeCounters { public: MasterCounters(communication::messaging::System &system); - void Start(); - void Shutdown(); private: communication::rpc::Server rpc_server_; diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 1490b05c1..fef508a34 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -35,7 +35,6 @@ GraphDb::GraphDb(communication::messaging::System &system, tx_engine->StartServer(system); tx_engine_ = std::move(tx_engine); auto counters = std::make_unique<database::MasterCounters>(system); - counters->Start(); counters_ = std::move(counters); INIT_MAPPERS(storage::MasterConcurrentIdMapper, system); get_endpoint_ = [&master](int worker_id) { diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index dd5080388..49dd8c7ff 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -12,7 +12,6 @@ MasterCoordination::MasterCoordination(communication::messaging::System &system) server_.Register<GetEndpointRpc>([this](const GetEndpointReq &req) { return std::make_unique<GetEndpointRes>(GetEndpoint(req.member)); }); - server_.Start(); } int MasterCoordination::RegisterWorker(int desired_worker_id, @@ -33,7 +32,7 @@ int MasterCoordination::RegisterWorker(int desired_worker_id, return worker_id; } -void MasterCoordination::Shutdown() { +MasterCoordination::~MasterCoordination() { std::lock_guard<std::mutex> guard(lock_); for (const auto &kv : workers_) { communication::rpc::Client client(system_, kv.second, @@ -41,14 +40,13 @@ void MasterCoordination::Shutdown() { auto result = client.Call<StopWorkerRpc>(100ms); CHECK(result) << "Failed to shut down worker: " << kv.first; } - server_.Shutdown(); } Endpoint MasterCoordination::GetEndpoint(int worker_id) const { std::lock_guard<std::mutex> guard(lock_); auto found = workers_.find(worker_id); - CHECK(found != workers_.end()) << "No endpoint registered for worker id: " - << worker_id; + CHECK(found != workers_.end()) + << "No endpoint registered for worker id: " << worker_id; return found->second; } } // namespace distributed diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index 6f1b17688..bf84b0141 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -29,7 +29,7 @@ class MasterCoordination { MasterCoordination(communication::messaging::System &system); /** Shuts down all the workers and this master server. */ - void Shutdown(); + ~MasterCoordination(); /** Returns the Endpoint for the given worker_id. */ Endpoint GetEndpoint(int worker_id) const; diff --git a/src/distributed/coordination_worker.cpp b/src/distributed/coordination_worker.cpp index 22bc73cb5..892049d45 100644 --- a/src/distributed/coordination_worker.cpp +++ b/src/distributed/coordination_worker.cpp @@ -40,7 +40,6 @@ void WorkerCoordination::WaitForShutdown() { cv.notify_one(); return std::make_unique<StopWorkerRes>(); }); - server_.Start(); std::unique_lock<std::mutex> lk(mutex); cv.wait(lk, [&shutdown] { return shutdown; }); @@ -50,6 +49,4 @@ void WorkerCoordination::WaitForShutdown() { // convention, but maybe better... std::this_thread::sleep_for(100ms); }; - -void WorkerCoordination::Shutdown() { server_.Shutdown(); } } // namespace distributed diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp index dedaac16d..ad615eb8a 100644 --- a/src/distributed/coordination_worker.hpp +++ b/src/distributed/coordination_worker.hpp @@ -29,9 +29,6 @@ class WorkerCoordination { * Blocks the calling thread until that has finished. */ void WaitForShutdown(); - /** Shuts the RPC server down. */ - void Shutdown(); - private: communication::messaging::System &system_; communication::rpc::Client client_; diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index 1d929bea1..fc9e89eb1 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -124,25 +124,23 @@ void MasterMain() { // Bolt server stuff. SessionData session_data{system, master}; NetworkEndpoint endpoint(FLAGS_interface, FLAGS_port); - ServerT server(endpoint, session_data); + ServerT server(endpoint, session_data, FLAGS_num_workers); // Handler for regular termination signals - auto shutdown = [&server, &session_data, &master, &system] { + auto shutdown = [&server, &session_data] { if (is_shutting_down) return; is_shutting_down = 1; // Server needs to be shutdown first and then the database. This prevents a // race condition when a transaction is accepted during server shutdown. server.Shutdown(); session_data.db.Shutdown(); - master.Shutdown(); - system.Shutdown(); - }; + InitSignalHandlers(shutdown); StartMemWarningLogger(); - server.Start(FLAGS_num_workers); + server.AwaitShutdown(); } void WorkerMain() { @@ -162,16 +160,13 @@ void WorkerMain() { // Wait for the shutdown command from the master. worker.WaitForShutdown(); } - - worker.Shutdown(); - system.Shutdown(); } void SingleNodeMain() { google::SetUsageMessage("Memgraph single-node database server"); SessionData session_data; NetworkEndpoint endpoint(FLAGS_interface, FLAGS_port); - ServerT server(endpoint, session_data); + ServerT server(endpoint, session_data, FLAGS_num_workers); // Handler for regular termination signals auto shutdown = [&server, &session_data] { @@ -186,7 +181,7 @@ void SingleNodeMain() { StartMemWarningLogger(); - server.Start(FLAGS_num_workers); + server.AwaitShutdown(); } int main(int argc, char **argv) { diff --git a/src/storage/concurrent_id_mapper_master.cpp b/src/storage/concurrent_id_mapper_master.cpp index ee01b2448..73b067a71 100644 --- a/src/storage/concurrent_id_mapper_master.cpp +++ b/src/storage/concurrent_id_mapper_master.cpp @@ -27,19 +27,13 @@ ID_VALUE_RPC_CALLS(Label) ID_VALUE_RPC_CALLS(EdgeType) ID_VALUE_RPC_CALLS(Property) #undef ID_VALUE_RPC -} +} // namespace template <typename TId> MasterConcurrentIdMapper<TId>::MasterConcurrentIdMapper( communication::messaging::System &system) : rpc_server_(system, kConcurrentIdMapperRpc) { RegisterRpc(*this, rpc_server_); - rpc_server_.Start(); -} - -template <typename TId> -MasterConcurrentIdMapper<TId>::~MasterConcurrentIdMapper() { - rpc_server_.Shutdown(); } template class MasterConcurrentIdMapper<Label>; diff --git a/src/storage/concurrent_id_mapper_master.hpp b/src/storage/concurrent_id_mapper_master.hpp index 0d0ff241b..af899af53 100644 --- a/src/storage/concurrent_id_mapper_master.hpp +++ b/src/storage/concurrent_id_mapper_master.hpp @@ -14,7 +14,6 @@ template <typename TId> class MasterConcurrentIdMapper : public SingleNodeConcurrentIdMapper<TId> { public: MasterConcurrentIdMapper(communication::messaging::System &system); - ~MasterConcurrentIdMapper(); private: communication::rpc::Server rpc_server_; diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index b522aa2c9..651b489ff 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -11,10 +11,6 @@ namespace tx { MasterEngine::MasterEngine(durability::WriteAheadLog *wal) : wal_(wal) {} -MasterEngine::~MasterEngine() { - if (rpc_server_) StopServer(); -} - Transaction *MasterEngine::Begin() { std::lock_guard<SpinLock> guard(lock_); @@ -134,13 +130,6 @@ void MasterEngine::StartServer(communication::messaging::System &system) { rpc_server_->Register<IsActiveRpc>([this](const IsActiveReq &req) { return std::make_unique<IsActiveRes>(GlobalIsActive(req.member)); }); - - rpc_server_->Start(); } -void MasterEngine::StopServer() { - CHECK(rpc_server_) << "Can't stop a server that's not running"; - rpc_server_->Shutdown(); - rpc_server_ = std::experimental::nullopt; -} } // namespace tx diff --git a/src/transactions/engine_master.hpp b/src/transactions/engine_master.hpp index 99861c0fd..53e66d565 100644 --- a/src/transactions/engine_master.hpp +++ b/src/transactions/engine_master.hpp @@ -35,9 +35,6 @@ class MasterEngine : public Engine { */ MasterEngine(durability::WriteAheadLog *wal = nullptr); - /** Stops the tx server if it's running. */ - ~MasterEngine(); - /** * Begins a transaction and returns a pointer to * it's object. @@ -76,9 +73,6 @@ class MasterEngine : public Engine { /** Starts the RPC server of the master transactional engine. */ void StartServer(communication::messaging::System &system); - /** Stops the RPC server of the master transactional engine. */ - void StopServer(); - private: std::atomic<transaction_id_t> counter_{0}; CommitLog clog_; diff --git a/tests/concurrent/network_read_hang.cpp b/tests/concurrent/network_read_hang.cpp index 6696076d3..e7373838f 100644 --- a/tests/concurrent/network_read_hang.cpp +++ b/tests/concurrent/network_read_hang.cpp @@ -76,12 +76,9 @@ TEST(Network, SocketReadHangOnConcurrentConnections) { // initialize server TestData data; - communication::Server<TestSession, TestData> server(endpoint, data); - - // start server int N = (std::thread::hardware_concurrency() + 1) / 2; int Nc = N * 3; - std::thread server_thread([&] { server.Start(N); }); + communication::Server<TestSession, TestData> server(endpoint, data, N); const auto &ep = server.endpoint(); // start clients @@ -95,10 +92,6 @@ TEST(Network, SocketReadHangOnConcurrentConnections) { // cleanup clients for (int i = 0; i < Nc; ++i) clients[i].join(); - - // stop server - server.Shutdown(); - server_thread.join(); } int main(int argc, char **argv) { diff --git a/tests/concurrent/network_server.cpp b/tests/concurrent/network_server.cpp index 342278193..75b6878f1 100644 --- a/tests/concurrent/network_server.cpp +++ b/tests/concurrent/network_server.cpp @@ -18,11 +18,8 @@ TEST(Network, Server) { // initialize server TestData session_data; - ServerT server(endpoint, session_data); - - // start server int N = (std::thread::hardware_concurrency() + 1) / 2; - std::thread server_thread([&] { server.Start(N); }); + ServerT server(endpoint, session_data, N); const auto &ep = server.endpoint(); // start clients @@ -33,10 +30,6 @@ TEST(Network, Server) { // cleanup clients for (int i = 0; i < N; ++i) clients[i].join(); - - // stop server - server.Shutdown(); - server_thread.join(); } int main(int argc, char **argv) { diff --git a/tests/concurrent/network_session_leak.cpp b/tests/concurrent/network_session_leak.cpp index 9872ff804..36c550e9a 100644 --- a/tests/concurrent/network_session_leak.cpp +++ b/tests/concurrent/network_session_leak.cpp @@ -22,10 +22,7 @@ TEST(Network, SessionLeak) { // initialize server TestData session_data; - ServerT server(endpoint, session_data); - - // start server - std::thread server_thread([&] { server.Start(2); }); + ServerT server(endpoint, session_data, 2); // start clients int N = 50; @@ -43,10 +40,6 @@ TEST(Network, SessionLeak) { for (int i = 0; i < N; ++i) clients[i].join(); std::this_thread::sleep_for(2s); - - // stop server - server.Shutdown(); - server_thread.join(); } // run with "valgrind --leak-check=full ./network_session_leak" to check for diff --git a/tests/distributed/raft/example_client.cpp b/tests/distributed/raft/example_client.cpp index b80086f4e..be91091c1 100644 --- a/tests/distributed/raft/example_client.cpp +++ b/tests/distributed/raft/example_client.cpp @@ -12,8 +12,8 @@ #include "utils/signals/handler.hpp" #include "utils/terminate_handler.hpp" -using communication::messaging::System; using communication::messaging::Message; +using communication::messaging::System; using namespace communication::rpc; using namespace std::literals::chrono_literals; @@ -52,7 +52,5 @@ int main(int argc, char **argv) { } } - client_system.Shutdown(); - return 0; } diff --git a/tests/distributed/raft/example_server.cpp b/tests/distributed/raft/example_server.cpp index 069b8e84b..43610c128 100644 --- a/tests/distributed/raft/example_server.cpp +++ b/tests/distributed/raft/example_server.cpp @@ -11,8 +11,8 @@ #include "utils/signals/handler.hpp" #include "utils/terminate_handler.hpp" -using communication::messaging::System; using communication::messaging::Message; +using communication::messaging::System; using namespace communication::rpc; using namespace std::literals::chrono_literals; @@ -37,12 +37,10 @@ int main(int argc, char **argv) { std::ofstream log(FLAGS_log, std::ios_base::app); // Handler for regular termination signals. - auto shutdown = [&server, &server_system, &log]() { + auto shutdown = [&log]() { if (is_shutting_down) return; is_shutting_down = 1; log.close(); - server.Shutdown(); - server_system.Shutdown(); exit(0); }; @@ -70,7 +68,6 @@ int main(int argc, char **argv) { stol(FLAGS_port)); }); - server.Start(); LOG(INFO) << "Raft RPC server started"; // Sleep until shutdown detected. std::this_thread::sleep_until( diff --git a/tests/manual/raft_rpc.cpp b/tests/manual/raft_rpc.cpp index 6776abd8e..1f064b70a 100644 --- a/tests/manual/raft_rpc.cpp +++ b/tests/manual/raft_rpc.cpp @@ -55,7 +55,5 @@ int main(int argc, char *argv[]) { } } - my_system.Shutdown(); - return 0; } diff --git a/tests/unit/concurrent_id_mapper_distributed.cpp b/tests/unit/concurrent_id_mapper_distributed.cpp index f0ab5ae2e..085d4d303 100644 --- a/tests/unit/concurrent_id_mapper_distributed.cpp +++ b/tests/unit/concurrent_id_mapper_distributed.cpp @@ -27,9 +27,7 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test { } void TearDown() override { worker_mapper_ = std::experimental::nullopt; - worker_system_.Shutdown(); master_mapper_ = std::experimental::nullopt; - master_system_.Shutdown(); } }; @@ -47,7 +45,7 @@ TYPED_TEST(DistributedConcurrentIdMapperTest, Basic) { auto id2 = worker.value_to_id("v2"); EXPECT_EQ(master.id_to_value(id2), "v2"); - EXPECT_EQ(master.value_to_id("v2"),id2); + EXPECT_EQ(master.value_to_id("v2"), id2); EXPECT_NE(id1, id2); } diff --git a/tests/unit/counters.cpp b/tests/unit/counters.cpp index dbeee9168..f0e1bdad4 100644 --- a/tests/unit/counters.cpp +++ b/tests/unit/counters.cpp @@ -8,7 +8,6 @@ const std::string kLocal = "127.0.0.1"; TEST(CountersDistributed, All) { communication::messaging::System master_sys(kLocal, 0); database::MasterCounters master(master_sys); - master.Start(); communication::messaging::System w1_sys(kLocal, 0); database::WorkerCounters w1(w1_sys, master_sys.endpoint()); @@ -26,9 +25,4 @@ TEST(CountersDistributed, All) { EXPECT_EQ(w2.Get("b"), 1); w1.Set("b", 42); EXPECT_EQ(w2.Get("b"), 42); - - w2_sys.Shutdown(); - w1_sys.Shutdown(); - master.Shutdown(); - master_sys.Shutdown(); } diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index ee4386c59..a434bc197 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -26,8 +26,6 @@ class WorkerInThread { coord_.emplace(*system_, master_endpoint); worker_id_ = coord_->RegisterWorker(desired_id); coord_->WaitForShutdown(); - coord_->Shutdown(); - system_->Shutdown(); }); } @@ -45,48 +43,49 @@ class WorkerInThread { TEST(Distributed, Coordination) { System master_system(kLocal, 0); - MasterCoordination master_coord(master_system); - std::vector<std::unique_ptr<WorkerInThread>> workers; - for (int i = 0; i < kWorkerCount; ++i) - workers.emplace_back( - std::make_unique<WorkerInThread>(master_system.endpoint())); + { + MasterCoordination master_coord(master_system); - // Wait till all the workers are safely initialized. - std::this_thread::sleep_for(300ms); + for (int i = 0; i < kWorkerCount; ++i) + workers.emplace_back( + std::make_unique<WorkerInThread>(master_system.endpoint())); - // Expect that all workers have a different ID. - std::unordered_set<int> worker_ids; - for (const auto &w : workers) worker_ids.insert(w->worker_id()); - EXPECT_EQ(worker_ids.size(), kWorkerCount); + // Wait till all the workers are safely initialized. + std::this_thread::sleep_for(300ms); - // Check endpoints. - for (auto &w1 : workers) { - for (auto &w2 : workers) { - EXPECT_EQ(w1->worker_endpoint(w2->worker_id()), w2->endpoint()); + // Expect that all workers have a different ID. + std::unordered_set<int> worker_ids; + for (const auto &w : workers) worker_ids.insert(w->worker_id()); + EXPECT_EQ(worker_ids.size(), kWorkerCount); + + // Check endpoints. + for (auto &w1 : workers) { + for (auto &w2 : workers) { + EXPECT_EQ(w1->worker_endpoint(w2->worker_id()), w2->endpoint()); + } } - } + } // Coordinated shutdown. - // Coordinated shutdown. - master_coord.Shutdown(); - master_system.Shutdown(); for (auto &worker : workers) worker->join(); } TEST(Distributed, DesiredAndUniqueId) { System master_system(kLocal, 0); - MasterCoordination master_coord(master_system); + std::vector<std::unique_ptr<WorkerInThread>> workers; + { + MasterCoordination master_coord(master_system); - WorkerInThread w1(master_system.endpoint(), 42); - std::this_thread::sleep_for(200ms); - WorkerInThread w2(master_system.endpoint(), 42); - std::this_thread::sleep_for(200ms); + workers.emplace_back( + std::make_unique<WorkerInThread>(master_system.endpoint(), 42)); + std::this_thread::sleep_for(200ms); + workers.emplace_back( + std::make_unique<WorkerInThread>(master_system.endpoint(), 42)); + std::this_thread::sleep_for(200ms); - EXPECT_EQ(w1.worker_id(), 42); - EXPECT_NE(w2.worker_id(), 42); + EXPECT_EQ(workers[0]->worker_id(), 42); + EXPECT_NE(workers[1]->worker_id(), 42); + } - master_coord.Shutdown(); - w1.join(); - w2.join(); - master_system.Shutdown(); + for (auto &worker : workers) worker->join(); } diff --git a/tests/unit/messaging_distributed.cpp b/tests/unit/messaging_distributed.cpp index 2158f639a..9dc47c093 100644 --- a/tests/unit/messaging_distributed.cpp +++ b/tests/unit/messaging_distributed.cpp @@ -40,13 +40,12 @@ BOOST_CLASS_EXPORT(MessageInt); #define GET_X(p) dynamic_cast<MessageInt *>((p).get())->x /** - * Test do the services start up without crashes. - */ + * Test do the services start up without crashes. + */ TEST(SimpleTests, StartAndShutdown) { System system("127.0.0.1", 0); // do nothing std::this_thread::sleep_for(500ms); - system.Shutdown(); } TEST(Messaging, Pop) { @@ -60,8 +59,6 @@ TEST(Messaging, Pop) { EXPECT_EQ(stream->Poll(), nullptr); writer.Send<MessageInt>(10); EXPECT_EQ(GET_X(stream->Await()), 10); - master_system.Shutdown(); - slave_system.Shutdown(); } TEST(Messaging, Await) { @@ -82,8 +79,6 @@ TEST(Messaging, Await) { EXPECT_EQ(stream->Poll(), nullptr); EXPECT_EQ(stream->Await(), nullptr); t.join(); - master_system.Shutdown(); - slave_system.Shutdown(); } TEST(Messaging, RecreateChannelAfterClosing) { @@ -106,7 +101,4 @@ TEST(Messaging, RecreateChannelAfterClosing) { EXPECT_EQ(stream->Poll(), nullptr); writer.Send<MessageInt>(30); EXPECT_EQ(GET_X(stream->Await()), 30); - - master_system.Shutdown(); - slave_system.Shutdown(); } diff --git a/tests/unit/network_timeouts.cpp b/tests/unit/network_timeouts.cpp index ceb87464d..24f65e42c 100644 --- a/tests/unit/network_timeouts.cpp +++ b/tests/unit/network_timeouts.cpp @@ -17,10 +17,10 @@ DECLARE_int32(session_inactivity_timeout); using namespace std::chrono_literals; class TestClientSocket; +using communication::bolt::ClientException; +using communication::bolt::SessionData; using io::network::NetworkEndpoint; using io::network::Socket; -using communication::bolt::SessionData; -using communication::bolt::ClientException; using SessionT = communication::bolt::Session<Socket>; using ResultStreamT = SessionT::ResultStreamT; using ServerT = communication::Server<SessionT, SessionData>; @@ -28,15 +28,9 @@ using ClientT = communication::bolt::Client<Socket>; class RunningServer { public: - ~RunningServer() { - server_.Shutdown(); - server_thread_.join(); - } - SessionData session_data_; NetworkEndpoint endpoint_{"127.0.0.1", "0"}; - ServerT server_{endpoint_, session_data_}; - std::thread server_thread_{[&] { server_.Start(1); }}; + ServerT server_{endpoint_, session_data_, 1}; }; class TestClient : public ClientT { diff --git a/tests/unit/raft.cpp b/tests/unit/raft.cpp index 2ac03225d..30b206e5b 100644 --- a/tests/unit/raft.cpp +++ b/tests/unit/raft.cpp @@ -572,7 +572,7 @@ INSTANTIATE_TEST_CASE_P( true, {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}}}, /* no truncation, partial match between our log and appended entries - */ + */ OnAppendEntriesTestParam{ 8, {{1}, {1}, {1}, {4}, {4}, {5}}, diff --git a/tests/unit/rpc.cpp b/tests/unit/rpc.cpp index c17710213..40ba4c5d8 100644 --- a/tests/unit/rpc.cpp +++ b/tests/unit/rpc.cpp @@ -60,7 +60,6 @@ TEST(Rpc, Call) { server.Register<Sum>([](const SumReq &request) { return std::make_unique<SumRes>(request.x + request.y); }); - server.Start(); std::this_thread::sleep_for(100ms); System client_system("127.0.0.1", 0); @@ -68,10 +67,6 @@ TEST(Rpc, Call) { "main"); auto sum = client.Call<Sum>(300ms, 10, 20); EXPECT_EQ(sum->sum, 30); - - server.Shutdown(); - server_system.Shutdown(); - client_system.Shutdown(); } TEST(Rpc, Timeout) { @@ -81,7 +76,6 @@ TEST(Rpc, Timeout) { std::this_thread::sleep_for(300ms); return std::make_unique<SumRes>(request.x + request.y); }); - server.Start(); std::this_thread::sleep_for(100ms); System client_system("127.0.0.1", 0); @@ -89,8 +83,4 @@ TEST(Rpc, Timeout) { "main"); auto sum = client.Call<Sum>(100ms, 10, 20); EXPECT_FALSE(sum); - - server.Shutdown(); - server_system.Shutdown(); - client_system.Shutdown(); } diff --git a/tests/unit/transaction_engine_worker.cpp b/tests/unit/transaction_engine_worker.cpp index 6b585e399..f0d580c3b 100644 --- a/tests/unit/transaction_engine_worker.cpp +++ b/tests/unit/transaction_engine_worker.cpp @@ -22,11 +22,6 @@ class WorkerEngineTest : public testing::Test { WorkerEngine worker_{worker_system_, master_system_.endpoint()}; void SetUp() override { master_.StartServer(master_system_); } - void TearDown() override { - worker_system_.Shutdown(); - master_.StopServer(); - master_system_.Shutdown(); - } }; TEST_F(WorkerEngineTest, LocalBegin) {