Reduce distributed tests flakiness

Summary:
The single-binary distributed tests didn't wait for the whole cluster to be up
and running before they started running their tests. Now all tests ensure that
all workers are registered to the master and all other workers before starting
any tests.

Reviewers: mculinovic

Reviewed By: mculinovic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1724
This commit is contained in:
Matej Ferencevic 2018-11-06 15:48:18 +01:00
parent 06d4568950
commit cc8f199a5f
4 changed files with 68 additions and 0 deletions

View File

@ -73,6 +73,18 @@ class Cluster {
std::make_unique<WorkerInThread>(worker_config(i + 1)));
std::this_thread::sleep_for(kInitTime);
}
// Wait for the whole cluster to be up and running.
std::this_thread::sleep_for(kInitTime);
while (master_->GetWorkerIds().size() < worker_count + 1) {
std::this_thread::sleep_for(kInitTime);
}
for (int i = 0; i < worker_count; ++i) {
while (workers_[i]->worker_.GetWorkerIds().size() < worker_count + 1) {
std::this_thread::sleep_for(kInitTime);
}
}
std::this_thread::sleep_for(kInitTime);
}
void Stop() {

View File

@ -78,6 +78,18 @@ int main(int argc, char *argv[]) {
workers.emplace_back(std::make_unique<WorkerInThread>(config));
}
// Wait for the whole cluster to be up and running.
std::this_thread::sleep_for(std::chrono::milliseconds(200));
while (master->GetWorkerIds().size() < FLAGS_worker_count + 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
for (int i = 0; i < FLAGS_worker_count; ++i) {
while (workers[i]->worker_.GetWorkerIds().size() < FLAGS_worker_count + 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// Start the REPL
{
query::DistributedInterpreter interpreter(master.get());

View File

@ -81,6 +81,18 @@ class DistributedGraphDbTest : public ::testing::Test {
modify_config(worker_config(i + 1))));
std::this_thread::sleep_for(kInitTime);
}
// Wait for the whole cluster to be up and running.
std::this_thread::sleep_for(kInitTime);
while (master_->GetWorkerIds().size() < kWorkerCount + 1) {
std::this_thread::sleep_for(kInitTime);
}
for (int i = 0; i < kWorkerCount; ++i) {
while (workers_[i]->worker_.GetWorkerIds().size() < kWorkerCount + 1) {
std::this_thread::sleep_for(kInitTime);
}
}
std::this_thread::sleep_for(kInitTime);
}
void SetUp() override {
@ -207,6 +219,18 @@ class Cluster {
FLAGS_durability_directory = GetDurabilityDirectory(i + 1);
workers_.emplace_back(
std::make_unique<WorkerInThread>(worker_config(i + 1)));
std::this_thread::sleep_for(kInitTime);
}
// Wait for the whole cluster to be up and running.
std::this_thread::sleep_for(kInitTime);
while (master_->GetWorkerIds().size() < num_workers + 1) {
std::this_thread::sleep_for(kInitTime);
}
for (int i = 0; i < num_workers; ++i) {
while (workers_[i]->worker_.GetWorkerIds().size() < num_workers + 1) {
std::this_thread::sleep_for(kInitTime);
}
}
std::this_thread::sleep_for(kInitTime);
}

View File

@ -92,8 +92,18 @@ TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorker) {
EXPECT_EQ(CountIterable(dba->Vertices(label, property, false)), 100);
}
auto num_workers = master->GetWorkerIds().size();
auto worker1 = CreateWorker(master->endpoint(), 1, modify_config);
while (master->GetWorkerIds().size() < num_workers + 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
while (worker1->worker_.GetWorkerIds().size() < num_workers + 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// Check that the new worker has that index
{
auto dba = worker1->db()->Access();
@ -171,8 +181,18 @@ TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorkerAfterRecovery) {
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
}
auto num_workers = master->GetWorkerIds().size();
auto worker1 = CreateWorker(master->endpoint(), 1, modify_config);
while (master->GetWorkerIds().size() < num_workers + 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
while (worker1->worker_.GetWorkerIds().size() < num_workers + 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// Check that the new worker has that index.
{
auto dba = worker1->db()->Access();