#include #include #include #include #include #include "gtest/gtest.h" #include "communication/messaging/distributed.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" #include "io/network/network_endpoint.hpp" using communication::messaging::System; using namespace distributed; const int kWorkerCount = 5; const std::string kLocal = "127.0.0.1"; class WorkerInThread { public: WorkerInThread(io::network::NetworkEndpoint master_endpoint, int desired_id = -1) { worker_thread_ = std::thread([this, master_endpoint, desired_id] { system_.emplace(kLocal, 0); coord_.emplace(*system_, master_endpoint); worker_id_ = coord_->RegisterWorker(desired_id); coord_->WaitForShutdown(); coord_->Shutdown(); system_->Shutdown(); }); } int worker_id() const { return worker_id_; } auto endpoint() const { return system_->endpoint(); } auto worker_endpoint(int worker_id) { return coord_->GetEndpoint(worker_id); } void join() { worker_thread_.join(); } private: std::thread worker_thread_; std::experimental::optional system_; std::experimental::optional coord_; std::atomic worker_id_{0}; }; TEST(Distributed, Coordination) { System master_system(kLocal, 0); MasterCoordination master_coord(master_system); std::vector> workers; for (int i = 0; i < kWorkerCount; ++i) workers.emplace_back( std::make_unique(master_system.endpoint())); // Wait till all the workers are safely initialized. std::this_thread::sleep_for(300ms); // Expect that all workers have a different ID. std::unordered_set worker_ids; for (const auto &w : workers) worker_ids.insert(w->worker_id()); EXPECT_EQ(worker_ids.size(), kWorkerCount); // Check endpoints. for (auto &w1 : workers) { for (auto &w2 : workers) { EXPECT_EQ(w1->worker_endpoint(w2->worker_id()), w2->endpoint()); } } // Coordinated shutdown. master_coord.Shutdown(); master_system.Shutdown(); for (auto &worker : workers) worker->join(); } TEST(Distributed, DesiredAndUniqueId) { System master_system(kLocal, 0); MasterCoordination master_coord(master_system); WorkerInThread w1(master_system.endpoint(), 42); std::this_thread::sleep_for(200ms); WorkerInThread w2(master_system.endpoint(), 42); std::this_thread::sleep_for(200ms); EXPECT_EQ(w1.worker_id(), 42); EXPECT_NE(w2.worker_id(), 42); master_coord.Shutdown(); w1.join(); w2.join(); master_system.Shutdown(); }