Fix flakyness in rpc/messaging tests

Summary: In this diff I just wanted to fix tests' flakyness. We can discuss if we want to always pass endpoint as an argument and never pass address:port pair explicitly. However if we decide that, I will do that change in another diff.

Reviewers: dgleich, florijan

Reviewed By: dgleich, florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1049
This commit is contained in:
Mislav Bradac 2017-12-12 16:44:45 +01:00
parent 19e96c98b6
commit cf7d9070d8
5 changed files with 28 additions and 25 deletions

View File

@ -3,7 +3,7 @@
namespace communication::messaging { namespace communication::messaging {
System::System(const std::string &address, uint16_t port) System::System(const std::string &address, uint16_t port)
: address_(address), port_(port) { : endpoint_(address, port) {
// Numbers of worker are quite arbitrary at the point. // Numbers of worker are quite arbitrary at the point.
StartClient(4); StartClient(4);
StartServer(4); StartServer(4);
@ -40,10 +40,9 @@ void System::StartServer(int worker_count) {
LOG(FATAL) << "Tried to start a running server!"; LOG(FATAL) << "Tried to start a running server!";
} }
// Initialize endpoint.
Endpoint endpoint(address_.c_str(), port_);
// Initialize server. // Initialize server.
server_ = std::make_unique<ServerT>(endpoint, protocol_data_); server_ = std::make_unique<ServerT>(endpoint_, protocol_data_);
endpoint_ = server_->endpoint();
// Start server. // Start server.
thread_ = std::thread( thread_ = std::thread(

View File

@ -72,8 +72,7 @@ class System {
std::shared_ptr<EventStream> Open(const std::string &name); std::shared_ptr<EventStream> Open(const std::string &name);
void Shutdown(); void Shutdown();
const std::string &address() const { return address_; } const io::network::NetworkEndpoint &endpoint() const { return endpoint_; }
uint16_t port() const { return port_; }
private: private:
using Endpoint = io::network::NetworkEndpoint; using Endpoint = io::network::NetworkEndpoint;
@ -116,8 +115,7 @@ class System {
std::thread thread_; std::thread thread_;
SessionData protocol_data_; SessionData protocol_data_;
std::unique_ptr<ServerT> server_{nullptr}; std::unique_ptr<ServerT> server_{nullptr};
std::string address_; io::network::NetworkEndpoint endpoint_;
uint16_t port_;
LocalSystem &system_ = protocol_data_.system; LocalSystem &system_ = protocol_data_.system;
}; };

View File

@ -77,7 +77,8 @@ Client::Client(messaging::System &system, const std::string &address,
std::unique_ptr<messaging::Message> Client::Call( std::unique_ptr<messaging::Message> Client::Call(
std::chrono::system_clock::duration timeout, std::chrono::system_clock::duration timeout,
std::unique_ptr<messaging::Message> message) { std::unique_ptr<messaging::Message> message) {
auto request = std::make_unique<Request>(system_.address(), system_.port(), auto request = std::make_unique<Request>(system_.endpoint().address(),
system_.endpoint().port(),
stream_->name(), std::move(message)); stream_->name(), std::move(message));
auto message_id = request->message_id(); auto message_id = request->message_id();
writer_.Send(std::move(request)); writer_.Send(std::move(request));

View File

@ -31,17 +31,18 @@ CEREAL_REGISTER_TYPE(MessageInt);
* Test do the services start up without crashes. * Test do the services start up without crashes.
*/ */
TEST(SimpleTests, StartAndShutdown) { TEST(SimpleTests, StartAndShutdown) {
System system("127.0.0.1", 10000); System system("127.0.0.1", 0);
// do nothing // do nothing
std::this_thread::sleep_for(500ms); std::this_thread::sleep_for(500ms);
system.Shutdown(); system.Shutdown();
} }
TEST(Messaging, Pop) { TEST(Messaging, Pop) {
System master_system("127.0.0.1", 10000); System master_system("127.0.0.1", 0);
System slave_system("127.0.0.1", 10001); System slave_system("127.0.0.1", 0);
auto stream = master_system.Open("main"); auto stream = master_system.Open("main");
Writer writer(slave_system, "127.0.0.1", 10000, "main"); Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(),
"main");
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
EXPECT_EQ(stream->Poll(), nullptr); EXPECT_EQ(stream->Poll(), nullptr);
@ -52,10 +53,11 @@ TEST(Messaging, Pop) {
} }
TEST(Messaging, Await) { TEST(Messaging, Await) {
System master_system("127.0.0.1", 10000); System master_system("127.0.0.1", 0);
System slave_system("127.0.0.1", 10001); System slave_system("127.0.0.1", 0);
auto stream = master_system.Open("main"); auto stream = master_system.Open("main");
Writer writer(slave_system, "127.0.0.1", 10000, "main"); Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(),
"main");
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
std::thread t([&] { std::thread t([&] {
@ -73,10 +75,11 @@ TEST(Messaging, Await) {
} }
TEST(Messaging, RecreateChannelAfterClosing) { TEST(Messaging, RecreateChannelAfterClosing) {
System master_system("127.0.0.1", 10000); System master_system("127.0.0.1", 0);
System slave_system("127.0.0.1", 10001); System slave_system("127.0.0.1", 0);
auto stream = master_system.Open("main"); auto stream = master_system.Open("main");
Writer writer(slave_system, "127.0.0.1", 10000, "main"); Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(),
"main");
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
writer.Send<MessageInt>(10); writer.Send<MessageInt>(10);

View File

@ -39,7 +39,7 @@ CEREAL_REGISTER_TYPE(SumRes);
using Sum = RequestResponse<SumReq, SumRes>; using Sum = RequestResponse<SumReq, SumRes>;
TEST(Rpc, Call) { TEST(Rpc, Call) {
System server_system("127.0.0.1", 10000); System server_system("127.0.0.1", 0);
Server server(server_system, "main"); Server server(server_system, "main");
server.Register<Sum>([](const SumReq &request) { server.Register<Sum>([](const SumReq &request) {
return std::make_unique<SumRes>(request.x + request.y); return std::make_unique<SumRes>(request.x + request.y);
@ -47,8 +47,9 @@ TEST(Rpc, Call) {
std::thread server_thread([&] { server.Start(); }); std::thread server_thread([&] { server.Start(); });
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
System client_system("127.0.0.1", 10001); System client_system("127.0.0.1", 0);
Client client(client_system, "127.0.0.1", 10000, "main"); Client client(client_system, "127.0.0.1", server_system.endpoint().port(),
"main");
auto sum = client.Call<Sum>(300ms, 10, 20); auto sum = client.Call<Sum>(300ms, 10, 20);
EXPECT_EQ(sum->sum, 30); EXPECT_EQ(sum->sum, 30);
@ -59,7 +60,7 @@ TEST(Rpc, Call) {
} }
TEST(Rpc, Timeout) { TEST(Rpc, Timeout) {
System server_system("127.0.0.1", 10000); System server_system("127.0.0.1", 0);
Server server(server_system, "main"); Server server(server_system, "main");
server.Register<Sum>([](const SumReq &request) { server.Register<Sum>([](const SumReq &request) {
std::this_thread::sleep_for(300ms); std::this_thread::sleep_for(300ms);
@ -68,8 +69,9 @@ TEST(Rpc, Timeout) {
std::thread server_thread([&] { server.Start(); }); std::thread server_thread([&] { server.Start(); });
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
System client_system("127.0.0.1", 10001); System client_system("127.0.0.1", 0);
Client client(client_system, "127.0.0.1", 10000, "main"); Client client(client_system, "127.0.0.1", server_system.endpoint().port(),
"main");
auto sum = client.Call<Sum>(100ms, 10, 20); auto sum = client.Call<Sum>(100ms, 10, 20);
EXPECT_FALSE(sum); EXPECT_FALSE(sum);