Add explicit start to servers

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1661
Matej Ferencevic 2018-10-16 10:58:41 +02:00
parent 285e02d5ec
commit cdaf7581bf
30 changed files with 475 additions and 348 deletions
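For context, the commit's core change is a two-phase lifecycle: constructors no longer bind sockets or spawn threads, an explicit `Start()` does that and reports failure, and `Shutdown()`/`AwaitShutdown()` stop and join the workers. Below is a minimal, self-contained sketch of that pattern with illustrative names only; it is not Memgraph code and omits the socket handling:

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

class Server {
 public:
  explicit Server(std::size_t workers_count) : workers_count_(workers_count) {}

  // Nothing runs until Start() is called; it returns false if already started
  // (the real servers also return false when Bind/Listen fails).
  bool Start() {
    if (alive_.exchange(true)) return false;
    for (std::size_t i = 0; i < workers_count_; ++i) {
      worker_threads_.emplace_back([this] {
        while (alive_)
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
      });
    }
    return true;
  }

  // Signals the workers to stop; kept trivial so it could be called from a
  // signal handler, like the real Shutdown().
  void Shutdown() { alive_.store(false); }

  // Blocks until all worker threads have exited.
  void AwaitShutdown() {
    for (auto &t : worker_threads_)
      if (t.joinable()) t.join();
    worker_threads_.clear();
  }

 private:
  std::atomic<bool> alive_{false};
  std::size_t workers_count_;
  std::vector<std::thread> worker_threads_;
};

int main() {
  Server server(4);
  if (!server.Start()) return 1;
  server.Shutdown();
  server.AwaitShutdown();
  return 0;
}
```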


@ -44,45 +44,11 @@ class Listener final {
int inactivity_timeout_sec, const std::string &service_name,
size_t workers_count)
: data_(data),
alive_(true),
alive_(false),
context_(context),
inactivity_timeout_sec_(inactivity_timeout_sec),
service_name_(service_name) {
std::cout << "Starting " << workers_count << " " << service_name_
<< " workers" << std::endl;
for (size_t i = 0; i < workers_count; ++i) {
worker_threads_.emplace_back([this, service_name, i]() {
utils::ThreadSetName(fmt::format("{} worker {}", service_name, i + 1));
while (alive_) {
WaitAndProcessEvents();
}
});
}
if (inactivity_timeout_sec_ > 0) {
timeout_thread_ = std::thread([this, service_name]() {
utils::ThreadSetName(fmt::format("{} timeout", service_name));
while (alive_) {
{
std::lock_guard<utils::SpinLock> guard(lock_);
for (auto &session : sessions_) {
if (session->TimedOut()) {
LOG(WARNING) << service_name << " session associated with "
<< session->socket().endpoint() << " timed out.";
// Here we shutdown the socket to terminate any leftover
// blocking `Write` calls and to signal an event that the
// session is closed. Session cleanup will be done in the event
// process function.
session->socket().Shutdown();
}
}
}
// TODO (mferencevic): Should this be configurable?
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
}
}
service_name_(service_name),
workers_count_(workers_count) {}
~Listener() {
bool worker_alive = false;
@ -124,6 +90,51 @@ class Listener final {
sessions_.back().get());
}
/**
* This function starts the listener
*/
void Start() {
CHECK(!alive_) << "The listener is already started!";
alive_.store(true);
std::cout << "Starting " << workers_count_ << " " << service_name_
<< " workers" << std::endl;
std::string service_name(service_name_);
for (size_t i = 0; i < workers_count_; ++i) {
worker_threads_.emplace_back([this, service_name, i]() {
utils::ThreadSetName(fmt::format("{} worker {}", service_name, i + 1));
while (alive_) {
WaitAndProcessEvents();
}
});
}
if (inactivity_timeout_sec_ > 0) {
timeout_thread_ = std::thread([this, service_name]() {
utils::ThreadSetName(fmt::format("{} timeout", service_name));
while (alive_) {
{
std::lock_guard<utils::SpinLock> guard(lock_);
for (auto &session : sessions_) {
if (session->TimedOut()) {
LOG(WARNING) << service_name << " session associated with "
<< session->socket().endpoint() << " timed out.";
// Here we shutdown the socket to terminate any leftover
// blocking `Write` calls and to signal an event that the
// session is closed. Session cleanup will be done in the event
// process function.
session->socket().Shutdown();
}
}
}
// TODO (mferencevic): Should this be configurable?
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
}
}
/**
* This function starts a graceful shutdown of the listener.
*/
@ -261,5 +272,6 @@ class Listener final {
ServerContext *context_;
const int inactivity_timeout_sec_;
const std::string service_name_;
const size_t workers_count_;
};
} // namespace communication


@ -6,6 +6,10 @@ Server::Server(const io::network::Endpoint &endpoint,
size_t workers_count)
: server_(endpoint, this, &context_, -1, "RPC", workers_count) {}
bool Server::Start() {
return server_.Start();
}
void Server::Shutdown() {
server_.Shutdown();
}


@ -25,6 +25,7 @@ class Server {
Server &operator=(const Server &) = delete;
Server &operator=(Server &&) = delete;
bool Start();
void Shutdown();
void AwaitShutdown();


@ -50,34 +50,11 @@ class Server final {
ServerContext *context, int inactivity_timeout_sec,
const std::string &service_name,
size_t workers_count = std::thread::hardware_concurrency())
: listener_(session_data, context, inactivity_timeout_sec, service_name,
: alive_(false),
endpoint_(endpoint),
listener_(session_data, context, inactivity_timeout_sec, service_name,
workers_count),
service_name_(service_name) {
// Without server we can't continue with application so we can just
// terminate here.
if (!socket_.Bind(endpoint)) {
LOG(FATAL) << "Cannot bind to socket on " << endpoint;
}
socket_.SetTimeout(1, 0);
if (!socket_.Listen(1024)) {
LOG(FATAL) << "Cannot listen on socket!";
}
thread_ = std::thread([this, service_name]() {
utils::ThreadSetName(fmt::format("{} server", service_name));
std::cout << service_name << " server is fully armed and operational"
<< std::endl;
std::cout << service_name << " listening on " << socket_.endpoint()
<< std::endl;
while (alive_) {
AcceptConnection();
}
std::cout << service_name << " shutting down..." << std::endl;
});
}
service_name_(service_name) {}
~Server() {
CHECK(!alive_ && !thread_.joinable()) << "You should call Shutdown and "
@ -90,9 +67,50 @@ class Server final {
Server &operator=(const Server &) = delete;
Server &operator=(Server &&) = delete;
const auto &endpoint() const { return socket_.endpoint(); }
const auto &endpoint() const {
CHECK(alive_) << "You can't get the server endpoint when it's not running!";
return socket_.endpoint();
}
/// Stops server manually
/// Starts the server
bool Start() {
CHECK(!alive_) << "The server was already started!";
alive_.store(true);
if (!socket_.Bind(endpoint_)) {
LOG(ERROR) << "Cannot bind to socket on " << endpoint_;
alive_.store(false);
return false;
}
socket_.SetTimeout(1, 0);
if (!socket_.Listen(1024)) {
LOG(ERROR) << "Cannot listen on socket!";
alive_.store(false);
return false;
}
listener_.Start();
std::string service_name(service_name_);
thread_ = std::thread([this, service_name]() {
utils::ThreadSetName(fmt::format("{} server", service_name));
std::cout << service_name_ << " server is fully armed and operational"
<< std::endl;
std::cout << service_name_ << " listening on " << socket_.endpoint()
<< std::endl;
while (alive_) {
AcceptConnection();
}
std::cout << service_name << " shutting down..." << std::endl;
});
return true;
}
/// Signals the server to start shutting down
void Shutdown() {
// This should be as simple as possible, so that it can be called inside a
// signal handler.
@ -122,10 +140,11 @@ class Server final {
listener_.AddConnection(std::move(*s));
}
std::atomic<bool> alive_{true};
std::atomic<bool> alive_;
std::thread thread_;
Socket socket_;
io::network::Endpoint endpoint_;
Listener<TSession, TSessionData> listener_;
const std::string service_name_;
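A side effect of the new `alive_`-gated lifecycle is that `endpoint()` now CHECK-fails unless the server is running, so callers that bind to an ephemeral port (`{"127.0.0.1", 0}`) must call `Start()` before reading the resolved endpoint. A hedged fragment of that usage, borrowing the `TestSession`/`TestData` types and the `data` variable from the unit tests further down in this diff:

```cpp
// Fragment only: TestSession, TestData and `data` come from the test code
// shown later in this diff, not from this header.
communication::ServerContext context;
communication::Server<TestSession, TestData> server({"127.0.0.1", 0}, &data,
                                                     &context, -1, "Test", 2);
ASSERT_TRUE(server.Start());         // binds, listens, starts the listener
const auto &ep = server.endpoint();  // only valid once the server is running
```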


@ -651,108 +651,6 @@ Master::Master(Config config)
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->updates_server_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
// Start transactional cache cleanup.
impl_->tx_engine_.StartTransactionalCacheCleanup();
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (impl_->config_.db_recover_on_startup) {
CHECK(durability::VersionConsistency(impl_->config_.durability_directory))
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt, config.worker_id);
}
// Post-recovery setup and checking.
impl_->coordination_.SetRecoveredSnapshot(
recovery_info ? std::experimental::make_optional(
std::make_pair(recovery_info->durability_version,
recovery_info->snapshot_tx_id))
: std::experimental::nullopt);
// Wait till workers report back their recoverable wal txs
if (recovery_info) {
CHECK(impl_->config_.recovering_cluster_size > 0)
<< "Invalid cluster recovery size flag. Recovered cluster size "
"should be at least 1";
while (impl_->coordination_.CountRecoveredWorkers() !=
impl_->config_.recovering_cluster_size - 1) {
LOG(INFO) << "Waiting for workers to finish recovering..";
std::this_thread::sleep_for(2s);
}
// Get the intersection of recoverable transactions from wal on
// workers and on master
recovery_data.wal_tx_to_recover =
impl_->coordination_.CommonWalTransactions(*recovery_info);
MasterRecoveryTransactions recovery_transactions(this);
durability::RecoverWal(impl_->config_.durability_directory, this,
&recovery_data, &recovery_transactions);
durability::RecoverIndexes(this, recovery_data.indexes);
auto workers_recovered_wal =
impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
workers_recovered_wal.get();
}
impl_->dynamic_worker_addition_.Enable();
}
// Start the dynamic graph partitioner inside token sharing server
if (impl_->config_.dynamic_graph_partitioner_enabled) {
impl_->token_sharing_server_.Start();
}
if (impl_->config_.durability_enabled) {
// move any existing snapshots or wal files to a deprecated folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Init();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] {
auto dba = this->Access();
MakeSnapshot(*dba);
});
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
Master::~Master() {}
@ -839,6 +737,113 @@ io::network::Endpoint Master::GetEndpoint(int worker_id) {
return impl_->coordination_.GetEndpoint(worker_id);
}
void Master::Start() {
// Start coordination.
CHECK(impl_->coordination_.Start()) << "Couldn't start master coordination!";
// Start transactional cache cleanup.
impl_->tx_engine_.StartTransactionalCacheCleanup();
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (impl_->config_.db_recover_on_startup) {
CHECK(durability::VersionConsistency(impl_->config_.durability_directory))
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt, impl_->config_.worker_id);
}
// Post-recovery setup and checking.
impl_->coordination_.SetRecoveredSnapshot(
recovery_info ? std::experimental::make_optional(
std::make_pair(recovery_info->durability_version,
recovery_info->snapshot_tx_id))
: std::experimental::nullopt);
// Wait till workers report back their recoverable wal txs
if (recovery_info) {
CHECK(impl_->config_.recovering_cluster_size > 0)
<< "Invalid cluster recovery size flag. Recovered cluster size "
"should be at least 1";
while (impl_->coordination_.CountRecoveredWorkers() !=
impl_->config_.recovering_cluster_size - 1) {
LOG(INFO) << "Waiting for workers to finish recovering..";
std::this_thread::sleep_for(2s);
}
// Get the intersection of recoverable transactions from wal on
// workers and on master
recovery_data.wal_tx_to_recover =
impl_->coordination_.CommonWalTransactions(*recovery_info);
MasterRecoveryTransactions recovery_transactions(this);
durability::RecoverWal(impl_->config_.durability_directory, this,
&recovery_data, &recovery_transactions);
durability::RecoverIndexes(this, recovery_data.indexes);
auto workers_recovered_wal =
impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
workers_recovered_wal.get();
}
impl_->dynamic_worker_addition_.Enable();
}
// Start the dynamic graph partitioner inside token sharing server
if (impl_->config_.dynamic_graph_partitioner_enabled) {
impl_->token_sharing_server_.Start();
}
if (impl_->config_.durability_enabled) {
// move any existing snapshots or wal files to a deprecated folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Init();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] {
auto dba = this->Access();
MakeSnapshot(*dba);
});
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
bool Master::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
bool ret =
impl_->coordination_.AwaitShutdown(
@ -973,11 +978,8 @@ class Worker {
DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_,
&updates_clients_};
explicit Worker(const Config &config, database::Worker *self)
: config_(config), self_(self) {
cluster_discovery_.RegisterWorker(config.worker_id,
config.durability_directory);
}
Worker(const Config &config, database::Worker *self)
: config_(config), self_(self) {}
// TODO: Some things may depend on order of construction/destruction. We also
// have a lot of circular pointers among members. It would be a good idea to
@ -1028,88 +1030,6 @@ Worker::Worker(Config config)
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->produce_server_);
// Start transactional cache cleanup.
impl_->tx_engine_.StartTransactionalCacheCleanup();
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery. We need to check this flag for workers that are added
// after the "main" cluster recovery.
if (impl_->config_.db_recover_on_startup) {
// What we should recover (version, transaction_id) pair.
auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover();
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (snapshot_to_recover) {
// check version consistency.
if (!durability::DistributedVersionConsistency(
snapshot_to_recover->first))
LOG(FATAL) << "Memgraph worker failed to recover due to version "
"inconsistency with the master.";
if (!durability::VersionConsistency(impl_->config_.durability_directory))
LOG(FATAL)
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
snapshot_to_recover->second, config.worker_id);
}
// Post-recovery setup and checking.
if (snapshot_to_recover &&
(!recovery_info ||
snapshot_to_recover->second != recovery_info->snapshot_tx_id))
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info);
} else {
// Check with master if we're a dynamically added worker and need to update
// our indices.
auto indexes = impl_->dynamic_worker_registration_.GetIndicesToCreate();
if (!indexes.empty()) {
durability::RecoverIndexes(this, indexes);
}
}
if (impl_->config_.durability_enabled) {
// move any existing snapshots or wal files to a deprecated folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Init();
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
Worker::~Worker() {}
@ -1192,6 +1112,97 @@ io::network::Endpoint Worker::GetEndpoint(int worker_id) {
return impl_->coordination_.GetEndpoint(worker_id);
}
void Worker::Start() {
// Start coordination.
CHECK(impl_->coordination_.Start()) << "Couldn't start worker coordination!";
// Register to the master.
impl_->cluster_discovery_.RegisterWorker(impl_->config_.worker_id,
impl_->config_.durability_directory);
// Start transactional cache cleanup.
impl_->tx_engine_.StartTransactionalCacheCleanup();
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery. We need to check this flag for workers that are added
// after the "main" cluster recovery.
if (impl_->config_.db_recover_on_startup) {
// What we should recover (version, transaction_id) pair.
auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover();
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (snapshot_to_recover) {
// check version consistency.
if (!durability::DistributedVersionConsistency(
snapshot_to_recover->first))
LOG(FATAL) << "Memgraph worker failed to recover due to version "
"inconsistency with the master.";
if (!durability::VersionConsistency(impl_->config_.durability_directory))
LOG(FATAL)
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
snapshot_to_recover->second, impl_->config_.worker_id);
}
// Post-recovery setup and checking.
if (snapshot_to_recover &&
(!recovery_info ||
snapshot_to_recover->second != recovery_info->snapshot_tx_id))
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info);
} else {
// Check with master if we're a dynamically added worker and need to update
// our indices.
auto indexes = impl_->dynamic_worker_registration_.GetIndicesToCreate();
if (!indexes.empty()) {
durability::RecoverIndexes(this, indexes);
}
}
if (impl_->config_.durability_enabled) {
// move any existing snapshots or wal files to a deprecated folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Init();
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
bool Worker::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
bool ret = impl_->coordination_.AwaitShutdown(
[this, &call_before_shutdown](bool is_cluster_alive) -> bool {


@ -66,6 +66,8 @@ class Master final : public DistributedGraphDb {
/** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id);
void Start();
bool AwaitShutdown(std::function<void(void)> call_before_shutdown = [] {});
void Shutdown();
@ -113,6 +115,8 @@ class Worker final : public DistributedGraphDb {
/** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id);
void Start();
bool AwaitShutdown(std::function<void(void)> call_before_shutdown = [] {});
void Shutdown();
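The new `Start()` methods on `Master` and `Worker` are intended to be driven by the binary's main function, as the `memgraph_distributed` changes below show. A rough sketch of that ordering; `config`, `shutdown`, `server`, and `InitSignalHandlers()` stand in for the wiring in `MasterMain()` and are assumptions here:

```cpp
// Rough sketch of the intended call order on the master, not the exact
// MasterMain() body: the Bolt server starts first, then the database.
database::Master db(config);
InitSignalHandlers(shutdown);  // shutdown() eventually calls db.Shutdown()
CHECK(server.Start()) << "Couldn't start the Bolt server!";
db.Start();                    // coordination, recovery, background jobs
db.AwaitShutdown([&server] {
  // Stop dependent servers before the database finishes shutting down.
  server.Shutdown();
  server.AwaitShutdown();
});
```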


@ -13,18 +13,16 @@ Coordination::Coordination(const io::network::Endpoint &worker_endpoint,
: server_(worker_endpoint, server_workers_count),
worker_id_(worker_id),
thread_pool_(client_workers_count, "RPC client") {
// The master endpoint should be added to workers and not to the master.
if (worker_id != 0) {
// The master is always worker 0.
// TODO (mferencevic): Strictly speaking, this isn't correct when creating a
// `CoordinationMaster` because the supplied `master_endpoint` is supplied
// before the server starts listening on the address. Eg. if `0.0.0.0:0` is
// supplied that should be first resolved by the server when it binds to
// that address and `server_.endpoint()` should be used. Currently,
// `workers_[0]` isn't used on the master so all is well.
// We only emplace the master endpoint when this instance isn't the
// `MasterCoordination`. This is because we don't know the exact master
// endpoint until the master server is started. The `MasterCoordination`
// will emplace the master endpoint when the server is started. Eg. if
// `0.0.0.0:0` is supplied as the master endpoint that should be first
// resolved by the server when it binds to that address and
// `server_.endpoint()` should be used.
workers_.emplace(0, master_endpoint);
} else {
workers_.emplace(0, server_.endpoint());
}
}


@ -21,11 +21,7 @@ MasterCoordination::MasterCoordination(const Endpoint &master_endpoint,
int server_workers_count,
int client_workers_count)
: Coordination(master_endpoint, 0, {}, server_workers_count,
client_workers_count) {
// TODO (mferencevic): Move this to an explicit `Start` method.
scheduler_.Run("Heartbeat", std::chrono::seconds(kHeartbeatIntervalSeconds),
[this] { IssueHeartbeats(); });
}
client_workers_count) {}
MasterCoordination::~MasterCoordination() {
CHECK(!alive_) << "You must call Shutdown and AwaitShutdown on "
@ -118,6 +114,14 @@ std::vector<tx::TransactionId> MasterCoordination::CommonWalTransactions(
return tx_intersection;
}
bool MasterCoordination::Start() {
if (!server_.Start()) return false;
AddWorker(0, server_.endpoint());
scheduler_.Run("Heartbeat", std::chrono::seconds(kHeartbeatIntervalSeconds),
[this] { IssueHeartbeats(); });
return true;
}
bool MasterCoordination::AwaitShutdown(
std::function<bool(bool)> call_before_shutdown) {
// Wait for a shutdown notification.


@ -60,6 +60,9 @@ class MasterCoordination final : public Coordination {
std::vector<tx::TransactionId> CommonWalTransactions(
const durability::RecoveryInfo &master_info) const;
/// Starts the coordination and its servers.
bool Start();
/// Waits while the cluster is in a valid state or the `Shutdown` method is
/// called (suitable for use with signal handlers). Blocks the calling thread
/// until that has finished.


@ -70,6 +70,10 @@ void WorkerCoordination::RegisterWorker(int worker_id,
AddWorker(worker_id, endpoint);
}
bool WorkerCoordination::Start() {
return server_.Start();
}
bool WorkerCoordination::AwaitShutdown(
std::function<bool(bool)> call_before_shutdown) {
// Wait for a shutdown notification.


@ -30,6 +30,9 @@ class WorkerCoordination final : public Coordination {
/// Registers the worker with the given endpoint.
void RegisterWorker(int worker_id, io::network::Endpoint endpoint);
/// Starts the coordination and its servers.
bool Start();
/// Starts listening for a remote shutdown command (issued by the master) or
/// for the `Shutdown` method to be called (suitable for use with signal
/// handlers). Blocks the calling thread until that has finished.


@ -100,6 +100,7 @@ void SingleNodeMain() {
};
InitSignalHandlers(shutdown);
CHECK(server.Start()) << "Couldn't start the Bolt server!";
server.AwaitShutdown();
}


@ -98,6 +98,12 @@ void MasterMain() {
InitSignalHandlers(shutdown);
// Start the Bolt server.
CHECK(server.Start()) << "Couldn't start the Bolt server!";
// Start the database.
db.Start();
// The return code of `AwaitShutdown` is ignored because we want the database
// to exit cleanly no matter what.
db.AwaitShutdown([&server] {
@ -121,6 +127,9 @@ void WorkerMain() {
InitSignalHandlers(shutdown);
// Start the database.
db.Start();
// The return code of `AwaitShutdown` is ignored because we want the database
// to exit cleanly no matter what.
db.AwaitShutdown();


@ -67,6 +67,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
communication::ServerContext context;
communication::Server<TestSession, TestData> server(endpoint, &data, &context,
-1, "Test", N);
ASSERT_TRUE(server.Start());
const auto &ep = server.endpoint();
// start clients


@ -23,6 +23,7 @@ TEST(Network, Server) {
int N = (std::thread::hardware_concurrency() + 1) / 2;
ContextT context;
ServerT server(endpoint, &session_data, &context, -1, "Test", N);
ASSERT_TRUE(server.Start());
const auto &ep = server.endpoint();
// start clients


@ -24,6 +24,7 @@ TEST(Network, SessionLeak) {
TestData session_data;
ContextT context;
ServerT server(endpoint, &session_data, &context, -1, "Test", 2);
ASSERT_TRUE(server.Start());
// start clients
int N = 50;


@ -55,6 +55,7 @@ int main(int argc, char **argv) {
FLAGS_server_verify_peer);
communication::Server<EchoSession, EchoData> server(
{"127.0.0.1", 0}, &echo_data, &server_context, -1, "SSL", 1);
server.Start();
// Initialize the client.
communication::ClientContext client_context(FLAGS_client_key_file,


@ -22,8 +22,10 @@ namespace fs = std::experimental::filesystem;
class WorkerInThread {
public:
explicit WorkerInThread(database::Config config) : worker_(config) {
thread_ =
std::thread([this, config] { EXPECT_TRUE(worker_.AwaitShutdown()); });
thread_ = std::thread([this, config] {
worker_.Start();
EXPECT_TRUE(worker_.AwaitShutdown());
});
}
~WorkerInThread() {
@ -52,6 +54,7 @@ class Cluster {
master_ = std::make_unique<database::Master>(master_config);
interpreter_ =
std::make_unique<query::DistributedInterpreter>(master_.get());
master_->Start();
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {


@ -42,6 +42,7 @@ int main(int argc, char *argv[]) {
{ShowDistributedCommand, 1,
"Show the Nth plan as for distributed execution"});
database::Master db;
db.Start();
auto dba = db.Access();
RunInteractivePlanning(dba.get());
db.Shutdown();


@ -27,8 +27,10 @@ const std::string kLocal = "127.0.0.1";
class WorkerInThread {
public:
explicit WorkerInThread(database::Config config) : worker_(config) {
thread_ =
std::thread([this, config] { EXPECT_TRUE(worker_.AwaitShutdown()); });
thread_ = std::thread([this, config] {
worker_.Start();
EXPECT_TRUE(worker_.AwaitShutdown());
});
}
~WorkerInThread() {
@ -60,6 +62,7 @@ int main(int argc, char *argv[]) {
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(tmp_dir, 0);
auto master = std::make_unique<database::Master>(master_config);
master->Start();
// Allow the master to get initialized before making workers.
std::this_thread::sleep_for(std::chrono::milliseconds(250));


@ -65,6 +65,7 @@ int main(int argc, char **argv) {
FLAGS_ca_file, FLAGS_verify_peer);
communication::Server<EchoSession, EchoData> server(endpoint, &echo_data,
&context, -1, "SSL", 1);
server.Start();
while (echo_data.alive) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));


@ -23,6 +23,7 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test {
worker_mapper_;
void SetUp() override {
coordination_.Start();
master_client_pool_.emplace(coordination_.GetServerEndpoint());
master_mapper_.emplace(&coordination_);
worker_mapper_.emplace(&master_client_pool_.value());


@ -9,6 +9,8 @@
TEST(CountersDistributed, All) {
TestMasterCoordination coordination;
database::MasterCounters master(&coordination);
coordination.Start();
communication::rpc::ClientPool master_client_pool(
coordination.GetServerEndpoint());


@ -20,8 +20,10 @@ namespace fs = std::experimental::filesystem;
class WorkerInThread {
public:
explicit WorkerInThread(database::Config config) : worker_(config) {
thread_ =
std::thread([this, config] { EXPECT_TRUE(worker_.AwaitShutdown()); });
thread_ = std::thread([this, config] {
worker_.Start();
EXPECT_TRUE(worker_.AwaitShutdown());
});
}
~WorkerInThread() {
@ -59,6 +61,7 @@ class DistributedGraphDbTest : public ::testing::Test {
// TODO (buda): Fix sometime in the future - not mission critical.
master_config.recovering_cluster_size = 1;
master_ = std::make_unique<database::Master>(modify_config(master_config));
master_->Start();
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
@ -183,9 +186,9 @@ class Cluster {
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(0);
auto master_tmp = std::make_unique<database::Master>(master_config);
auto master_endpoint = master_tmp->endpoint();
master_ = std::move(master_tmp);
master_ = std::make_unique<database::Master>(master_config);
master_->Start();
auto master_endpoint = master_->endpoint();
const auto kInitTime = 200ms;
std::this_thread::sleep_for(kInitTime);


@ -40,6 +40,7 @@ class WorkerCoordinationInThread {
worker_thread_ = std::thread(
[this, master_endpoint, durability_directory, desired_id, &init_done] {
worker.emplace(master_endpoint, desired_id);
ASSERT_TRUE(worker->coord.Start());
worker->discovery.RegisterWorker(desired_id, durability_directory);
init_done = true;
// We don't shutdown the worker coordination here because it will be
@ -88,6 +89,7 @@ TEST_F(Distributed, Coordination) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord({kLocal, 0});
ASSERT_TRUE(master_coord.Start());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
@ -118,6 +120,7 @@ TEST_F(Distributed, DesiredAndUniqueId) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord({kLocal, 0});
ASSERT_TRUE(master_coord.Start());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
@ -140,6 +143,7 @@ TEST_F(Distributed, CoordinationWorkersId) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord({kLocal, 0});
ASSERT_TRUE(master_coord.Start());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
@ -165,6 +169,7 @@ TEST_F(Distributed, ClusterDiscovery) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord({kLocal, 0});
ASSERT_TRUE(master_coord.Start());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
std::vector<int> ids;
@ -195,6 +200,7 @@ TEST_F(Distributed, KeepsTrackOfRecovered) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord({kLocal, 0});
ASSERT_TRUE(master_coord.Start());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
int worker_count = 10;


@ -24,6 +24,7 @@ class DistributedDynamicWorker : public ::testing::Test {
FLAGS_durability_directory = GetDurabilityDirectory(0);
auto master =
std::make_unique<database::Master>(modify_config(master_config));
master->Start();
std::this_thread::sleep_for(200ms);
return master;
}


@ -61,6 +61,7 @@ TEST(NetworkTimeouts, InactiveSession) {
communication::ServerContext context;
communication::Server<TestSession, TestData> server{
{"127.0.0.1", 0}, &test_data, &context, 2, "Test", 1};
ASSERT_TRUE(server.Start());
// Create the client and connect to the server.
io::network::Socket client;
@ -92,6 +93,7 @@ TEST(NetworkTimeouts, ActiveSession) {
communication::ServerContext context;
communication::Server<TestSession, TestData> server{
{"127.0.0.1", 0}, &test_data, &context, 2, "Test", 1};
ASSERT_TRUE(server.Start());
// Create the client and connect to the server.
io::network::Socket client;


@ -93,6 +93,7 @@ TEST(Rpc, Call) {
SumRes res(req.x + req.y);
Save(res, res_builder);
});
ASSERT_TRUE(server.Start());
std::this_thread::sleep_for(100ms);
Client client(server.endpoint());
@ -112,6 +113,7 @@ TEST(Rpc, Abort) {
SumRes res(req.x + req.y);
Save(res, res_builder);
});
ASSERT_TRUE(server.Start());
std::this_thread::sleep_for(100ms);
Client client(server.endpoint());
@ -142,6 +144,7 @@ TEST(Rpc, ClientPool) {
SumRes res(req.x + req.y);
Save(res, res_builder);
});
ASSERT_TRUE(server.Start());
std::this_thread::sleep_for(100ms);
Client client(server.endpoint());
@ -194,6 +197,7 @@ TEST(Rpc, LargeMessage) {
Load(&res, req_reader);
Save(res, res_builder);
});
ASSERT_TRUE(server.Start());
std::this_thread::sleep_for(100ms);
std::string testdata(100000, 'a');


@ -1,5 +1,7 @@
#pragma once
#include <gtest/gtest.h>
#include "distributed/coordination.hpp"
const std::string kLocal = "127.0.0.1";
@ -12,6 +14,10 @@ class TestMasterCoordination : public distributed::Coordination {
TestMasterCoordination()
: distributed::Coordination({kLocal, 0}, 0, {kLocal, 0}) {}
void Start() {
ASSERT_TRUE(server_.Start());
}
void Stop() {
server_.Shutdown();
server_.AwaitShutdown();
@ -24,6 +30,10 @@ class TestWorkerCoordination : public distributed::Coordination {
int worker_id)
: distributed::Coordination({kLocal, 0}, worker_id, master_endpoint) {}
void Start() {
ASSERT_TRUE(server_.Start());
}
void Stop() {
server_.Shutdown();
server_.AwaitShutdown();


@ -1,4 +1,5 @@
#include <algorithm>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>
@ -16,53 +17,65 @@ using namespace distributed;
class WorkerEngineTest : public testing::Test {
protected:
void SetUp() override {
master_coordination_ = std::make_unique<TestMasterCoordination>();
master_coordination_->Start();
master_ = std::make_unique<EngineMaster>(master_coordination_.get());
worker_coordination_ = std::make_unique<TestWorkerCoordination>(
master_coordination_->GetServerEndpoint(), 1);
worker_coordination_->Start();
worker_ = std::make_unique<EngineWorker>(worker_coordination_.get());
}
void TearDown() override {
std::thread thread([this] { worker_coordination_.Stop(); });
master_coordination_.Stop();
std::thread thread([this] { worker_coordination_->Stop(); });
master_coordination_->Stop();
if (thread.joinable()) thread.join();
}
TestMasterCoordination master_coordination_;
EngineMaster master_{&master_coordination_};
std::unique_ptr<TestMasterCoordination> master_coordination_;
std::unique_ptr<EngineMaster> master_;
TestWorkerCoordination worker_coordination_{
master_coordination_.GetServerEndpoint(), 1};
EngineWorker worker_{&worker_coordination_};
std::unique_ptr<TestWorkerCoordination> worker_coordination_;
std::unique_ptr<EngineWorker> worker_;
};
TEST_F(WorkerEngineTest, BeginOnWorker) {
worker_.Begin();
auto second = worker_.Begin();
EXPECT_EQ(master_.RunningTransaction(second->id_)->snapshot().size(), 1);
worker_->Begin();
auto second = worker_->Begin();
EXPECT_EQ(master_->RunningTransaction(second->id_)->snapshot().size(), 1);
}
TEST_F(WorkerEngineTest, AdvanceOnWorker) {
auto tx = worker_.Begin();
auto tx = worker_->Begin();
auto cid = tx->cid();
EXPECT_EQ(worker_.Advance(tx->id_), cid + 1);
EXPECT_EQ(worker_->Advance(tx->id_), cid + 1);
}
TEST_F(WorkerEngineTest, CommitOnWorker) {
auto tx = worker_.Begin();
auto tx = worker_->Begin();
auto tx_id = tx->id_;
worker_.Commit(*tx);
EXPECT_TRUE(master_.Info(tx_id).is_committed());
worker_->Commit(*tx);
EXPECT_TRUE(master_->Info(tx_id).is_committed());
}
TEST_F(WorkerEngineTest, AbortOnWorker) {
auto tx = worker_.Begin();
auto tx = worker_->Begin();
auto tx_id = tx->id_;
worker_.Abort(*tx);
EXPECT_TRUE(master_.Info(tx_id).is_aborted());
worker_->Abort(*tx);
EXPECT_TRUE(master_->Info(tx_id).is_aborted());
}
TEST_F(WorkerEngineTest, RunningTransaction) {
master_.Begin();
master_.Begin();
worker_.RunningTransaction(1);
worker_.RunningTransaction(2);
master_->Begin();
master_->Begin();
worker_->RunningTransaction(1);
worker_->RunningTransaction(2);
int count = 0;
worker_.LocalForEachActiveTransaction([&count](Transaction &t) {
worker_->LocalForEachActiveTransaction([&count](Transaction &t) {
++count;
if (t.id_ == 1) {
EXPECT_EQ(t.snapshot(), tx::Snapshot(std::vector<tx::TransactionId>{}));
@ -74,73 +87,73 @@ TEST_F(WorkerEngineTest, RunningTransaction) {
}
TEST_F(WorkerEngineTest, Info) {
auto *tx_1 = master_.Begin();
auto *tx_2 = master_.Begin();
auto *tx_1 = master_->Begin();
auto *tx_2 = master_->Begin();
// We can't check active transactions in the worker (see comments there for
// info).
master_.Commit(*tx_1);
EXPECT_TRUE(master_.Info(1).is_committed());
EXPECT_TRUE(worker_.Info(1).is_committed());
master_.Abort(*tx_2);
EXPECT_TRUE(master_.Info(2).is_aborted());
EXPECT_TRUE(worker_.Info(2).is_aborted());
master_->Commit(*tx_1);
EXPECT_TRUE(master_->Info(1).is_committed());
EXPECT_TRUE(worker_->Info(1).is_committed());
master_->Abort(*tx_2);
EXPECT_TRUE(master_->Info(2).is_aborted());
EXPECT_TRUE(worker_->Info(2).is_aborted());
}
TEST_F(WorkerEngineTest, GlobalGcSnapshot) {
auto *tx_1 = master_.Begin();
master_.Begin();
master_.Commit(*tx_1);
EXPECT_EQ(master_.GlobalGcSnapshot(), tx::Snapshot({1, 2}));
EXPECT_EQ(worker_.GlobalGcSnapshot(), master_.GlobalGcSnapshot());
auto *tx_1 = master_->Begin();
master_->Begin();
master_->Commit(*tx_1);
EXPECT_EQ(master_->GlobalGcSnapshot(), tx::Snapshot({1, 2}));
EXPECT_EQ(worker_->GlobalGcSnapshot(), master_->GlobalGcSnapshot());
}
TEST_F(WorkerEngineTest, GlobalActiveTransactions) {
auto *tx_1 = master_.Begin();
master_.Begin();
auto *tx_3 = master_.Begin();
master_.Begin();
master_.Commit(*tx_1);
master_.Abort(*tx_3);
EXPECT_EQ(worker_.GlobalActiveTransactions(), tx::Snapshot({2, 4}));
auto *tx_1 = master_->Begin();
master_->Begin();
auto *tx_3 = master_->Begin();
master_->Begin();
master_->Commit(*tx_1);
master_->Abort(*tx_3);
EXPECT_EQ(worker_->GlobalActiveTransactions(), tx::Snapshot({2, 4}));
}
TEST_F(WorkerEngineTest, LocalLast) {
master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 0);
worker_.RunningTransaction(1);
EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin();
worker_.RunningTransaction(4);
EXPECT_EQ(worker_.LocalLast(), 4);
master_->Begin();
EXPECT_EQ(worker_->LocalLast(), 0);
worker_->RunningTransaction(1);
EXPECT_EQ(worker_->LocalLast(), 1);
master_->Begin();
EXPECT_EQ(worker_->LocalLast(), 1);
master_->Begin();
EXPECT_EQ(worker_->LocalLast(), 1);
master_->Begin();
worker_->RunningTransaction(4);
EXPECT_EQ(worker_->LocalLast(), 4);
}
TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
master_.Begin();
worker_.RunningTransaction(1);
master_.Begin();
master_.Begin();
master_.Begin();
worker_.RunningTransaction(4);
master_->Begin();
worker_->RunningTransaction(1);
master_->Begin();
master_->Begin();
master_->Begin();
worker_->RunningTransaction(4);
std::unordered_set<tx::TransactionId> local;
worker_.LocalForEachActiveTransaction(
worker_->LocalForEachActiveTransaction(
[&local](Transaction &t) { local.insert(t.id_); });
EXPECT_EQ(local, std::unordered_set<tx::TransactionId>({1, 4}));
}
TEST_F(WorkerEngineTest, EnsureTxIdGreater) {
ASSERT_LE(master_.Begin()->id_, 40);
worker_.EnsureNextIdGreater(42);
EXPECT_EQ(master_.Begin()->id_, 43);
EXPECT_EQ(worker_.Begin()->id_, 44);
ASSERT_LE(master_->Begin()->id_, 40);
worker_->EnsureNextIdGreater(42);
EXPECT_EQ(master_->Begin()->id_, 43);
EXPECT_EQ(worker_->Begin()->id_, 44);
}
TEST_F(WorkerEngineTest, GlobalNext) {
auto tx = master_.Begin();
EXPECT_NE(worker_.LocalLast(), worker_.GlobalLast());
EXPECT_EQ(master_.LocalLast(), worker_.GlobalLast());
EXPECT_EQ(worker_.GlobalLast(), tx->id_);
auto tx = master_->Begin();
EXPECT_NE(worker_->LocalLast(), worker_->GlobalLast());
EXPECT_EQ(master_->LocalLast(), worker_->GlobalLast());
EXPECT_EQ(worker_->GlobalLast(), tx->id_);
}