diff --git a/src/communication/listener.hpp b/src/communication/listener.hpp index 20f659281..e428207ce 100644 --- a/src/communication/listener.hpp +++ b/src/communication/listener.hpp @@ -44,45 +44,11 @@ class Listener final { int inactivity_timeout_sec, const std::string &service_name, size_t workers_count) : data_(data), - alive_(true), + alive_(false), context_(context), inactivity_timeout_sec_(inactivity_timeout_sec), - service_name_(service_name) { - std::cout << "Starting " << workers_count << " " << service_name_ - << " workers" << std::endl; - for (size_t i = 0; i < workers_count; ++i) { - worker_threads_.emplace_back([this, service_name, i]() { - utils::ThreadSetName(fmt::format("{} worker {}", service_name, i + 1)); - while (alive_) { - WaitAndProcessEvents(); - } - }); - } - - if (inactivity_timeout_sec_ > 0) { - timeout_thread_ = std::thread([this, service_name]() { - utils::ThreadSetName(fmt::format("{} timeout", service_name)); - while (alive_) { - { - std::lock_guard guard(lock_); - for (auto &session : sessions_) { - if (session->TimedOut()) { - LOG(WARNING) << service_name << " session associated with " - << session->socket().endpoint() << " timed out."; - // Here we shutdown the socket to terminate any leftover - // blocking `Write` calls and to signal an event that the - // session is closed. Session cleanup will be done in the event - // process function. - session->socket().Shutdown(); - } - } - } - // TODO (mferencevic): Should this be configurable? - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - }); - } - } + service_name_(service_name), + workers_count_(workers_count) {} ~Listener() { bool worker_alive = false; @@ -124,6 +90,51 @@ class Listener final { sessions_.back().get()); } + /** + * This function starts the listener + */ + void Start() { + CHECK(!alive_) << "The listener is already started!"; + alive_.store(true); + + std::cout << "Starting " << workers_count_ << " " << service_name_ + << " workers" << std::endl; + + std::string service_name(service_name_); + for (size_t i = 0; i < workers_count_; ++i) { + worker_threads_.emplace_back([this, service_name, i]() { + utils::ThreadSetName(fmt::format("{} worker {}", service_name, i + 1)); + while (alive_) { + WaitAndProcessEvents(); + } + }); + } + + if (inactivity_timeout_sec_ > 0) { + timeout_thread_ = std::thread([this, service_name]() { + utils::ThreadSetName(fmt::format("{} timeout", service_name)); + while (alive_) { + { + std::lock_guard guard(lock_); + for (auto &session : sessions_) { + if (session->TimedOut()) { + LOG(WARNING) << service_name << " session associated with " + << session->socket().endpoint() << " timed out."; + // Here we shutdown the socket to terminate any leftover + // blocking `Write` calls and to signal an event that the + // session is closed. Session cleanup will be done in the event + // process function. + session->socket().Shutdown(); + } + } + } + // TODO (mferencevic): Should this be configurable? + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + } + } + /** * This function starts a graceful shutdown of the listener. */ @@ -261,5 +272,6 @@ class Listener final { ServerContext *context_; const int inactivity_timeout_sec_; const std::string service_name_; + const size_t workers_count_; }; } // namespace communication diff --git a/src/communication/rpc/server.cpp b/src/communication/rpc/server.cpp index 325059284..4f92b98a6 100644 --- a/src/communication/rpc/server.cpp +++ b/src/communication/rpc/server.cpp @@ -6,6 +6,10 @@ Server::Server(const io::network::Endpoint &endpoint, size_t workers_count) : server_(endpoint, this, &context_, -1, "RPC", workers_count) {} +bool Server::Start() { + return server_.Start(); +} + void Server::Shutdown() { server_.Shutdown(); } diff --git a/src/communication/rpc/server.hpp b/src/communication/rpc/server.hpp index 89c2fbb6b..dd4236491 100644 --- a/src/communication/rpc/server.hpp +++ b/src/communication/rpc/server.hpp @@ -25,6 +25,7 @@ class Server { Server &operator=(const Server &) = delete; Server &operator=(Server &&) = delete; + bool Start(); void Shutdown(); void AwaitShutdown(); diff --git a/src/communication/server.hpp b/src/communication/server.hpp index 4e4959e08..1c4b46647 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -50,34 +50,11 @@ class Server final { ServerContext *context, int inactivity_timeout_sec, const std::string &service_name, size_t workers_count = std::thread::hardware_concurrency()) - : listener_(session_data, context, inactivity_timeout_sec, service_name, + : alive_(false), + endpoint_(endpoint), + listener_(session_data, context, inactivity_timeout_sec, service_name, workers_count), - service_name_(service_name) { - // Without server we can't continue with application so we can just - // terminate here. - if (!socket_.Bind(endpoint)) { - LOG(FATAL) << "Cannot bind to socket on " << endpoint; - } - socket_.SetTimeout(1, 0); - if (!socket_.Listen(1024)) { - LOG(FATAL) << "Cannot listen on socket!"; - } - - thread_ = std::thread([this, service_name]() { - utils::ThreadSetName(fmt::format("{} server", service_name)); - - std::cout << service_name << " server is fully armed and operational" - << std::endl; - std::cout << service_name << " listening on " << socket_.endpoint() - << std::endl; - - while (alive_) { - AcceptConnection(); - } - - std::cout << service_name << " shutting down..." << std::endl; - }); - } + service_name_(service_name) {} ~Server() { CHECK(!alive_ && !thread_.joinable()) << "You should call Shutdown and " @@ -90,9 +67,50 @@ class Server final { Server &operator=(const Server &) = delete; Server &operator=(Server &&) = delete; - const auto &endpoint() const { return socket_.endpoint(); } + const auto &endpoint() const { + CHECK(alive_) << "You can't get the server endpoint when it's not running!"; + return socket_.endpoint(); + } - /// Stops server manually + /// Starts the server + bool Start() { + CHECK(!alive_) << "The server was already started!"; + alive_.store(true); + + if (!socket_.Bind(endpoint_)) { + LOG(ERROR) << "Cannot bind to socket on " << endpoint_; + alive_.store(false); + return false; + } + socket_.SetTimeout(1, 0); + if (!socket_.Listen(1024)) { + LOG(ERROR) << "Cannot listen on socket!"; + alive_.store(false); + return false; + } + + listener_.Start(); + + std::string service_name(service_name_); + thread_ = std::thread([this, service_name]() { + utils::ThreadSetName(fmt::format("{} server", service_name)); + + std::cout << service_name_ << " server is fully armed and operational" + << std::endl; + std::cout << service_name_ << " listening on " << socket_.endpoint() + << std::endl; + + while (alive_) { + AcceptConnection(); + } + + std::cout << service_name << " shutting down..." << std::endl; + }); + + return true; + } + + /// Signals the server to start shutting down void Shutdown() { // This should be as simple as possible, so that it can be called inside a // signal handler. @@ -122,10 +140,11 @@ class Server final { listener_.AddConnection(std::move(*s)); } - std::atomic alive_{true}; + std::atomic alive_; std::thread thread_; Socket socket_; + io::network::Endpoint endpoint_; Listener listener_; const std::string service_name_; diff --git a/src/database/distributed/distributed_graph_db.cpp b/src/database/distributed/distributed_graph_db.cpp index 5b12bf33b..5299c3fb9 100644 --- a/src/database/distributed/distributed_graph_db.cpp +++ b/src/database/distributed/distributed_graph_db.cpp @@ -651,108 +651,6 @@ Master::Master(Config config) impl_->tx_engine_.RegisterForTransactionalCacheCleanup( impl_->updates_server_); impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_); - - // Start transactional cache cleanup. - impl_->tx_engine_.StartTransactionalCacheCleanup(); - - if (impl_->config_.durability_enabled) - utils::CheckDir(impl_->config_.durability_directory); - - // Durability recovery. - { - // What we recover. - std::experimental::optional recovery_info; - - durability::RecoveryData recovery_data; - // Recover only if necessary. - if (impl_->config_.db_recover_on_startup) { - CHECK(durability::VersionConsistency(impl_->config_.durability_directory)) - << "Contents of durability directory are not compatible with the " - "current version of Memgraph binary!"; - recovery_info = durability::RecoverOnlySnapshot( - impl_->config_.durability_directory, this, &recovery_data, - std::experimental::nullopt, config.worker_id); - } - - // Post-recovery setup and checking. - impl_->coordination_.SetRecoveredSnapshot( - recovery_info ? std::experimental::make_optional( - std::make_pair(recovery_info->durability_version, - recovery_info->snapshot_tx_id)) - : std::experimental::nullopt); - - // Wait till workers report back their recoverable wal txs - if (recovery_info) { - CHECK(impl_->config_.recovering_cluster_size > 0) - << "Invalid cluster recovery size flag. Recovered cluster size " - "should be at least 1"; - while (impl_->coordination_.CountRecoveredWorkers() != - impl_->config_.recovering_cluster_size - 1) { - LOG(INFO) << "Waiting for workers to finish recovering.."; - std::this_thread::sleep_for(2s); - } - - // Get the intersection of recoverable transactions from wal on - // workers and on master - recovery_data.wal_tx_to_recover = - impl_->coordination_.CommonWalTransactions(*recovery_info); - MasterRecoveryTransactions recovery_transactions(this); - durability::RecoverWal(impl_->config_.durability_directory, this, - &recovery_data, &recovery_transactions); - durability::RecoverIndexes(this, recovery_data.indexes); - auto workers_recovered_wal = - impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data); - workers_recovered_wal.get(); - } - - impl_->dynamic_worker_addition_.Enable(); - } - - // Start the dynamic graph partitioner inside token sharing server - if (impl_->config_.dynamic_graph_partitioner_enabled) { - impl_->token_sharing_server_.Start(); - } - - if (impl_->config_.durability_enabled) { - // move any existing snapshots or wal files to a deprecated folder. - if (!impl_->config_.db_recover_on_startup && - durability::ContainsDurabilityFiles( - impl_->config_.durability_directory)) { - durability::MoveToBackup(impl_->config_.durability_directory); - LOG(WARNING) << "Since Memgraph was not supposed to recover on startup " - "and durability is enabled, your current durability " - "files will likely be overriden. To prevent important " - "data loss, Memgraph has stored those files into a " - ".backup directory inside durability directory"; - } - impl_->wal_.Init(); - snapshot_creator_ = std::make_unique(); - snapshot_creator_->Run( - "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), - [this] { - auto dba = this->Access(); - MakeSnapshot(*dba); - }); - } - - // Start transaction killer. - if (impl_->config_.query_execution_time_sec != -1) { - transaction_killer_.Run( - "TX killer", - std::chrono::seconds(std::max( - 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), - [this]() { - impl_->tx_engine_.LocalForEachActiveTransaction( - [this](tx::Transaction &t) { - if (t.creation_time() + - std::chrono::seconds( - impl_->config_.query_execution_time_sec) < - std::chrono::steady_clock::now()) { - t.set_should_abort(); - }; - }); - }); - } } Master::~Master() {} @@ -839,6 +737,113 @@ io::network::Endpoint Master::GetEndpoint(int worker_id) { return impl_->coordination_.GetEndpoint(worker_id); } +void Master::Start() { + // Start coordination. + CHECK(impl_->coordination_.Start()) << "Couldn't start master coordination!"; + + // Start transactional cache cleanup. + impl_->tx_engine_.StartTransactionalCacheCleanup(); + + if (impl_->config_.durability_enabled) + utils::CheckDir(impl_->config_.durability_directory); + + // Durability recovery. + { + // What we recover. + std::experimental::optional recovery_info; + + durability::RecoveryData recovery_data; + // Recover only if necessary. + if (impl_->config_.db_recover_on_startup) { + CHECK(durability::VersionConsistency(impl_->config_.durability_directory)) + << "Contents of durability directory are not compatible with the " + "current version of Memgraph binary!"; + recovery_info = durability::RecoverOnlySnapshot( + impl_->config_.durability_directory, this, &recovery_data, + std::experimental::nullopt, impl_->config_.worker_id); + } + + // Post-recovery setup and checking. + impl_->coordination_.SetRecoveredSnapshot( + recovery_info ? std::experimental::make_optional( + std::make_pair(recovery_info->durability_version, + recovery_info->snapshot_tx_id)) + : std::experimental::nullopt); + + // Wait till workers report back their recoverable wal txs + if (recovery_info) { + CHECK(impl_->config_.recovering_cluster_size > 0) + << "Invalid cluster recovery size flag. Recovered cluster size " + "should be at least 1"; + while (impl_->coordination_.CountRecoveredWorkers() != + impl_->config_.recovering_cluster_size - 1) { + LOG(INFO) << "Waiting for workers to finish recovering.."; + std::this_thread::sleep_for(2s); + } + + // Get the intersection of recoverable transactions from wal on + // workers and on master + recovery_data.wal_tx_to_recover = + impl_->coordination_.CommonWalTransactions(*recovery_info); + MasterRecoveryTransactions recovery_transactions(this); + durability::RecoverWal(impl_->config_.durability_directory, this, + &recovery_data, &recovery_transactions); + durability::RecoverIndexes(this, recovery_data.indexes); + auto workers_recovered_wal = + impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data); + workers_recovered_wal.get(); + } + + impl_->dynamic_worker_addition_.Enable(); + } + + // Start the dynamic graph partitioner inside token sharing server + if (impl_->config_.dynamic_graph_partitioner_enabled) { + impl_->token_sharing_server_.Start(); + } + + if (impl_->config_.durability_enabled) { + // move any existing snapshots or wal files to a deprecated folder. + if (!impl_->config_.db_recover_on_startup && + durability::ContainsDurabilityFiles( + impl_->config_.durability_directory)) { + durability::MoveToBackup(impl_->config_.durability_directory); + LOG(WARNING) << "Since Memgraph was not supposed to recover on startup " + "and durability is enabled, your current durability " + "files will likely be overriden. To prevent important " + "data loss, Memgraph has stored those files into a " + ".backup directory inside durability directory"; + } + impl_->wal_.Init(); + snapshot_creator_ = std::make_unique(); + snapshot_creator_->Run( + "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), + [this] { + auto dba = this->Access(); + MakeSnapshot(*dba); + }); + } + + // Start transaction killer. + if (impl_->config_.query_execution_time_sec != -1) { + transaction_killer_.Run( + "TX killer", + std::chrono::seconds(std::max( + 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), + [this]() { + impl_->tx_engine_.LocalForEachActiveTransaction( + [this](tx::Transaction &t) { + if (t.creation_time() + + std::chrono::seconds( + impl_->config_.query_execution_time_sec) < + std::chrono::steady_clock::now()) { + t.set_should_abort(); + }; + }); + }); + } +} + bool Master::AwaitShutdown(std::function call_before_shutdown) { bool ret = impl_->coordination_.AwaitShutdown( @@ -973,11 +978,8 @@ class Worker { DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_, &updates_clients_}; - explicit Worker(const Config &config, database::Worker *self) - : config_(config), self_(self) { - cluster_discovery_.RegisterWorker(config.worker_id, - config.durability_directory); - } + Worker(const Config &config, database::Worker *self) + : config_(config), self_(self) {} // TODO: Some things may depend on order of construction/destruction. We also // have a lot of circular pointers among members. It would be a good idea to @@ -1028,88 +1030,6 @@ Worker::Worker(Config config) impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_); impl_->tx_engine_.RegisterForTransactionalCacheCleanup( impl_->produce_server_); - - // Start transactional cache cleanup. - impl_->tx_engine_.StartTransactionalCacheCleanup(); - - if (impl_->config_.durability_enabled) - utils::CheckDir(impl_->config_.durability_directory); - - // Durability recovery. We need to check this flag for workers that are added - // after the "main" cluster recovery. - if (impl_->config_.db_recover_on_startup) { - // What we should recover (version, transaction_id) pair. - auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover(); - - // What we recover. - std::experimental::optional recovery_info; - - durability::RecoveryData recovery_data; - // Recover only if necessary. - if (snapshot_to_recover) { - // check version consistency. - if (!durability::DistributedVersionConsistency( - snapshot_to_recover->first)) - LOG(FATAL) << "Memgraph worker failed to recover due to version " - "inconsistency with the master."; - if (!durability::VersionConsistency(impl_->config_.durability_directory)) - LOG(FATAL) - << "Contents of durability directory are not compatible with the " - "current version of Memgraph binary!"; - recovery_info = durability::RecoverOnlySnapshot( - impl_->config_.durability_directory, this, &recovery_data, - snapshot_to_recover->second, config.worker_id); - } - - // Post-recovery setup and checking. - if (snapshot_to_recover && - (!recovery_info || - snapshot_to_recover->second != recovery_info->snapshot_tx_id)) - LOG(FATAL) << "Memgraph worker failed to recover the database state " - "recovered on the master"; - impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info); - } else { - // Check with master if we're a dynamically added worker and need to update - // our indices. - auto indexes = impl_->dynamic_worker_registration_.GetIndicesToCreate(); - if (!indexes.empty()) { - durability::RecoverIndexes(this, indexes); - } - } - - if (impl_->config_.durability_enabled) { - // move any existing snapshots or wal files to a deprecated folder. - if (!impl_->config_.db_recover_on_startup && - durability::ContainsDurabilityFiles( - impl_->config_.durability_directory)) { - durability::MoveToBackup(impl_->config_.durability_directory); - LOG(WARNING) << "Since Memgraph was not supposed to recover on startup " - "and durability is enabled, your current durability " - "files will likely be overriden. To prevent important " - "data loss, Memgraph has stored those files into a " - ".backup directory inside durability directory"; - } - impl_->wal_.Init(); - } - - // Start transaction killer. - if (impl_->config_.query_execution_time_sec != -1) { - transaction_killer_.Run( - "TX killer", - std::chrono::seconds(std::max( - 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), - [this]() { - impl_->tx_engine_.LocalForEachActiveTransaction( - [this](tx::Transaction &t) { - if (t.creation_time() + - std::chrono::seconds( - impl_->config_.query_execution_time_sec) < - std::chrono::steady_clock::now()) { - t.set_should_abort(); - }; - }); - }); - } } Worker::~Worker() {} @@ -1192,6 +1112,97 @@ io::network::Endpoint Worker::GetEndpoint(int worker_id) { return impl_->coordination_.GetEndpoint(worker_id); } +void Worker::Start() { + // Start coordination. + CHECK(impl_->coordination_.Start()) << "Couldn't start worker coordination!"; + + // Register to the master. + impl_->cluster_discovery_.RegisterWorker(impl_->config_.worker_id, + impl_->config_.durability_directory); + + // Start transactional cache cleanup. + impl_->tx_engine_.StartTransactionalCacheCleanup(); + + if (impl_->config_.durability_enabled) + utils::CheckDir(impl_->config_.durability_directory); + + // Durability recovery. We need to check this flag for workers that are added + // after the "main" cluster recovery. + if (impl_->config_.db_recover_on_startup) { + // What we should recover (version, transaction_id) pair. + auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover(); + + // What we recover. + std::experimental::optional recovery_info; + + durability::RecoveryData recovery_data; + // Recover only if necessary. + if (snapshot_to_recover) { + // check version consistency. + if (!durability::DistributedVersionConsistency( + snapshot_to_recover->first)) + LOG(FATAL) << "Memgraph worker failed to recover due to version " + "inconsistency with the master."; + if (!durability::VersionConsistency(impl_->config_.durability_directory)) + LOG(FATAL) + << "Contents of durability directory are not compatible with the " + "current version of Memgraph binary!"; + recovery_info = durability::RecoverOnlySnapshot( + impl_->config_.durability_directory, this, &recovery_data, + snapshot_to_recover->second, impl_->config_.worker_id); + } + + // Post-recovery setup and checking. + if (snapshot_to_recover && + (!recovery_info || + snapshot_to_recover->second != recovery_info->snapshot_tx_id)) + LOG(FATAL) << "Memgraph worker failed to recover the database state " + "recovered on the master"; + impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info); + } else { + // Check with master if we're a dynamically added worker and need to update + // our indices. + auto indexes = impl_->dynamic_worker_registration_.GetIndicesToCreate(); + if (!indexes.empty()) { + durability::RecoverIndexes(this, indexes); + } + } + + if (impl_->config_.durability_enabled) { + // move any existing snapshots or wal files to a deprecated folder. + if (!impl_->config_.db_recover_on_startup && + durability::ContainsDurabilityFiles( + impl_->config_.durability_directory)) { + durability::MoveToBackup(impl_->config_.durability_directory); + LOG(WARNING) << "Since Memgraph was not supposed to recover on startup " + "and durability is enabled, your current durability " + "files will likely be overriden. To prevent important " + "data loss, Memgraph has stored those files into a " + ".backup directory inside durability directory"; + } + impl_->wal_.Init(); + } + + // Start transaction killer. + if (impl_->config_.query_execution_time_sec != -1) { + transaction_killer_.Run( + "TX killer", + std::chrono::seconds(std::max( + 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), + [this]() { + impl_->tx_engine_.LocalForEachActiveTransaction( + [this](tx::Transaction &t) { + if (t.creation_time() + + std::chrono::seconds( + impl_->config_.query_execution_time_sec) < + std::chrono::steady_clock::now()) { + t.set_should_abort(); + }; + }); + }); + } +} + bool Worker::AwaitShutdown(std::function call_before_shutdown) { bool ret = impl_->coordination_.AwaitShutdown( [this, &call_before_shutdown](bool is_cluster_alive) -> bool { diff --git a/src/database/distributed/distributed_graph_db.hpp b/src/database/distributed/distributed_graph_db.hpp index 3acca9b6d..9ac8fcb2a 100644 --- a/src/database/distributed/distributed_graph_db.hpp +++ b/src/database/distributed/distributed_graph_db.hpp @@ -66,6 +66,8 @@ class Master final : public DistributedGraphDb { /** Gets the endpoint of the worker with the given id. */ // TODO make const once Coordination::GetEndpoint is const. io::network::Endpoint GetEndpoint(int worker_id); + + void Start(); bool AwaitShutdown(std::function call_before_shutdown = [] {}); void Shutdown(); @@ -113,6 +115,8 @@ class Worker final : public DistributedGraphDb { /** Gets the endpoint of the worker with the given id. */ // TODO make const once Coordination::GetEndpoint is const. io::network::Endpoint GetEndpoint(int worker_id); + + void Start(); bool AwaitShutdown(std::function call_before_shutdown = [] {}); void Shutdown(); diff --git a/src/distributed/coordination.cpp b/src/distributed/coordination.cpp index f975f64e3..82bc4ad9c 100644 --- a/src/distributed/coordination.cpp +++ b/src/distributed/coordination.cpp @@ -13,18 +13,16 @@ Coordination::Coordination(const io::network::Endpoint &worker_endpoint, : server_(worker_endpoint, server_workers_count), worker_id_(worker_id), thread_pool_(client_workers_count, "RPC client") { - // The master endpoint should be added to workers and not to the master. if (worker_id != 0) { // The master is always worker 0. - // TODO (mferencevic): Strictly speaking, this isn't correct when creating a - // `CoordinationMaster` because the supplied `master_endpoint` is supplied - // before the server starts listening on the address. Eg. if `0.0.0.0:0` is - // supplied that should be first resolved by the server when it binds to - // that address and `server_.endpoint()` should be used. Currently, - // `workers_[0]` isn't used on the master so all is well. + // We only emplace the master endpoint when this instance isn't the + // `MasterCoordination`. This is because we don't know the exact master + // endpoint until the master server is started. The `MasterCoordination` + // will emplace the master endpoint when the server is started. Eg. if + // `0.0.0.0:0` is supplied as the master endpoint that should be first + // resolved by the server when it binds to that address and + // `server_.endpoint()` should be used. workers_.emplace(0, master_endpoint); - } else { - workers_.emplace(0, server_.endpoint()); } } diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index 9974b86ae..c1d4a789e 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -21,11 +21,7 @@ MasterCoordination::MasterCoordination(const Endpoint &master_endpoint, int server_workers_count, int client_workers_count) : Coordination(master_endpoint, 0, {}, server_workers_count, - client_workers_count) { - // TODO (mferencevic): Move this to an explicit `Start` method. - scheduler_.Run("Heartbeat", std::chrono::seconds(kHeartbeatIntervalSeconds), - [this] { IssueHeartbeats(); }); -} + client_workers_count) {} MasterCoordination::~MasterCoordination() { CHECK(!alive_) << "You must call Shutdown and AwaitShutdown on " @@ -118,6 +114,14 @@ std::vector MasterCoordination::CommonWalTransactions( return tx_intersection; } +bool MasterCoordination::Start() { + if (!server_.Start()) return false; + AddWorker(0, server_.endpoint()); + scheduler_.Run("Heartbeat", std::chrono::seconds(kHeartbeatIntervalSeconds), + [this] { IssueHeartbeats(); }); + return true; +} + bool MasterCoordination::AwaitShutdown( std::function call_before_shutdown) { // Wait for a shutdown notification. diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index 0a8ab180e..6d641536b 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -60,6 +60,9 @@ class MasterCoordination final : public Coordination { std::vector CommonWalTransactions( const durability::RecoveryInfo &master_info) const; + /// Starts the coordination and its servers. + bool Start(); + /// Waits while the cluster is in a valid state or the `Shutdown` method is /// called (suitable for use with signal handlers). Blocks the calling thread /// until that has finished. diff --git a/src/distributed/coordination_worker.cpp b/src/distributed/coordination_worker.cpp index fbf07e94f..80b38dff5 100644 --- a/src/distributed/coordination_worker.cpp +++ b/src/distributed/coordination_worker.cpp @@ -70,6 +70,10 @@ void WorkerCoordination::RegisterWorker(int worker_id, AddWorker(worker_id, endpoint); } +bool WorkerCoordination::Start() { + return server_.Start(); +} + bool WorkerCoordination::AwaitShutdown( std::function call_before_shutdown) { // Wait for a shutdown notification. diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp index 23e427c53..6ce150208 100644 --- a/src/distributed/coordination_worker.hpp +++ b/src/distributed/coordination_worker.hpp @@ -30,6 +30,9 @@ class WorkerCoordination final : public Coordination { /// Registers the worker with the given endpoint. void RegisterWorker(int worker_id, io::network::Endpoint endpoint); + /// Starts the coordination and its servers. + bool Start(); + /// Starts listening for a remote shutdown command (issued by the master) or /// for the `Shutdown` method to be called (suitable for use with signal /// handlers). Blocks the calling thread until that has finished. diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 24a49314a..1ba593812 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -100,6 +100,7 @@ void SingleNodeMain() { }; InitSignalHandlers(shutdown); + CHECK(server.Start()) << "Couldn't start the Bolt server!"; server.AwaitShutdown(); } diff --git a/src/memgraph_distributed.cpp b/src/memgraph_distributed.cpp index 8e005638d..6b9fa6478 100644 --- a/src/memgraph_distributed.cpp +++ b/src/memgraph_distributed.cpp @@ -98,6 +98,12 @@ void MasterMain() { InitSignalHandlers(shutdown); + // Start the Bolt server. + CHECK(server.Start()) << "Couldn't start the Bolt server!"; + + // Start the database. + db.Start(); + // The return code of `AwaitShutdown` is ignored because we want the database // to exit cleanly no matter what. db.AwaitShutdown([&server] { @@ -121,6 +127,9 @@ void WorkerMain() { InitSignalHandlers(shutdown); + // Start the database. + db.Start(); + // The return code of `AwaitShutdown` is ignored because we want the database // to exit cleanly no matter what. db.AwaitShutdown(); diff --git a/tests/concurrent/network_read_hang.cpp b/tests/concurrent/network_read_hang.cpp index 792a67787..c98220ddd 100644 --- a/tests/concurrent/network_read_hang.cpp +++ b/tests/concurrent/network_read_hang.cpp @@ -67,6 +67,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) { communication::ServerContext context; communication::Server server(endpoint, &data, &context, -1, "Test", N); + ASSERT_TRUE(server.Start()); const auto &ep = server.endpoint(); // start clients diff --git a/tests/concurrent/network_server.cpp b/tests/concurrent/network_server.cpp index 7f6d005f6..23a9adcd8 100644 --- a/tests/concurrent/network_server.cpp +++ b/tests/concurrent/network_server.cpp @@ -23,6 +23,7 @@ TEST(Network, Server) { int N = (std::thread::hardware_concurrency() + 1) / 2; ContextT context; ServerT server(endpoint, &session_data, &context, -1, "Test", N); + ASSERT_TRUE(server.Start()); const auto &ep = server.endpoint(); // start clients diff --git a/tests/concurrent/network_session_leak.cpp b/tests/concurrent/network_session_leak.cpp index fc5ef1f77..b777b9220 100644 --- a/tests/concurrent/network_session_leak.cpp +++ b/tests/concurrent/network_session_leak.cpp @@ -24,6 +24,7 @@ TEST(Network, SessionLeak) { TestData session_data; ContextT context; ServerT server(endpoint, &session_data, &context, -1, "Test", 2); + ASSERT_TRUE(server.Start()); // start clients int N = 50; diff --git a/tests/integration/ssl/tester.cpp b/tests/integration/ssl/tester.cpp index e3aad5e65..cf5178bdb 100644 --- a/tests/integration/ssl/tester.cpp +++ b/tests/integration/ssl/tester.cpp @@ -55,6 +55,7 @@ int main(int argc, char **argv) { FLAGS_server_verify_peer); communication::Server server( {"127.0.0.1", 0}, &echo_data, &server_context, -1, "SSL", 1); + server.Start(); // Initialize the client. communication::ClientContext client_context(FLAGS_client_key_file, diff --git a/tests/manual/distributed_common.hpp b/tests/manual/distributed_common.hpp index c2844ddef..c96c921b4 100644 --- a/tests/manual/distributed_common.hpp +++ b/tests/manual/distributed_common.hpp @@ -22,8 +22,10 @@ namespace fs = std::experimental::filesystem; class WorkerInThread { public: explicit WorkerInThread(database::Config config) : worker_(config) { - thread_ = - std::thread([this, config] { EXPECT_TRUE(worker_.AwaitShutdown()); }); + thread_ = std::thread([this, config] { + worker_.Start(); + EXPECT_TRUE(worker_.AwaitShutdown()); + }); } ~WorkerInThread() { @@ -52,6 +54,7 @@ class Cluster { master_ = std::make_unique(master_config); interpreter_ = std::make_unique(master_.get()); + master_->Start(); std::this_thread::sleep_for(kInitTime); auto worker_config = [this](int worker_id) { diff --git a/tests/manual/distributed_query_planner.cpp b/tests/manual/distributed_query_planner.cpp index b40f6a46d..b22f72ff7 100644 --- a/tests/manual/distributed_query_planner.cpp +++ b/tests/manual/distributed_query_planner.cpp @@ -42,6 +42,7 @@ int main(int argc, char *argv[]) { {ShowDistributedCommand, 1, "Show the Nth plan as for distributed execution"}); database::Master db; + db.Start(); auto dba = db.Access(); RunInteractivePlanning(dba.get()); db.Shutdown(); diff --git a/tests/manual/distributed_repl.cpp b/tests/manual/distributed_repl.cpp index a9a340a3c..aba4a1cea 100644 --- a/tests/manual/distributed_repl.cpp +++ b/tests/manual/distributed_repl.cpp @@ -27,8 +27,10 @@ const std::string kLocal = "127.0.0.1"; class WorkerInThread { public: explicit WorkerInThread(database::Config config) : worker_(config) { - thread_ = - std::thread([this, config] { EXPECT_TRUE(worker_.AwaitShutdown()); }); + thread_ = std::thread([this, config] { + worker_.Start(); + EXPECT_TRUE(worker_.AwaitShutdown()); + }); } ~WorkerInThread() { @@ -60,6 +62,7 @@ int main(int argc, char *argv[]) { // Flag needs to be updated due to props on disk storage. FLAGS_durability_directory = GetDurabilityDirectory(tmp_dir, 0); auto master = std::make_unique(master_config); + master->Start(); // Allow the master to get initialized before making workers. std::this_thread::sleep_for(std::chrono::milliseconds(250)); diff --git a/tests/manual/ssl_server.cpp b/tests/manual/ssl_server.cpp index 155cc1dd1..c4ef53402 100644 --- a/tests/manual/ssl_server.cpp +++ b/tests/manual/ssl_server.cpp @@ -65,6 +65,7 @@ int main(int argc, char **argv) { FLAGS_ca_file, FLAGS_verify_peer); communication::Server server(endpoint, &echo_data, &context, -1, "SSL", 1); + server.Start(); while (echo_data.alive) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/tests/unit/concurrent_id_mapper_distributed.cpp b/tests/unit/concurrent_id_mapper_distributed.cpp index 3c913e34a..e4d7aa53f 100644 --- a/tests/unit/concurrent_id_mapper_distributed.cpp +++ b/tests/unit/concurrent_id_mapper_distributed.cpp @@ -23,6 +23,7 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test { worker_mapper_; void SetUp() override { + coordination_.Start(); master_client_pool_.emplace(coordination_.GetServerEndpoint()); master_mapper_.emplace(&coordination_); worker_mapper_.emplace(&master_client_pool_.value()); diff --git a/tests/unit/counters.cpp b/tests/unit/counters.cpp index 26e7e1e7c..2ee0b73ae 100644 --- a/tests/unit/counters.cpp +++ b/tests/unit/counters.cpp @@ -9,6 +9,8 @@ TEST(CountersDistributed, All) { TestMasterCoordination coordination; database::MasterCounters master(&coordination); + coordination.Start(); + communication::rpc::ClientPool master_client_pool( coordination.GetServerEndpoint()); diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 01285caeb..7e8ae5ec5 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -20,8 +20,10 @@ namespace fs = std::experimental::filesystem; class WorkerInThread { public: explicit WorkerInThread(database::Config config) : worker_(config) { - thread_ = - std::thread([this, config] { EXPECT_TRUE(worker_.AwaitShutdown()); }); + thread_ = std::thread([this, config] { + worker_.Start(); + EXPECT_TRUE(worker_.AwaitShutdown()); + }); } ~WorkerInThread() { @@ -59,6 +61,7 @@ class DistributedGraphDbTest : public ::testing::Test { // TODO (buda): Fix sometime in the future - not mission critical. master_config.recovering_cluster_size = 1; master_ = std::make_unique(modify_config(master_config)); + master_->Start(); std::this_thread::sleep_for(kInitTime); auto worker_config = [this](int worker_id) { @@ -183,9 +186,9 @@ class Cluster { // Flag needs to be updated due to props on disk storage. FLAGS_durability_directory = GetDurabilityDirectory(0); - auto master_tmp = std::make_unique(master_config); - auto master_endpoint = master_tmp->endpoint(); - master_ = std::move(master_tmp); + master_ = std::make_unique(master_config); + master_->Start(); + auto master_endpoint = master_->endpoint(); const auto kInitTime = 200ms; std::this_thread::sleep_for(kInitTime); diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index 19a99a27c..5ef4cf683 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -40,6 +40,7 @@ class WorkerCoordinationInThread { worker_thread_ = std::thread( [this, master_endpoint, durability_directory, desired_id, &init_done] { worker.emplace(master_endpoint, desired_id); + ASSERT_TRUE(worker->coord.Start()); worker->discovery.RegisterWorker(desired_id, durability_directory); init_done = true; // We don't shutdown the worker coordination here because it will be @@ -88,6 +89,7 @@ TEST_F(Distributed, Coordination) { std::vector> workers; MasterCoordination master_coord({kLocal, 0}); + ASSERT_TRUE(master_coord.Start()); master_coord.SetRecoveredSnapshot(std::experimental::nullopt); ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master")); @@ -118,6 +120,7 @@ TEST_F(Distributed, DesiredAndUniqueId) { std::vector> workers; MasterCoordination master_coord({kLocal, 0}); + ASSERT_TRUE(master_coord.Start()); master_coord.SetRecoveredSnapshot(std::experimental::nullopt); ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master")); @@ -140,6 +143,7 @@ TEST_F(Distributed, CoordinationWorkersId) { std::vector> workers; MasterCoordination master_coord({kLocal, 0}); + ASSERT_TRUE(master_coord.Start()); master_coord.SetRecoveredSnapshot(std::experimental::nullopt); ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master")); @@ -165,6 +169,7 @@ TEST_F(Distributed, ClusterDiscovery) { std::vector> workers; MasterCoordination master_coord({kLocal, 0}); + ASSERT_TRUE(master_coord.Start()); master_coord.SetRecoveredSnapshot(std::experimental::nullopt); ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master")); std::vector ids; @@ -195,6 +200,7 @@ TEST_F(Distributed, KeepsTrackOfRecovered) { std::vector> workers; MasterCoordination master_coord({kLocal, 0}); + ASSERT_TRUE(master_coord.Start()); master_coord.SetRecoveredSnapshot(std::experimental::nullopt); ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master")); int worker_count = 10; diff --git a/tests/unit/distributed_dynamic_worker.cpp b/tests/unit/distributed_dynamic_worker.cpp index 208da40ea..308feaedf 100644 --- a/tests/unit/distributed_dynamic_worker.cpp +++ b/tests/unit/distributed_dynamic_worker.cpp @@ -24,6 +24,7 @@ class DistributedDynamicWorker : public ::testing::Test { FLAGS_durability_directory = GetDurabilityDirectory(0); auto master = std::make_unique(modify_config(master_config)); + master->Start(); std::this_thread::sleep_for(200ms); return master; } diff --git a/tests/unit/network_timeouts.cpp b/tests/unit/network_timeouts.cpp index e66cc5f18..5e796e8fc 100644 --- a/tests/unit/network_timeouts.cpp +++ b/tests/unit/network_timeouts.cpp @@ -61,6 +61,7 @@ TEST(NetworkTimeouts, InactiveSession) { communication::ServerContext context; communication::Server server{ {"127.0.0.1", 0}, &test_data, &context, 2, "Test", 1}; + ASSERT_TRUE(server.Start()); // Create the client and connect to the server. io::network::Socket client; @@ -92,6 +93,7 @@ TEST(NetworkTimeouts, ActiveSession) { communication::ServerContext context; communication::Server server{ {"127.0.0.1", 0}, &test_data, &context, 2, "Test", 1}; + ASSERT_TRUE(server.Start()); // Create the client and connect to the server. io::network::Socket client; diff --git a/tests/unit/rpc.cpp b/tests/unit/rpc.cpp index 1b72ade75..e272f1e81 100644 --- a/tests/unit/rpc.cpp +++ b/tests/unit/rpc.cpp @@ -93,6 +93,7 @@ TEST(Rpc, Call) { SumRes res(req.x + req.y); Save(res, res_builder); }); + ASSERT_TRUE(server.Start()); std::this_thread::sleep_for(100ms); Client client(server.endpoint()); @@ -112,6 +113,7 @@ TEST(Rpc, Abort) { SumRes res(req.x + req.y); Save(res, res_builder); }); + ASSERT_TRUE(server.Start()); std::this_thread::sleep_for(100ms); Client client(server.endpoint()); @@ -142,6 +144,7 @@ TEST(Rpc, ClientPool) { SumRes res(req.x + req.y); Save(res, res_builder); }); + ASSERT_TRUE(server.Start()); std::this_thread::sleep_for(100ms); Client client(server.endpoint()); @@ -194,6 +197,7 @@ TEST(Rpc, LargeMessage) { Load(&res, req_reader); Save(res, res_builder); }); + ASSERT_TRUE(server.Start()); std::this_thread::sleep_for(100ms); std::string testdata(100000, 'a'); diff --git a/tests/unit/test_coordination.hpp b/tests/unit/test_coordination.hpp index 070e75095..3ae93f66a 100644 --- a/tests/unit/test_coordination.hpp +++ b/tests/unit/test_coordination.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include "distributed/coordination.hpp" const std::string kLocal = "127.0.0.1"; @@ -12,6 +14,10 @@ class TestMasterCoordination : public distributed::Coordination { TestMasterCoordination() : distributed::Coordination({kLocal, 0}, 0, {kLocal, 0}) {} + void Start() { + ASSERT_TRUE(server_.Start()); + } + void Stop() { server_.Shutdown(); server_.AwaitShutdown(); @@ -24,6 +30,10 @@ class TestWorkerCoordination : public distributed::Coordination { int worker_id) : distributed::Coordination({kLocal, 0}, worker_id, master_endpoint) {} + void Start() { + ASSERT_TRUE(server_.Start()); + } + void Stop() { server_.Shutdown(); server_.AwaitShutdown(); diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index a94d79e24..5a47c148f 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -16,53 +17,65 @@ using namespace distributed; class WorkerEngineTest : public testing::Test { protected: + void SetUp() override { + master_coordination_ = std::make_unique(); + master_coordination_->Start(); + + master_ = std::make_unique(master_coordination_.get()); + + worker_coordination_ = std::make_unique( + master_coordination_->GetServerEndpoint(), 1); + worker_coordination_->Start(); + + worker_ = std::make_unique(worker_coordination_.get()); + } + void TearDown() override { - std::thread thread([this] { worker_coordination_.Stop(); }); - master_coordination_.Stop(); + std::thread thread([this] { worker_coordination_->Stop(); }); + master_coordination_->Stop(); if (thread.joinable()) thread.join(); } - TestMasterCoordination master_coordination_; - EngineMaster master_{&master_coordination_}; + std::unique_ptr master_coordination_; + std::unique_ptr master_; - TestWorkerCoordination worker_coordination_{ - master_coordination_.GetServerEndpoint(), 1}; - EngineWorker worker_{&worker_coordination_}; + std::unique_ptr worker_coordination_; + std::unique_ptr worker_; }; TEST_F(WorkerEngineTest, BeginOnWorker) { - worker_.Begin(); - auto second = worker_.Begin(); - EXPECT_EQ(master_.RunningTransaction(second->id_)->snapshot().size(), 1); + worker_->Begin(); + auto second = worker_->Begin(); + EXPECT_EQ(master_->RunningTransaction(second->id_)->snapshot().size(), 1); } TEST_F(WorkerEngineTest, AdvanceOnWorker) { - auto tx = worker_.Begin(); + auto tx = worker_->Begin(); auto cid = tx->cid(); - EXPECT_EQ(worker_.Advance(tx->id_), cid + 1); + EXPECT_EQ(worker_->Advance(tx->id_), cid + 1); } TEST_F(WorkerEngineTest, CommitOnWorker) { - auto tx = worker_.Begin(); + auto tx = worker_->Begin(); auto tx_id = tx->id_; - worker_.Commit(*tx); - EXPECT_TRUE(master_.Info(tx_id).is_committed()); + worker_->Commit(*tx); + EXPECT_TRUE(master_->Info(tx_id).is_committed()); } TEST_F(WorkerEngineTest, AbortOnWorker) { - auto tx = worker_.Begin(); + auto tx = worker_->Begin(); auto tx_id = tx->id_; - worker_.Abort(*tx); - EXPECT_TRUE(master_.Info(tx_id).is_aborted()); + worker_->Abort(*tx); + EXPECT_TRUE(master_->Info(tx_id).is_aborted()); } TEST_F(WorkerEngineTest, RunningTransaction) { - master_.Begin(); - master_.Begin(); - worker_.RunningTransaction(1); - worker_.RunningTransaction(2); + master_->Begin(); + master_->Begin(); + worker_->RunningTransaction(1); + worker_->RunningTransaction(2); int count = 0; - worker_.LocalForEachActiveTransaction([&count](Transaction &t) { + worker_->LocalForEachActiveTransaction([&count](Transaction &t) { ++count; if (t.id_ == 1) { EXPECT_EQ(t.snapshot(), tx::Snapshot(std::vector{})); @@ -74,73 +87,73 @@ TEST_F(WorkerEngineTest, RunningTransaction) { } TEST_F(WorkerEngineTest, Info) { - auto *tx_1 = master_.Begin(); - auto *tx_2 = master_.Begin(); + auto *tx_1 = master_->Begin(); + auto *tx_2 = master_->Begin(); // We can't check active transactions in the worker (see comments there for // info). - master_.Commit(*tx_1); - EXPECT_TRUE(master_.Info(1).is_committed()); - EXPECT_TRUE(worker_.Info(1).is_committed()); - master_.Abort(*tx_2); - EXPECT_TRUE(master_.Info(2).is_aborted()); - EXPECT_TRUE(worker_.Info(2).is_aborted()); + master_->Commit(*tx_1); + EXPECT_TRUE(master_->Info(1).is_committed()); + EXPECT_TRUE(worker_->Info(1).is_committed()); + master_->Abort(*tx_2); + EXPECT_TRUE(master_->Info(2).is_aborted()); + EXPECT_TRUE(worker_->Info(2).is_aborted()); } TEST_F(WorkerEngineTest, GlobalGcSnapshot) { - auto *tx_1 = master_.Begin(); - master_.Begin(); - master_.Commit(*tx_1); - EXPECT_EQ(master_.GlobalGcSnapshot(), tx::Snapshot({1, 2})); - EXPECT_EQ(worker_.GlobalGcSnapshot(), master_.GlobalGcSnapshot()); + auto *tx_1 = master_->Begin(); + master_->Begin(); + master_->Commit(*tx_1); + EXPECT_EQ(master_->GlobalGcSnapshot(), tx::Snapshot({1, 2})); + EXPECT_EQ(worker_->GlobalGcSnapshot(), master_->GlobalGcSnapshot()); } TEST_F(WorkerEngineTest, GlobalActiveTransactions) { - auto *tx_1 = master_.Begin(); - master_.Begin(); - auto *tx_3 = master_.Begin(); - master_.Begin(); - master_.Commit(*tx_1); - master_.Abort(*tx_3); - EXPECT_EQ(worker_.GlobalActiveTransactions(), tx::Snapshot({2, 4})); + auto *tx_1 = master_->Begin(); + master_->Begin(); + auto *tx_3 = master_->Begin(); + master_->Begin(); + master_->Commit(*tx_1); + master_->Abort(*tx_3); + EXPECT_EQ(worker_->GlobalActiveTransactions(), tx::Snapshot({2, 4})); } TEST_F(WorkerEngineTest, LocalLast) { - master_.Begin(); - EXPECT_EQ(worker_.LocalLast(), 0); - worker_.RunningTransaction(1); - EXPECT_EQ(worker_.LocalLast(), 1); - master_.Begin(); - EXPECT_EQ(worker_.LocalLast(), 1); - master_.Begin(); - EXPECT_EQ(worker_.LocalLast(), 1); - master_.Begin(); - worker_.RunningTransaction(4); - EXPECT_EQ(worker_.LocalLast(), 4); + master_->Begin(); + EXPECT_EQ(worker_->LocalLast(), 0); + worker_->RunningTransaction(1); + EXPECT_EQ(worker_->LocalLast(), 1); + master_->Begin(); + EXPECT_EQ(worker_->LocalLast(), 1); + master_->Begin(); + EXPECT_EQ(worker_->LocalLast(), 1); + master_->Begin(); + worker_->RunningTransaction(4); + EXPECT_EQ(worker_->LocalLast(), 4); } TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { - master_.Begin(); - worker_.RunningTransaction(1); - master_.Begin(); - master_.Begin(); - master_.Begin(); - worker_.RunningTransaction(4); + master_->Begin(); + worker_->RunningTransaction(1); + master_->Begin(); + master_->Begin(); + master_->Begin(); + worker_->RunningTransaction(4); std::unordered_set local; - worker_.LocalForEachActiveTransaction( + worker_->LocalForEachActiveTransaction( [&local](Transaction &t) { local.insert(t.id_); }); EXPECT_EQ(local, std::unordered_set({1, 4})); } TEST_F(WorkerEngineTest, EnsureTxIdGreater) { - ASSERT_LE(master_.Begin()->id_, 40); - worker_.EnsureNextIdGreater(42); - EXPECT_EQ(master_.Begin()->id_, 43); - EXPECT_EQ(worker_.Begin()->id_, 44); + ASSERT_LE(master_->Begin()->id_, 40); + worker_->EnsureNextIdGreater(42); + EXPECT_EQ(master_->Begin()->id_, 43); + EXPECT_EQ(worker_->Begin()->id_, 44); } TEST_F(WorkerEngineTest, GlobalNext) { - auto tx = master_.Begin(); - EXPECT_NE(worker_.LocalLast(), worker_.GlobalLast()); - EXPECT_EQ(master_.LocalLast(), worker_.GlobalLast()); - EXPECT_EQ(worker_.GlobalLast(), tx->id_); + auto tx = master_->Begin(); + EXPECT_NE(worker_->LocalLast(), worker_->GlobalLast()); + EXPECT_EQ(master_->LocalLast(), worker_->GlobalLast()); + EXPECT_EQ(worker_->GlobalLast(), tx->id_); }