Fix flakyness in tests/unit/distributed_coord

Summary:
Instead of waiting for a fix period for the coordinations to start and
coordinate with the master, wait for each of them individually to report
being done.

Also: rename `WorkerInThread` to `WorkerCoordinationInThread`.

Reviewers: dgleich, teon.banek, msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1288
This commit is contained in:
florijan 2018-03-12 11:26:40 +01:00
parent 67092ae4d7
commit 44aefe775e

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <experimental/optional>
#include <memory>
#include <thread>
@ -19,15 +20,21 @@ using namespace std::literals::chrono_literals;
const int kWorkerCount = 5;
const std::string kLocal = "127.0.0.1";
class WorkerInThread {
class WorkerCoordinationInThread {
public:
WorkerInThread(io::network::Endpoint master_endpoint, int desired_id = -1) {
worker_thread_ = std::thread([this, master_endpoint, desired_id] {
server_.emplace(Endpoint(kLocal, 0));
coord_.emplace(*server_, master_endpoint);
worker_id_ = coord_->RegisterWorker(desired_id);
coord_->WaitForShutdown();
});
WorkerCoordinationInThread(io::network::Endpoint master_endpoint,
int desired_id = -1) {
std::atomic<bool> init_done{false};
worker_thread_ =
std::thread([this, master_endpoint, desired_id, &init_done] {
server_.emplace(Endpoint(kLocal, 0));
coord_.emplace(*server_, master_endpoint);
worker_id_ = coord_->RegisterWorker(desired_id);
init_done = true;
coord_->WaitForShutdown();
});
while (!init_done) std::this_thread::sleep_for(10ms);
}
int worker_id() const { return worker_id_; }
@ -44,21 +51,18 @@ class WorkerInThread {
TEST(Distributed, Coordination) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server);
for (int i = 0; i < kWorkerCount; ++i)
workers.emplace_back(
std::make_unique<WorkerInThread>(master_server.endpoint()));
// Wait till all the workers are safely initialized.
std::this_thread::sleep_for(300ms);
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint()));
// Expect that all workers have a different ID.
std::unordered_set<int> worker_ids;
for (const auto &w : workers) worker_ids.insert(w->worker_id());
EXPECT_EQ(worker_ids.size(), kWorkerCount);
ASSERT_EQ(worker_ids.size(), kWorkerCount);
// Check endpoints.
for (auto &w1 : workers) {
@ -73,16 +77,14 @@ TEST(Distributed, Coordination) {
TEST(Distributed, DesiredAndUniqueId) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
EXPECT_EQ(workers[0]->worker_id(), 42);
EXPECT_NE(workers[1]->worker_id(), 42);
@ -93,16 +95,14 @@ TEST(Distributed, DesiredAndUniqueId) {
TEST(Distributed, CoordinationWorkersId) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
std::vector<int> ids;
ids.push_back(0);