diff --git a/tests/manual/distributed_common.hpp b/tests/manual/distributed_common.hpp index c96c921b4..d4ca3aad2 100644 --- a/tests/manual/distributed_common.hpp +++ b/tests/manual/distributed_common.hpp @@ -73,6 +73,18 @@ class Cluster { std::make_unique(worker_config(i + 1))); std::this_thread::sleep_for(kInitTime); } + + // Wait for the whole cluster to be up and running. + std::this_thread::sleep_for(kInitTime); + while (master_->GetWorkerIds().size() < worker_count + 1) { + std::this_thread::sleep_for(kInitTime); + } + for (int i = 0; i < worker_count; ++i) { + while (workers_[i]->worker_.GetWorkerIds().size() < worker_count + 1) { + std::this_thread::sleep_for(kInitTime); + } + } + std::this_thread::sleep_for(kInitTime); } void Stop() { diff --git a/tests/manual/distributed_repl.cpp b/tests/manual/distributed_repl.cpp index aba4a1cea..3c663e995 100644 --- a/tests/manual/distributed_repl.cpp +++ b/tests/manual/distributed_repl.cpp @@ -78,6 +78,18 @@ int main(int argc, char *argv[]) { workers.emplace_back(std::make_unique(config)); } + // Wait for the whole cluster to be up and running. + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + while (master->GetWorkerIds().size() < FLAGS_worker_count + 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + for (int i = 0; i < FLAGS_worker_count; ++i) { + while (workers[i]->worker_.GetWorkerIds().size() < FLAGS_worker_count + 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Start the REPL { query::DistributedInterpreter interpreter(master.get()); diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 7e8ae5ec5..770401e98 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -81,6 +81,18 @@ class DistributedGraphDbTest : public ::testing::Test { modify_config(worker_config(i + 1)))); std::this_thread::sleep_for(kInitTime); } + + // Wait for the whole cluster to be up and running. + std::this_thread::sleep_for(kInitTime); + while (master_->GetWorkerIds().size() < kWorkerCount + 1) { + std::this_thread::sleep_for(kInitTime); + } + for (int i = 0; i < kWorkerCount; ++i) { + while (workers_[i]->worker_.GetWorkerIds().size() < kWorkerCount + 1) { + std::this_thread::sleep_for(kInitTime); + } + } + std::this_thread::sleep_for(kInitTime); } void SetUp() override { @@ -207,6 +219,18 @@ class Cluster { FLAGS_durability_directory = GetDurabilityDirectory(i + 1); workers_.emplace_back( std::make_unique(worker_config(i + 1))); + std::this_thread::sleep_for(kInitTime); + } + + // Wait for the whole cluster to be up and running. + std::this_thread::sleep_for(kInitTime); + while (master_->GetWorkerIds().size() < num_workers + 1) { + std::this_thread::sleep_for(kInitTime); + } + for (int i = 0; i < num_workers; ++i) { + while (workers_[i]->worker_.GetWorkerIds().size() < num_workers + 1) { + std::this_thread::sleep_for(kInitTime); + } } std::this_thread::sleep_for(kInitTime); } diff --git a/tests/unit/distributed_dynamic_worker.cpp b/tests/unit/distributed_dynamic_worker.cpp index 06ade932e..544452ee0 100644 --- a/tests/unit/distributed_dynamic_worker.cpp +++ b/tests/unit/distributed_dynamic_worker.cpp @@ -92,8 +92,18 @@ TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorker) { EXPECT_EQ(CountIterable(dba->Vertices(label, property, false)), 100); } + auto num_workers = master->GetWorkerIds().size(); + auto worker1 = CreateWorker(master->endpoint(), 1, modify_config); + while (master->GetWorkerIds().size() < num_workers + 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + while (worker1->worker_.GetWorkerIds().size() < num_workers + 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Check that the new worker has that index { auto dba = worker1->db()->Access(); @@ -171,8 +181,18 @@ TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorkerAfterRecovery) { EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property)); } + auto num_workers = master->GetWorkerIds().size(); + auto worker1 = CreateWorker(master->endpoint(), 1, modify_config); + while (master->GetWorkerIds().size() < num_workers + 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + while (worker1->worker_.GetWorkerIds().size() < num_workers + 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Check that the new worker has that index. { auto dba = worker1->db()->Access();