From 8952df06c155bf5bf3c1e66a4ad0534ac998d027 Mon Sep 17 00:00:00 2001 From: Mislav Bradac Date: Wed, 27 Dec 2017 14:30:18 +0100 Subject: [PATCH] Temporary workaround for raft network problem Reviewers: mtomic Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1085 --- src/communication/messaging/distributed.cpp | 2 +- src/communication/messaging/protocol.cpp | 36 +++++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/src/communication/messaging/distributed.cpp b/src/communication/messaging/distributed.cpp index 195bf7df8..9d42ec2f3 100644 --- a/src/communication/messaging/distributed.cpp +++ b/src/communication/messaging/distributed.cpp @@ -9,7 +9,7 @@ System::System(const std::string &address, uint16_t port) StartServer(4); } -System::System(const io::network::NetworkEndpoint &endpoint) +System::System(const io::network::NetworkEndpoint &endpoint) : System(endpoint.address(), endpoint.port()) {} System::~System() { diff --git a/src/communication/messaging/protocol.cpp b/src/communication/messaging/protocol.cpp index 86720783e..eaba214bc 100644 --- a/src/communication/messaging/protocol.cpp +++ b/src/communication/messaging/protocol.cpp @@ -1,4 +1,5 @@ #include +#include #include "boost/archive/binary_iarchive.hpp" #include "boost/archive/binary_oarchive.hpp" @@ -66,20 +67,41 @@ bool SendLength(Socket &socket, SizeT length) { return socket.Write(reinterpret_cast(&length), sizeof(SizeT)); } +struct PairHash { + public: + template + std::size_t operator()(const std::pair &x) const { + return std::hash()(x.first) ^ std::hash()(x.second); + } +}; + void SendMessage(const std::string &address, uint16_t port, const std::string &channel, std::unique_ptr message) { + static thread_local std::unordered_map, + Socket, PairHash> + cache; CHECK(message) << "Trying to send nullptr instead of message"; - // Initialize endpoint. - Endpoint endpoint(address.c_str(), port); + auto it = cache.find({address, port}); + if (it == cache.end()) { + // Initialize endpoint. + Endpoint endpoint(address.c_str(), port); - Socket socket; - if (!socket.Connect(endpoint)) { - LOG(INFO) << "Couldn't connect to remote address: " << address << ":" - << port; - return; + Socket socket; + if (!socket.Connect(endpoint)) { + LOG(INFO) << "Couldn't connect to remote address: " << address << ":" + << port; + return; + } + + it = cache + .emplace(std::piecewise_construct, + std::forward_as_tuple(address, port), + std::forward_as_tuple(std::move(socket))) + .first; } + auto &socket = it->second; if (!SendLength(socket, channel.size())) { LOG(INFO) << "Couldn't send channel size!"; return;