Change network design from Start/Shutdown to constructor/destructor

Summary:
Make ServerT start in its constructor

Remove Shutdown from MasterCoordination

Remove Shutdown from the distributed system

RPC server Start and Shutdown removed

Reviewers: florijan, mferencevic

Reviewed By: mferencevic

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1097
Dominik Gleich 2018-01-10 13:56:12 +01:00
parent 499ad3ba15
commit 007a7f1a6d
35 changed files with 140 additions and 272 deletions
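In short, this commit replaces explicit Start()/Shutdown() calls with RAII: objects are fully running once constructed and stop themselves on destruction. A minimal, self-contained sketch of the pattern, with a simplified stand-in class rather than the actual ServerT:

#include <atomic>
#include <chrono>
#include <thread>

// Stand-in: the server owns its thread, is fully running once constructed,
// and signals + joins that thread in its destructor.
class Server {
 public:
  explicit Server(size_t worker_count) {
    thread_ = std::thread([this, worker_count] {
      (void)worker_count;  // ... start worker_count workers, accept and
                           // dispatch connections ...
      while (alive_)
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
  }
  ~Server() {
    Shutdown();
    if (thread_.joinable()) thread_.join();
  }
  // Kept trivial so it can be called from a signal handler: it only flips a
  // lock-free atomic flag.
  void Shutdown() { alive_.store(false); }

 private:
  std::atomic<bool> alive_{true};
  std::thread thread_;
};

int main() {
  Server server(4);  // previously: construct, then server.Start(4)
}  // previously an explicit server.Shutdown(); now ~Server handles it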

View File

@@ -4,7 +4,7 @@ namespace communication::messaging {
System::System(const std::string &address, uint16_t port)
: endpoint_(address, port) {
// Numbers of worker are quite arbitrary at the point.
// The number of workers is quite arbitrary at this point.
StartClient(4);
StartServer(4);
}
@@ -13,15 +13,10 @@ System::System(const io::network::NetworkEndpoint &endpoint)
: System(endpoint.address(), endpoint.port()) {}
System::~System() {
queue_.Shutdown();
for (size_t i = 0; i < pool_.size(); ++i) {
pool_[i].join();
}
thread_.join();
}
void System::Shutdown() {
queue_.Shutdown();
server_->Shutdown();
}
void System::StartClient(int worker_count) {
@@ -44,12 +39,8 @@ void System::StartServer(int worker_count) {
}
// Initialize server.
server_ = std::make_unique<ServerT>(endpoint_, protocol_data_);
server_ = std::make_unique<ServerT>(endpoint_, protocol_data_, worker_count);
endpoint_ = server_->endpoint();
// Start server.
thread_ = std::thread(
[worker_count, this]() { this->server_->Start(worker_count); });
}
std::shared_ptr<EventStream> System::Open(const std::string &name) {
@@ -63,4 +54,4 @@ Writer::Writer(System &system, const std::string &address, uint16_t port,
void Writer::Send(std::unique_ptr<Message> message) {
system_.queue_.Emplace(address_, port_, name_, std::move(message));
}
}
} // namespace communication::messaging
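After this change a System is live for exactly its scope. A hedged usage sketch using only names visible in this diff; the channel name is hypothetical, and since the Writer constructor is cut off above, its final argument is assumed to be the channel name:

#include "communication/messaging/distributed.hpp"

using communication::messaging::System;
using communication::messaging::Writer;

int main() {
  // The constructor now starts the client and server workers itself; there
  // is no separate Start() and no Shutdown() to remember to call.
  System system("127.0.0.1", 0);
  auto stream = system.Open("my-channel");  // subscribe to a named stream
  Writer writer(system, system.endpoint().address(),
                system.endpoint().port(), "my-channel");
  // writer.Send<MyMessage>(...);  // MyMessage: any Message subclass
}  // ~System() shuts the queue down and joins the worker pool and thread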

View File

@@ -66,7 +66,6 @@ class System {
~System();
std::shared_ptr<EventStream> Open(const std::string &name);
void Shutdown();
const io::network::NetworkEndpoint &endpoint() const { return endpoint_; }
@@ -107,7 +106,6 @@ class System {
Queue<NetworkMessage> queue_;
// Server variables.
std::thread thread_;
SessionData protocol_data_;
std::unique_ptr<ServerT> server_{nullptr};
io::network::NetworkEndpoint endpoint_;

View File

@@ -678,8 +678,6 @@ RaftMember<State>::~RaftMember() {
for (auto &peer_thread : peer_threads_) {
peer_thread.join();
}
network_.Shutdown();
}
template <class State>

View File

@@ -130,10 +130,6 @@ class RaftNetworkInterface {
/* This will be called once the RaftMember is ready to start receiving RPCs.
*/
virtual void Start(RaftMember<State> &member) = 0;
/* This will be called when RaftMember is exiting. RPC handlers should not be
* called anymore. */
virtual void Shutdown() = 0;
};
template <class State>
@@ -252,7 +248,7 @@ class RaftMemberImpl {
std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}());
};
} // namespace internal
} // namespace impl
template <class State>
class RaftMember final {

View File

@@ -35,33 +35,25 @@ class RpcNetwork : public RaftNetworkInterface<State> {
directory_(std::move(directory)),
server_(system, kRaftChannelName) {}
~RpcNetwork() {
DCHECK(!is_running_)
<< "`Shutdown()` should be called before destructing `RpcNetwork`";
/* We don't want to call `Shutdown` here; instead we push that
* responsibility to the caller of `Start`, so `server_` doesn't end up
* holding a reference to a destructed `RaftMember`. */
}
virtual void Start(RaftMember<State> &member) override {
server_.Register<PeerProtocol<State>>([&member](
const PeerRpcRequest<State> &request) {
auto reply = std::make_unique<PeerRpcReply>();
reply->type = request.type;
switch (request.type) {
case RpcType::REQUEST_VOTE:
reply->request_vote = member.OnRequestVote(request.request_vote);
break;
case RpcType::APPEND_ENTRIES:
reply->append_entries =
member.OnAppendEntries(request.append_entries);
break;
default:
LOG(ERROR) << "Unknown RPC type: " << static_cast<int>(request.type);
}
return reply;
});
server_.Start();
server_.Register<PeerProtocol<State>>(
[&member](const PeerRpcRequest<State> &request) {
auto reply = std::make_unique<PeerRpcReply>();
reply->type = request.type;
switch (request.type) {
case RpcType::REQUEST_VOTE:
reply->request_vote = member.OnRequestVote(request.request_vote);
break;
case RpcType::APPEND_ENTRIES:
reply->append_entries =
member.OnAppendEntries(request.append_entries);
break;
default:
LOG(ERROR) << "Unknown RPC type: "
<< static_cast<int>(request.type);
}
return reply;
});
}
virtual bool SendRequestVote(const MemberId &recipient,
@@ -126,19 +118,12 @@ class RpcNetwork : public RaftNetworkInterface<State> {
return it->second;
}
virtual void Shutdown() override {
is_running_ = false;
server_.Shutdown();
}
communication::messaging::System &system_;
// TODO(mtomic): how to update and distribute this?
std::unordered_map<MemberId, io::network::NetworkEndpoint> directory_;
rpc::Server server_;
std::unordered_map<MemberId, communication::rpc::Client> clients_;
bool is_running_ = true;
};
} // namespace communication::raft
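The destructor comment above is really a statement about destruction order: the thread running RPC handlers must stop before the RaftMember that those handlers capture is destroyed. A self-contained sketch of the constraint, with hypothetical names:

#include <atomic>
#include <functional>
#include <thread>

// Stand-in for rpc::Server: runs a callback from its own thread until
// destroyed; the destructor joins, so the callback never outlives it.
class HandlerLoop {
 public:
  explicit HandlerLoop(std::function<void()> cb) : cb_(std::move(cb)) {
    thread_ = std::thread([this] {
      while (alive_) cb_();
    });
  }
  ~HandlerLoop() {
    alive_ = false;
    thread_.join();  // after this, cb_ can no longer run
  }

 private:
  std::function<void()> cb_;
  std::atomic<bool> alive_{true};
  std::thread thread_;
};

struct Member {
  void OnRequest() {}
};

int main() {
  Member member;  // must outlive the loop below
  HandlerLoop loop([&member] { member.OnRequest(); });
}  // ~HandlerLoop joins first (reverse declaration order), then ~Member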

View File

@@ -67,8 +67,6 @@ class NoOpNetworkInterface : public RaftNetworkInterface<State> {
}
virtual void Start(RaftMember<State> &) override {}
virtual void Shutdown() override {}
};
/* `NextReplyNetworkInterface` has two fields: `on_request_` and `next_reply_`
@@ -116,8 +114,6 @@ class NextReplyNetworkInterface : public RaftNetworkInterface<State> {
virtual void Start(RaftMember<State> &) override {}
virtual void Shutdown() override {}
std::function<void(const PeerRpcRequest<State> &)> on_request_;
std::experimental::optional<PeerRpcReply> next_reply_;
};

View File

@@ -123,12 +123,8 @@ std::unique_ptr<messaging::Message> Client::Call(
}
Server::Server(messaging::System &system, const std::string &name)
: system_(system), stream_(system.Open(kProtocolStreamPrefix + name)) {}
void Server::Start() {
: system_(system), stream_(system.Open(kProtocolStreamPrefix + name)) {
// TODO: Add logging.
CHECK(started_ == false) << "Server can't be started multiple times";
started_ = true;
running_thread_ = std::thread([this]() {
while (alive_) {
auto message = stream_->Await();
@@ -136,8 +132,9 @@ void Server::Start() {
auto *request = dynamic_cast<Request *>(message.get());
if (!request) continue;
auto &real_request = request->message();
auto it = callbacks_.find(real_request.type_index());
if (it == callbacks_.end()) continue;
auto callbacks_accessor = callbacks_.access();
auto it = callbacks_accessor.find(real_request.type_index());
if (it == callbacks_accessor.end()) continue;
auto response = it->second(real_request);
messaging::Writer writer(system_, request->address(), request->port(),
request->stream());
@@ -146,7 +143,7 @@
});
}
void Server::Shutdown() {
Server::~Server() {
alive_ = false;
stream_->Shutdown();
if (running_thread_.joinable()) running_thread_.join();
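With this hunk the rpc::Server starts serving in its constructor and stops in its destructor. A hedged usage sketch modeled on the Rpc test near the end of this commit; Sum, SumReq and SumRes are that test's fixtures and the include path is assumed:

#include <memory>

#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"  // path assumed

using communication::messaging::System;
using communication::rpc::Server;

int main() {
  System system("127.0.0.1", 0);
  Server server(system, "main");  // the dispatch thread starts right here
  server.Register<Sum>([](const SumReq &request) {
    return std::make_unique<SumRes>(request.x + request.y);
  });  // registering after startup is safe: callbacks_ is a ConcurrentMap
}  // ~Server flips alive_, shuts the stream down and joins; then ~System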

View File

@@ -3,6 +3,7 @@
#include <type_traits>
#include "communication/messaging/distributed.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/network_endpoint.hpp"
namespace communication::rpc {
@@ -60,6 +61,7 @@ class Client {
class Server {
public:
Server(messaging::System &system, const std::string &name);
~Server();
template <typename TRequestResponse>
void Register(
@@ -72,9 +74,11 @@ class Server {
static_assert(std::is_base_of<messaging::Message,
typename TRequestResponse::Response>::value,
"TRequestResponse::Response must be derived from Message");
auto got = callbacks_.emplace(
typeid(typename TRequestResponse::Request),
[callback = callback](const messaging::Message &base_message) {
auto callbacks_accessor = callbacks_.access();
auto got = callbacks_accessor.insert(
typeid(typename TRequestResponse::Request), [callback = callback](
const messaging::Message
&base_message) {
const auto &message =
dynamic_cast<const typename TRequestResponse::Request &>(
base_message);
@@ -83,20 +87,16 @@ class Server {
CHECK(got.second) << "Callback for that message type already registered";
}
void Start();
void Shutdown();
private:
messaging::System &system_;
std::shared_ptr<messaging::EventStream> stream_;
std::unordered_map<std::type_index,
std::function<std::unique_ptr<messaging::Message>(
const messaging::Message &)>>
ConcurrentMap<std::type_index,
std::function<std::unique_ptr<messaging::Message>(
const messaging::Message &)>>
callbacks_;
std::atomic<bool> alive_{true};
std::thread running_thread_;
bool started_{false};
};
} // namespace communication::rpc
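The switch from std::unordered_map to ConcurrentMap is what allows Register to run while the dispatch thread is already serving: every operation goes through a short-lived accessor. A sketch of that access pattern, assuming the ConcurrentMap API exactly as it appears in this diff:

#include <string>

#include "data_structures/concurrent/concurrent_map.hpp"

int main() {
  ConcurrentMap<int, std::string> map;
  // Each thread takes its own accessor; inserts and finds made through
  // different accessors are safe to run concurrently.
  auto accessor = map.access();
  auto got = accessor.insert(1, "one");  // pair: (iterator, was_inserted)
  if (!got.second) { /* the key was already present */ }
  auto it = accessor.find(1);
  if (it != accessor.end()) { /* use it->second */ }
}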

View File

@@ -22,8 +22,9 @@ namespace communication {
/**
* Communication server.
* Listens for incomming connections on the server port and assings them in a
* round-robin manner to it's workers.
* Listens for incoming connections on the server port and assigns them in a
* round-robin manner to its workers. The server starts automatically in the
* constructor and stops in the destructor.
*
* Current Server architecture:
* incoming connection -> server -> worker -> session
@@ -40,8 +41,12 @@ class Server {
using WorkerT = Worker<TSession, TSessionData>;
using Socket = io::network::Socket;
/**
* Constructs the server, binds it to the endpoint, and starts n workers
* that operate on the given session data.
*/
Server(const io::network::NetworkEndpoint &endpoint,
TSessionData &session_data)
TSessionData &session_data, size_t n)
: session_data_(session_data) {
// Without the server we can't continue with the application, so we can
// just terminate here.
@@ -53,44 +58,54 @@ class Server {
if (!socket_.Listen(1024)) {
LOG(FATAL) << "Cannot listen on socket!";
}
working_thread_ = std::thread([this, n]() {
std::cout << fmt::format("Starting {} workers", n) << std::endl;
workers_.reserve(n);
for (size_t i = 0; i < n; ++i) {
workers_.push_back(std::make_unique<WorkerT>(session_data_));
worker_threads_.emplace_back(
[this](WorkerT &worker) -> void { worker.Start(alive_); },
std::ref(*workers_.back()));
}
std::cout << "Server is fully armed and operational" << std::endl;
std::cout << fmt::format("Listening on {} at {}",
socket_.endpoint().address(),
socket_.endpoint().port())
<< std::endl;
io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher;
ConnectionAcceptor acceptor(socket_, *this);
dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN);
while (alive_) {
dispatcher.WaitAndProcessEvents();
}
std::cout << "Shutting down..." << std::endl;
for (auto &worker_thread : worker_threads_) {
worker_thread.join();
}
});
}
~Server() {
Shutdown();
AwaitShutdown();
}
const auto &endpoint() const { return socket_.endpoint(); }
void Start(size_t n) {
std::cout << fmt::format("Starting {} workers", n) << std::endl;
workers_.reserve(n);
for (size_t i = 0; i < n; ++i) {
workers_.push_back(std::make_unique<WorkerT>(session_data_));
worker_threads_.emplace_back(
[this](WorkerT &worker) -> void { worker.Start(alive_); },
std::ref(*workers_.back()));
}
std::cout << "Server is fully armed and operational" << std::endl;
std::cout << fmt::format("Listening on {} at {}",
socket_.endpoint().address(),
socket_.endpoint().port())
<< std::endl;
io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher;
ConnectionAcceptor acceptor(socket_, *this);
dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN);
while (alive_) {
dispatcher.WaitAndProcessEvents();
}
std::cout << "Shutting down..." << std::endl;
for (auto &worker_thread : worker_threads_) {
worker_thread.join();
}
}
/// Stops the server manually
void Shutdown() {
// This should be as simple as possible, so that it can be called inside a
// signal handler.
alive_.store(false);
}
/// Waits for the server to be signaled to shut down
void AwaitShutdown() {
if (working_thread_.joinable()) working_thread_.join();
}
private:
class ConnectionAcceptor : public io::network::BaseListener {
public:
@@ -135,6 +150,7 @@ class Server {
std::vector<std::unique_ptr<WorkerT>> workers_;
std::vector<std::thread> worker_threads_;
std::thread working_thread_;
std::atomic<bool> alive_{true};
int idx_{0};
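Taken together, the new communication::Server lifecycle looks like the sketch below, mirroring MasterMain() later in this commit; TestSession and TestData are stand-ins defined in the unit tests, and the include path is assumed:

#include "communication/server.hpp"  // path assumed
#include "io/network/network_endpoint.hpp"

int main() {
  TestData data;  // session-data stand-in
  io::network::NetworkEndpoint endpoint("127.0.0.1", "0");
  // Binds, listens, and spawns the accept loop plus 2 workers immediately.
  communication::Server<TestSession, TestData> server(endpoint, data, 2);
  // A signal handler may call server.Shutdown(): it only stores to an
  // atomic flag, so it is safe in that context.
  server.Shutdown();
  server.AwaitShutdown();  // joins the accept loop and all worker threads
}  // the destructor would run Shutdown() + AwaitShutdown() regardless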

View File

@@ -44,9 +44,6 @@ MasterCounters::MasterCounters(communication::messaging::System &system)
});
}
void MasterCounters::Start() { rpc_server_.Start(); }
void MasterCounters::Shutdown() { rpc_server_.Shutdown(); }
WorkerCounters::WorkerCounters(
communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint)

View File

@@ -46,8 +46,6 @@ class SingleNodeCounters : public Counters {
class MasterCounters : public SingleNodeCounters {
public:
MasterCounters(communication::messaging::System &system);
void Start();
void Shutdown();
private:
communication::rpc::Server rpc_server_;

View File

@@ -35,7 +35,6 @@ GraphDb::GraphDb(communication::messaging::System &system,
tx_engine->StartServer(system);
tx_engine_ = std::move(tx_engine);
auto counters = std::make_unique<database::MasterCounters>(system);
counters->Start();
counters_ = std::move(counters);
INIT_MAPPERS(storage::MasterConcurrentIdMapper, system);
get_endpoint_ = [&master](int worker_id) {

View File

@@ -12,7 +12,6 @@ MasterCoordination::MasterCoordination(communication::messaging::System &system)
server_.Register<GetEndpointRpc>([this](const GetEndpointReq &req) {
return std::make_unique<GetEndpointRes>(GetEndpoint(req.member));
});
server_.Start();
}
int MasterCoordination::RegisterWorker(int desired_worker_id,
@@ -33,7 +32,7 @@ int MasterCoordination::RegisterWorker(int desired_worker_id,
return worker_id;
}
void MasterCoordination::Shutdown() {
MasterCoordination::~MasterCoordination() {
std::lock_guard<std::mutex> guard(lock_);
for (const auto &kv : workers_) {
communication::rpc::Client client(system_, kv.second,
@@ -41,14 +40,13 @@ void MasterCoordination::Shutdown() {
auto result = client.Call<StopWorkerRpc>(100ms);
CHECK(result) << "Failed to shut down worker: " << kv.first;
}
server_.Shutdown();
}
Endpoint MasterCoordination::GetEndpoint(int worker_id) const {
std::lock_guard<std::mutex> guard(lock_);
auto found = workers_.find(worker_id);
CHECK(found != workers_.end()) << "No endpoint registered for worker id: "
<< worker_id;
CHECK(found != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
return found->second;
}
} // namespace distributed

View File

@@ -29,7 +29,7 @@ class MasterCoordination {
MasterCoordination(communication::messaging::System &system);
/** Shuts down all the workers and this master server. */
void Shutdown();
~MasterCoordination();
/** Returns the Endpoint for the given worker_id. */
Endpoint GetEndpoint(int worker_id) const;

View File

@@ -40,7 +40,6 @@ void WorkerCoordination::WaitForShutdown() {
cv.notify_one();
return std::make_unique<StopWorkerRes>();
});
server_.Start();
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk, [&shutdown] { return shutdown; });
@@ -50,6 +49,4 @@
// convention, but maybe better...
std::this_thread::sleep_for(100ms);
};
void WorkerCoordination::Shutdown() { server_.Shutdown(); }
} // namespace distributed

View File

@@ -29,9 +29,6 @@ class WorkerCoordination {
* Blocks the calling thread until that has finished. */
void WaitForShutdown();
/** Shuts the RPC server down. */
void Shutdown();
private:
communication::messaging::System &system_;
communication::rpc::Client client_;

View File

@@ -124,25 +124,23 @@ void MasterMain() {
// Bolt server stuff.
SessionData session_data{system, master};
NetworkEndpoint endpoint(FLAGS_interface, FLAGS_port);
ServerT server(endpoint, session_data);
ServerT server(endpoint, session_data, FLAGS_num_workers);
// Handler for regular termination signals
auto shutdown = [&server, &session_data, &master, &system] {
auto shutdown = [&server, &session_data] {
if (is_shutting_down) return;
is_shutting_down = 1;
// The server needs to be shut down first and then the database. This prevents a
// race condition when a transaction is accepted during server shutdown.
server.Shutdown();
session_data.db.Shutdown();
master.Shutdown();
system.Shutdown();
};
InitSignalHandlers(shutdown);
StartMemWarningLogger();
server.Start(FLAGS_num_workers);
server.AwaitShutdown();
}
void WorkerMain() {
@@ -162,16 +160,13 @@ void WorkerMain() {
// Wait for the shutdown command from the master.
worker.WaitForShutdown();
}
worker.Shutdown();
system.Shutdown();
}
void SingleNodeMain() {
google::SetUsageMessage("Memgraph single-node database server");
SessionData session_data;
NetworkEndpoint endpoint(FLAGS_interface, FLAGS_port);
ServerT server(endpoint, session_data);
ServerT server(endpoint, session_data, FLAGS_num_workers);
// Handler for regular termination signals
auto shutdown = [&server, &session_data] {
@@ -186,7 +181,7 @@ void SingleNodeMain() {
StartMemWarningLogger();
server.Start(FLAGS_num_workers);
server.AwaitShutdown();
}
int main(int argc, char **argv) {

View File

@@ -27,19 +27,13 @@ ID_VALUE_RPC_CALLS(Label)
ID_VALUE_RPC_CALLS(EdgeType)
ID_VALUE_RPC_CALLS(Property)
#undef ID_VALUE_RPC
}
} // namespace
template <typename TId>
MasterConcurrentIdMapper<TId>::MasterConcurrentIdMapper(
communication::messaging::System &system)
: rpc_server_(system, kConcurrentIdMapperRpc) {
RegisterRpc(*this, rpc_server_);
rpc_server_.Start();
}
template <typename TId>
MasterConcurrentIdMapper<TId>::~MasterConcurrentIdMapper() {
rpc_server_.Shutdown();
}
template class MasterConcurrentIdMapper<Label>;

View File

@@ -14,7 +14,6 @@ template <typename TId>
class MasterConcurrentIdMapper : public SingleNodeConcurrentIdMapper<TId> {
public:
MasterConcurrentIdMapper(communication::messaging::System &system);
~MasterConcurrentIdMapper();
private:
communication::rpc::Server rpc_server_;

View File

@@ -11,10 +11,6 @@ namespace tx {
MasterEngine::MasterEngine(durability::WriteAheadLog *wal) : wal_(wal) {}
MasterEngine::~MasterEngine() {
if (rpc_server_) StopServer();
}
Transaction *MasterEngine::Begin() {
std::lock_guard<SpinLock> guard(lock_);
@@ -134,13 +130,6 @@ void MasterEngine::StartServer(communication::messaging::System &system) {
rpc_server_->Register<IsActiveRpc>([this](const IsActiveReq &req) {
return std::make_unique<IsActiveRes>(GlobalIsActive(req.member));
});
rpc_server_->Start();
}
void MasterEngine::StopServer() {
CHECK(rpc_server_) << "Can't stop a server that's not running";
rpc_server_->Shutdown();
rpc_server_ = std::experimental::nullopt;
}
} // namespace tx

View File

@@ -35,9 +35,6 @@ class MasterEngine : public Engine {
*/
MasterEngine(durability::WriteAheadLog *wal = nullptr);
/** Stops the tx server if it's running. */
~MasterEngine();
/**
* Begins a transaction and returns a pointer to
* it's object.
@@ -76,9 +73,6 @@ class MasterEngine : public Engine {
/** Starts the RPC server of the master transactional engine. */
void StartServer(communication::messaging::System &system);
/** Stops the RPC server of the master transactional engine. */
void StopServer();
private:
std::atomic<transaction_id_t> counter_{0};
CommitLog clog_;

View File

@@ -76,12 +76,9 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
// initialize server
TestData data;
communication::Server<TestSession, TestData> server(endpoint, data);
// start server
int N = (std::thread::hardware_concurrency() + 1) / 2;
int Nc = N * 3;
std::thread server_thread([&] { server.Start(N); });
communication::Server<TestSession, TestData> server(endpoint, data, N);
const auto &ep = server.endpoint();
// start clients
@@ -95,10 +92,6 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
// cleanup clients
for (int i = 0; i < Nc; ++i) clients[i].join();
// stop server
server.Shutdown();
server_thread.join();
}
int main(int argc, char **argv) {

View File

@@ -18,11 +18,8 @@ TEST(Network, Server) {
// initialize server
TestData session_data;
ServerT server(endpoint, session_data);
// start server
int N = (std::thread::hardware_concurrency() + 1) / 2;
std::thread server_thread([&] { server.Start(N); });
ServerT server(endpoint, session_data, N);
const auto &ep = server.endpoint();
// start clients
@@ -33,10 +30,6 @@ TEST(Network, Server) {
// cleanup clients
for (int i = 0; i < N; ++i) clients[i].join();
// stop server
server.Shutdown();
server_thread.join();
}
int main(int argc, char **argv) {

View File

@@ -22,10 +22,7 @@ TEST(Network, SessionLeak) {
// initialize server
TestData session_data;
ServerT server(endpoint, session_data);
// start server
std::thread server_thread([&] { server.Start(2); });
ServerT server(endpoint, session_data, 2);
// start clients
int N = 50;
@@ -43,10 +40,6 @@ TEST(Network, SessionLeak) {
for (int i = 0; i < N; ++i) clients[i].join();
std::this_thread::sleep_for(2s);
// stop server
server.Shutdown();
server_thread.join();
}
// run with "valgrind --leak-check=full ./network_session_leak" to check for

View File

@@ -12,8 +12,8 @@
#include "utils/signals/handler.hpp"
#include "utils/terminate_handler.hpp"
using communication::messaging::System;
using communication::messaging::Message;
using communication::messaging::System;
using namespace communication::rpc;
using namespace std::literals::chrono_literals;
@@ -52,7 +52,5 @@ int main(int argc, char **argv) {
}
}
client_system.Shutdown();
return 0;
}

View File

@@ -11,8 +11,8 @@
#include "utils/signals/handler.hpp"
#include "utils/terminate_handler.hpp"
using communication::messaging::System;
using communication::messaging::Message;
using communication::messaging::System;
using namespace communication::rpc;
using namespace std::literals::chrono_literals;
@@ -37,12 +37,10 @@ int main(int argc, char **argv) {
std::ofstream log(FLAGS_log, std::ios_base::app);
// Handler for regular termination signals.
auto shutdown = [&server, &server_system, &log]() {
auto shutdown = [&log]() {
if (is_shutting_down) return;
is_shutting_down = 1;
log.close();
server.Shutdown();
server_system.Shutdown();
exit(0);
};
@@ -70,7 +68,6 @@ int main(int argc, char **argv) {
stol(FLAGS_port));
});
server.Start();
LOG(INFO) << "Raft RPC server started";
// Sleep until shutdown detected.
std::this_thread::sleep_until(

View File

@@ -55,7 +55,5 @@ int main(int argc, char *argv[]) {
}
}
my_system.Shutdown();
return 0;
}

View File

@@ -27,9 +27,7 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test {
}
void TearDown() override {
worker_mapper_ = std::experimental::nullopt;
worker_system_.Shutdown();
master_mapper_ = std::experimental::nullopt;
master_system_.Shutdown();
}
};
@@ -47,7 +45,7 @@ TYPED_TEST(DistributedConcurrentIdMapperTest, Basic) {
auto id2 = worker.value_to_id("v2");
EXPECT_EQ(master.id_to_value(id2), "v2");
EXPECT_EQ(master.value_to_id("v2"),id2);
EXPECT_EQ(master.value_to_id("v2"), id2);
EXPECT_NE(id1, id2);
}

View File

@@ -8,7 +8,6 @@ const std::string kLocal = "127.0.0.1";
TEST(CountersDistributed, All) {
communication::messaging::System master_sys(kLocal, 0);
database::MasterCounters master(master_sys);
master.Start();
communication::messaging::System w1_sys(kLocal, 0);
database::WorkerCounters w1(w1_sys, master_sys.endpoint());
@@ -26,9 +25,4 @@ TEST(CountersDistributed, All) {
EXPECT_EQ(w2.Get("b"), 1);
w1.Set("b", 42);
EXPECT_EQ(w2.Get("b"), 42);
w2_sys.Shutdown();
w1_sys.Shutdown();
master.Shutdown();
master_sys.Shutdown();
}

View File

@@ -26,8 +26,6 @@ class WorkerInThread {
coord_.emplace(*system_, master_endpoint);
worker_id_ = coord_->RegisterWorker(desired_id);
coord_->WaitForShutdown();
coord_->Shutdown();
system_->Shutdown();
});
}
@@ -45,48 +43,49 @@ class WorkerInThread {
TEST(Distributed, Coordination) {
System master_system(kLocal, 0);
MasterCoordination master_coord(master_system);
std::vector<std::unique_ptr<WorkerInThread>> workers;
for (int i = 0; i < kWorkerCount; ++i)
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint()));
{
MasterCoordination master_coord(master_system);
// Wait till all the workers are safely initialized.
std::this_thread::sleep_for(300ms);
for (int i = 0; i < kWorkerCount; ++i)
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint()));
// Expect that all workers have a different ID.
std::unordered_set<int> worker_ids;
for (const auto &w : workers) worker_ids.insert(w->worker_id());
EXPECT_EQ(worker_ids.size(), kWorkerCount);
// Wait till all the workers are safely initialized.
std::this_thread::sleep_for(300ms);
// Check endpoints.
for (auto &w1 : workers) {
for (auto &w2 : workers) {
EXPECT_EQ(w1->worker_endpoint(w2->worker_id()), w2->endpoint());
// Expect that all workers have a different ID.
std::unordered_set<int> worker_ids;
for (const auto &w : workers) worker_ids.insert(w->worker_id());
EXPECT_EQ(worker_ids.size(), kWorkerCount);
// Check endpoints.
for (auto &w1 : workers) {
for (auto &w2 : workers) {
EXPECT_EQ(w1->worker_endpoint(w2->worker_id()), w2->endpoint());
}
}
}
} // Coordinated shutdown.
// Coordinated shutdown.
master_coord.Shutdown();
master_system.Shutdown();
for (auto &worker : workers) worker->join();
}
TEST(Distributed, DesiredAndUniqueId) {
System master_system(kLocal, 0);
MasterCoordination master_coord(master_system);
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);
WorkerInThread w1(master_system.endpoint(), 42);
std::this_thread::sleep_for(200ms);
WorkerInThread w2(master_system.endpoint(), 42);
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::this_thread::sleep_for(200ms);
EXPECT_EQ(w1.worker_id(), 42);
EXPECT_NE(w2.worker_id(), 42);
EXPECT_EQ(workers[0]->worker_id(), 42);
EXPECT_NE(workers[1]->worker_id(), 42);
}
master_coord.Shutdown();
w1.join();
w2.join();
master_system.Shutdown();
for (auto &worker : workers) worker->join();
}

View File

@@ -40,13 +40,12 @@ BOOST_CLASS_EXPORT(MessageInt);
#define GET_X(p) dynamic_cast<MessageInt *>((p).get())->x
/**
* Test do the services start up without crashes.
*/
* Test that the services start up without crashes.
*/
TEST(SimpleTests, StartAndShutdown) {
System system("127.0.0.1", 0);
// do nothing
std::this_thread::sleep_for(500ms);
system.Shutdown();
}
TEST(Messaging, Pop) {
@@ -60,8 +59,6 @@ TEST(Messaging, Pop) {
EXPECT_EQ(stream->Poll(), nullptr);
writer.Send<MessageInt>(10);
EXPECT_EQ(GET_X(stream->Await()), 10);
master_system.Shutdown();
slave_system.Shutdown();
}
TEST(Messaging, Await) {
@@ -82,8 +79,6 @@ TEST(Messaging, Await) {
EXPECT_EQ(stream->Poll(), nullptr);
EXPECT_EQ(stream->Await(), nullptr);
t.join();
master_system.Shutdown();
slave_system.Shutdown();
}
TEST(Messaging, RecreateChannelAfterClosing) {
@@ -106,7 +101,4 @@ TEST(Messaging, RecreateChannelAfterClosing) {
EXPECT_EQ(stream->Poll(), nullptr);
writer.Send<MessageInt>(30);
EXPECT_EQ(GET_X(stream->Await()), 30);
master_system.Shutdown();
slave_system.Shutdown();
}

View File

@@ -17,10 +17,10 @@ DECLARE_int32(session_inactivity_timeout);
using namespace std::chrono_literals;
class TestClientSocket;
using communication::bolt::ClientException;
using communication::bolt::SessionData;
using io::network::NetworkEndpoint;
using io::network::Socket;
using communication::bolt::SessionData;
using communication::bolt::ClientException;
using SessionT = communication::bolt::Session<Socket>;
using ResultStreamT = SessionT::ResultStreamT;
using ServerT = communication::Server<SessionT, SessionData>;
@@ -28,15 +28,9 @@ using ClientT = communication::bolt::Client<Socket>;
class RunningServer {
public:
~RunningServer() {
server_.Shutdown();
server_thread_.join();
}
SessionData session_data_;
NetworkEndpoint endpoint_{"127.0.0.1", "0"};
ServerT server_{endpoint_, session_data_};
std::thread server_thread_{[&] { server_.Start(1); }};
ServerT server_{endpoint_, session_data_, 1};
};
class TestClient : public ClientT {

View File

@@ -572,7 +572,7 @@ INSTANTIATE_TEST_CASE_P(
true,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}}},
/* no truncation, partial match between our log and appended entries
*/
*/
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {5}},

View File

@@ -60,7 +60,6 @@ TEST(Rpc, Call) {
server.Register<Sum>([](const SumReq &request) {
return std::make_unique<SumRes>(request.x + request.y);
});
server.Start();
std::this_thread::sleep_for(100ms);
System client_system("127.0.0.1", 0);
@@ -68,10 +67,6 @@ TEST(Rpc, Call) {
"main");
auto sum = client.Call<Sum>(300ms, 10, 20);
EXPECT_EQ(sum->sum, 30);
server.Shutdown();
server_system.Shutdown();
client_system.Shutdown();
}
TEST(Rpc, Timeout) {
@@ -81,7 +76,6 @@ TEST(Rpc, Timeout) {
std::this_thread::sleep_for(300ms);
return std::make_unique<SumRes>(request.x + request.y);
});
server.Start();
std::this_thread::sleep_for(100ms);
System client_system("127.0.0.1", 0);
@@ -89,8 +83,4 @@ TEST(Rpc, Timeout) {
"main");
auto sum = client.Call<Sum>(100ms, 10, 20);
EXPECT_FALSE(sum);
server.Shutdown();
server_system.Shutdown();
client_system.Shutdown();
}

View File

@@ -22,11 +22,6 @@ class WorkerEngineTest : public testing::Test {
WorkerEngine worker_{worker_system_, master_system_.endpoint()};
void SetUp() override { master_.StartServer(master_system_); }
void TearDown() override {
worker_system_.Shutdown();
master_.StopServer();
master_system_.Shutdown();
}
};
TEST_F(WorkerEngineTest, LocalBegin) {