From 44aefe775eb2466257e3c24f9bdd18e06b84d9a7 Mon Sep 17 00:00:00 2001 From: florijan <florijan@memgraph.io> Date: Mon, 12 Mar 2018 11:26:40 +0100 Subject: [PATCH] Fix flakyness in tests/unit/distributed_coord Summary: Instead of waiting for a fix period for the coordinations to start and coordinate with the master, wait for each of them individually to report being done. Also: rename `WorkerInThread` to `WorkerCoordinationInThread`. Reviewers: dgleich, teon.banek, msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1288 --- tests/unit/distributed_coordination.cpp | 58 ++++++++++++------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index e2159a477..e8de16055 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -1,3 +1,4 @@ +#include <atomic> #include <experimental/optional> #include <memory> #include <thread> @@ -19,15 +20,21 @@ using namespace std::literals::chrono_literals; const int kWorkerCount = 5; const std::string kLocal = "127.0.0.1"; -class WorkerInThread { +class WorkerCoordinationInThread { public: - WorkerInThread(io::network::Endpoint master_endpoint, int desired_id = -1) { - worker_thread_ = std::thread([this, master_endpoint, desired_id] { - server_.emplace(Endpoint(kLocal, 0)); - coord_.emplace(*server_, master_endpoint); - worker_id_ = coord_->RegisterWorker(desired_id); - coord_->WaitForShutdown(); - }); + WorkerCoordinationInThread(io::network::Endpoint master_endpoint, + int desired_id = -1) { + std::atomic<bool> init_done{false}; + worker_thread_ = + std::thread([this, master_endpoint, desired_id, &init_done] { + server_.emplace(Endpoint(kLocal, 0)); + coord_.emplace(*server_, master_endpoint); + worker_id_ = coord_->RegisterWorker(desired_id); + init_done = true; + coord_->WaitForShutdown(); + }); + + while (!init_done) std::this_thread::sleep_for(10ms); } int worker_id() const { return worker_id_; } @@ -44,21 +51,18 @@ class WorkerInThread { TEST(Distributed, Coordination) { Server master_server({kLocal, 0}); - std::vector<std::unique_ptr<WorkerInThread>> workers; + std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers; { MasterCoordination master_coord(master_server); for (int i = 0; i < kWorkerCount; ++i) - workers.emplace_back( - std::make_unique<WorkerInThread>(master_server.endpoint())); - - // Wait till all the workers are safely initialized. - std::this_thread::sleep_for(300ms); + workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( + master_server.endpoint())); // Expect that all workers have a different ID. std::unordered_set<int> worker_ids; for (const auto &w : workers) worker_ids.insert(w->worker_id()); - EXPECT_EQ(worker_ids.size(), kWorkerCount); + ASSERT_EQ(worker_ids.size(), kWorkerCount); // Check endpoints. for (auto &w1 : workers) { @@ -73,16 +77,14 @@ TEST(Distributed, Coordination) { TEST(Distributed, DesiredAndUniqueId) { Server master_server({kLocal, 0}); - std::vector<std::unique_ptr<WorkerInThread>> workers; + std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers; { MasterCoordination master_coord(master_server); - workers.emplace_back( - std::make_unique<WorkerInThread>(master_server.endpoint(), 42)); - std::this_thread::sleep_for(200ms); - workers.emplace_back( - std::make_unique<WorkerInThread>(master_server.endpoint(), 42)); - std::this_thread::sleep_for(200ms); + workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( + master_server.endpoint(), 42)); + workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( + master_server.endpoint(), 42)); EXPECT_EQ(workers[0]->worker_id(), 42); EXPECT_NE(workers[1]->worker_id(), 42); @@ -93,16 +95,14 @@ TEST(Distributed, DesiredAndUniqueId) { TEST(Distributed, CoordinationWorkersId) { Server master_server({kLocal, 0}); - std::vector<std::unique_ptr<WorkerInThread>> workers; + std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers; { MasterCoordination master_coord(master_server); - workers.emplace_back( - std::make_unique<WorkerInThread>(master_server.endpoint(), 42)); - std::this_thread::sleep_for(200ms); - workers.emplace_back( - std::make_unique<WorkerInThread>(master_server.endpoint(), 42)); - std::this_thread::sleep_for(200ms); + workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( + master_server.endpoint(), 42)); + workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( + master_server.endpoint(), 42)); std::vector<int> ids; ids.push_back(0);