diff --git a/src/communication/messaging/distributed.cpp b/src/communication/messaging/distributed.cpp index 178a28086..7da42de34 100644 --- a/src/communication/messaging/distributed.cpp +++ b/src/communication/messaging/distributed.cpp @@ -3,7 +3,7 @@ namespace communication::messaging { System::System(const std::string &address, uint16_t port) - : address_(address), port_(port) { + : endpoint_(address, port) { // Numbers of worker are quite arbitrary at the point. StartClient(4); StartServer(4); @@ -40,10 +40,9 @@ void System::StartServer(int worker_count) { LOG(FATAL) << "Tried to start a running server!"; } - // Initialize endpoint. - Endpoint endpoint(address_.c_str(), port_); // Initialize server. - server_ = std::make_unique<ServerT>(endpoint, protocol_data_); + server_ = std::make_unique<ServerT>(endpoint_, protocol_data_); + endpoint_ = server_->endpoint(); // Start server. thread_ = std::thread( diff --git a/src/communication/messaging/distributed.hpp b/src/communication/messaging/distributed.hpp index b1d1983ff..9e0b0c579 100644 --- a/src/communication/messaging/distributed.hpp +++ b/src/communication/messaging/distributed.hpp @@ -72,8 +72,7 @@ class System { std::shared_ptr<EventStream> Open(const std::string &name); void Shutdown(); - const std::string &address() const { return address_; } - uint16_t port() const { return port_; } + const io::network::NetworkEndpoint &endpoint() const { return endpoint_; } private: using Endpoint = io::network::NetworkEndpoint; @@ -116,8 +115,7 @@ class System { std::thread thread_; SessionData protocol_data_; std::unique_ptr<ServerT> server_{nullptr}; - std::string address_; - uint16_t port_; + io::network::NetworkEndpoint endpoint_; LocalSystem &system_ = protocol_data_.system; }; diff --git a/src/communication/rpc/rpc.cpp b/src/communication/rpc/rpc.cpp index 2dc2e7465..b799119d4 100644 --- a/src/communication/rpc/rpc.cpp +++ b/src/communication/rpc/rpc.cpp @@ -77,7 +77,8 @@ Client::Client(messaging::System &system, const std::string &address, std::unique_ptr<messaging::Message> Client::Call( std::chrono::system_clock::duration timeout, std::unique_ptr<messaging::Message> message) { - auto request = std::make_unique<Request>(system_.address(), system_.port(), + auto request = std::make_unique<Request>(system_.endpoint().address(), + system_.endpoint().port(), stream_->name(), std::move(message)); auto message_id = request->message_id(); writer_.Send(std::move(request)); diff --git a/tests/unit/messaging_distributed.cpp b/tests/unit/messaging_distributed.cpp index d8e878f95..b2e48ab8b 100644 --- a/tests/unit/messaging_distributed.cpp +++ b/tests/unit/messaging_distributed.cpp @@ -31,17 +31,18 @@ CEREAL_REGISTER_TYPE(MessageInt); * Test do the services start up without crashes. */ TEST(SimpleTests, StartAndShutdown) { - System system("127.0.0.1", 10000); + System system("127.0.0.1", 0); // do nothing std::this_thread::sleep_for(500ms); system.Shutdown(); } TEST(Messaging, Pop) { - System master_system("127.0.0.1", 10000); - System slave_system("127.0.0.1", 10001); + System master_system("127.0.0.1", 0); + System slave_system("127.0.0.1", 0); auto stream = master_system.Open("main"); - Writer writer(slave_system, "127.0.0.1", 10000, "main"); + Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(), + "main"); std::this_thread::sleep_for(100ms); EXPECT_EQ(stream->Poll(), nullptr); @@ -52,10 +53,11 @@ TEST(Messaging, Pop) { } TEST(Messaging, Await) { - System master_system("127.0.0.1", 10000); - System slave_system("127.0.0.1", 10001); + System master_system("127.0.0.1", 0); + System slave_system("127.0.0.1", 0); auto stream = master_system.Open("main"); - Writer writer(slave_system, "127.0.0.1", 10000, "main"); + Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(), + "main"); std::this_thread::sleep_for(100ms); std::thread t([&] { @@ -73,10 +75,11 @@ TEST(Messaging, Await) { } TEST(Messaging, RecreateChannelAfterClosing) { - System master_system("127.0.0.1", 10000); - System slave_system("127.0.0.1", 10001); + System master_system("127.0.0.1", 0); + System slave_system("127.0.0.1", 0); auto stream = master_system.Open("main"); - Writer writer(slave_system, "127.0.0.1", 10000, "main"); + Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(), + "main"); std::this_thread::sleep_for(100ms); writer.Send<MessageInt>(10); diff --git a/tests/unit/rpc.cpp b/tests/unit/rpc.cpp index 385ad4e86..fac41a30a 100644 --- a/tests/unit/rpc.cpp +++ b/tests/unit/rpc.cpp @@ -39,7 +39,7 @@ CEREAL_REGISTER_TYPE(SumRes); using Sum = RequestResponse<SumReq, SumRes>; TEST(Rpc, Call) { - System server_system("127.0.0.1", 10000); + System server_system("127.0.0.1", 0); Server server(server_system, "main"); server.Register<Sum>([](const SumReq &request) { return std::make_unique<SumRes>(request.x + request.y); @@ -47,8 +47,9 @@ TEST(Rpc, Call) { std::thread server_thread([&] { server.Start(); }); std::this_thread::sleep_for(100ms); - System client_system("127.0.0.1", 10001); - Client client(client_system, "127.0.0.1", 10000, "main"); + System client_system("127.0.0.1", 0); + Client client(client_system, "127.0.0.1", server_system.endpoint().port(), + "main"); auto sum = client.Call<Sum>(300ms, 10, 20); EXPECT_EQ(sum->sum, 30); @@ -59,7 +60,7 @@ TEST(Rpc, Call) { } TEST(Rpc, Timeout) { - System server_system("127.0.0.1", 10000); + System server_system("127.0.0.1", 0); Server server(server_system, "main"); server.Register<Sum>([](const SumReq &request) { std::this_thread::sleep_for(300ms); @@ -68,8 +69,9 @@ TEST(Rpc, Timeout) { std::thread server_thread([&] { server.Start(); }); std::this_thread::sleep_for(100ms); - System client_system("127.0.0.1", 10001); - Client client(client_system, "127.0.0.1", 10000, "main"); + System client_system("127.0.0.1", 0); + Client client(client_system, "127.0.0.1", server_system.endpoint().port(), + "main"); auto sum = client.Call<Sum>(100ms, 10, 20); EXPECT_FALSE(sum);