From e2f2cd5722fcf247a185925ac5bf9a49ca6fb52f Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Thu, 25 Jan 2018 13:08:59 +0100 Subject: [PATCH] Improve network performance Summary: With this patch the number of packets for a simple RPC call is lowered from 22 to 12 (45% reduction). The number of packets for the Bolt protocol is lowered from 26 to 18 (30% reduction). Impact on the Bolt protocol will be a constant of ~ 8 packets less per connection, while the impact on the RPC protocol will be approximately a 45% reduction overall. Reviewers: buda, teon.banek Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1141 --- src/communication/bolt/client.hpp | 4 ++-- src/communication/bolt/v1/session.hpp | 4 +++- src/communication/rpc/client.cpp | 6 +++--- src/communication/rpc/protocol.cpp | 15 ++++++--------- src/io/network/socket.cpp | 13 +++++++------ src/io/network/socket.hpp | 6 ++++-- tests/unit/bolt_common.hpp | 2 +- 7 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/communication/bolt/client.hpp b/src/communication/bolt/client.hpp index 6cba7a2b9..a0a27a069 100644 --- a/src/communication/bolt/client.hpp +++ b/src/communication/bolt/client.hpp @@ -51,11 +51,11 @@ class Client { const std::string &client_name = "memgraph-bolt/0.0.1") : socket_(std::move(socket)) { DLOG(INFO) << "Sending handshake"; - if (!socket_.Write(kPreamble, sizeof(kPreamble))) { + if (!socket_.Write(kPreamble, sizeof(kPreamble), true)) { throw ClientSocketException(); } for (int i = 0; i < 4; ++i) { - if (!socket_.Write(kProtocol, sizeof(kProtocol))) { + if (!socket_.Write(kProtocol, sizeof(kProtocol), i != 3)) { throw ClientSocketException(); } } diff --git a/src/communication/bolt/v1/session.hpp b/src/communication/bolt/v1/session.hpp index ae29d2386..8be06befd 100644 --- a/src/communication/bolt/v1/session.hpp +++ b/src/communication/bolt/v1/session.hpp @@ -47,7 +47,9 @@ class Session { explicit TimeoutSocket(Session &session) : session_(session) {} bool Write(const uint8_t *data, size_t len) { - return session_.socket_.Write(data, len, + // The have_more flag is hardcoded to false here because the bolt data + // is internally buffered and doesn't need to be buffered by the kernel. + return session_.socket_.Write(data, len, false, [this] { return !session_.TimedOut(); }); } diff --git a/src/communication/rpc/client.cpp b/src/communication/rpc/client.cpp index b000f459d..c4bb677ee 100644 --- a/src/communication/rpc/client.cpp +++ b/src/communication/rpc/client.cpp @@ -39,7 +39,7 @@ std::unique_ptr Client::Call(std::unique_ptr request) { // Send service name size. MessageSize service_len = service_name_.size(); if (!socket_->Write(reinterpret_cast(&service_len), - sizeof(MessageSize))) { + sizeof(MessageSize), true)) { LOG(ERROR) << "Couldn't send service name size!"; socket_ = std::experimental::nullopt; return nullptr; @@ -55,7 +55,7 @@ std::unique_ptr Client::Call(std::unique_ptr request) { // Send current request ID. if (!socket_->Write(reinterpret_cast(&request_id), - sizeof(uint32_t))) { + sizeof(uint32_t), true)) { LOG(ERROR) << "Couldn't send request ID!"; socket_ = std::experimental::nullopt; return nullptr; @@ -74,7 +74,7 @@ std::unique_ptr Client::Call(std::unique_ptr request) { kMaxMessageSize); if (!socket_->Write(reinterpret_cast(&request_data_size), - sizeof(MessageSize))) { + sizeof(MessageSize), true)) { LOG(ERROR) << "Couldn't send request size!"; socket_ = std::experimental::nullopt; return nullptr; diff --git a/src/communication/rpc/protocol.cpp b/src/communication/rpc/protocol.cpp index 8dc5ffc65..5f012188e 100644 --- a/src/communication/rpc/protocol.cpp +++ b/src/communication/rpc/protocol.cpp @@ -63,11 +63,6 @@ void Session::Close() { socket_.get()->Close(); } -bool SendLength(Socket &socket, MessageSize length) { - return socket.Write(reinterpret_cast(&length), - sizeof(MessageSize)); -} - void SendMessage(Socket &socket, uint32_t message_id, std::unique_ptr &message) { CHECK(message) << "Trying to send nullptr instead of message"; @@ -78,18 +73,20 @@ void SendMessage(Socket &socket, uint32_t message_id, archive << message; const std::string &buffer = stream.str(); - int64_t message_size = sizeof(MessageSize) + buffer.size(); + uint64_t message_size = sizeof(MessageSize) + buffer.size(); CHECK(message_size <= kMaxMessageSize) << fmt::format( "Trying to send message of size {}, max message size is {}", message_size, kMaxMessageSize); - if (!socket.Write(reinterpret_cast(&message_id), - sizeof(uint32_t))) { + if (!socket.Write(reinterpret_cast(&message_id), sizeof(uint32_t), + true)) { LOG(WARNING) << "Couldn't send message id!"; return; } - if (!SendLength(socket, buffer.size())) { + MessageSize buffer_size = buffer.size(); + if (!socket.Write(reinterpret_cast(&buffer_size), + sizeof(MessageSize), true)) { LOG(WARNING) << "Couldn't send message size!"; return; } diff --git a/src/io/network/socket.cpp b/src/io/network/socket.cpp index bafa7b80c..3320d603a 100644 --- a/src/io/network/socket.cpp +++ b/src/io/network/socket.cpp @@ -193,12 +193,13 @@ std::experimental::optional Socket::Accept() { return Socket(sfd, endpoint); } -bool Socket::Write(const uint8_t *data, size_t len, +bool Socket::Write(const uint8_t *data, size_t len, bool have_more, const std::function &keep_retrying) { + // MSG_NOSIGNAL is here to disable raising a SIGPIPE signal when a + // connection dies mid-write, the socket will only return an EPIPE error. + int flags = MSG_NOSIGNAL | (have_more ? MSG_MORE : 0); while (len > 0) { - // MSG_NOSIGNAL is here to disable raising a SIGPIPE signal when a - // connection dies mid-write, the socket will only return an EPIPE error. - auto written = send(socket_, data, len, MSG_NOSIGNAL); + auto written = send(socket_, data, len, flags); if (written == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { // Terminal error, return failure. @@ -218,9 +219,9 @@ bool Socket::Write(const uint8_t *data, size_t len, return true; } -bool Socket::Write(const std::string &s, +bool Socket::Write(const std::string &s, bool have_more, const std::function &keep_retrying) { - return Write(reinterpret_cast(s.data()), s.size(), + return Write(reinterpret_cast(s.data()), s.size(), have_more, keep_retrying); } diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp index 8704bfe2d..6b8f8c08d 100644 --- a/src/io/network/socket.hpp +++ b/src/io/network/socket.hpp @@ -130,6 +130,8 @@ class Socket { * * @param data uint8_t* to data that should be written * @param len length of char* or uint8_t* data + * @param have_more set to true if you plan to send more data to allow the + * kernel to buffer the data instead of immediately sending it out * @param keep_retrying while function executes to true socket will retry to * write data if nonterminal error occurred on socket (EAGAIN, EWOULDBLOCK, * EINTR)... useful if socket is in nonblocking mode or timeout is set on a @@ -143,9 +145,9 @@ class Socket { * true if write succeeded * false if write failed */ - bool Write(const uint8_t *data, size_t len, + bool Write(const uint8_t *data, size_t len, bool have_more = false, const std::function &keep_retrying = [] { return false; }); - bool Write(const std::string &s, + bool Write(const std::string &s, bool have_more = false, const std::function &keep_retrying = [] { return false; }); /** diff --git a/tests/unit/bolt_common.hpp b/tests/unit/bolt_common.hpp index ed789a9b7..fde4c8bc4 100644 --- a/tests/unit/bolt_common.hpp +++ b/tests/unit/bolt_common.hpp @@ -25,7 +25,7 @@ class TestSocket { int id() const { return socket_; } - bool Write(const uint8_t *data, size_t len, + bool Write(const uint8_t *data, size_t len, bool have_more = false, const std::function & = [] { return false; }) { if (!write_success_) return false; for (size_t i = 0; i < len; ++i) output.push_back(data[i]);