From c4a0da6054c599a737fb529e0282c94b9df7aa8c Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Tue, 27 Mar 2018 17:59:40 +0200
Subject: [PATCH] Refactor network clients to use a single client
 implementation

Summary: All network clients (Bolt & RPC) now use a wrapper.

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1324
---
 src/communication/bolt/client.hpp             | 221 +++++++++---------
 .../v1/encoder/chunked_encoder_buffer.hpp     |  22 +-
 src/communication/bolt/v1/encoder/encoder.hpp |   4 +-
 src/communication/client.hpp                  | 136 +++++++++++
 src/communication/rpc/client.cpp              | 100 ++++----
 src/communication/rpc/client.hpp              |   9 +-
 src/communication/rpc/protocol.hpp            |   7 -
 src/memgraph_bolt.cpp                         |   4 -
 tests/macro_benchmark/clients/bolt_client.hpp |  13 +-
 tests/manual/bolt_client.cpp                  |  16 +-
 tests/stress/long_running.cpp                 |  16 +-
 11 files changed, 320 insertions(+), 228 deletions(-)
 create mode 100644 src/communication/client.hpp

diff --git a/src/communication/bolt/client.hpp b/src/communication/bolt/client.hpp
index 48d218238..c5a8773dc 100644
--- a/src/communication/bolt/client.hpp
+++ b/src/communication/bolt/client.hpp
@@ -2,7 +2,6 @@
 
 #include <glog/logging.h>
 
-#include "communication/bolt/v1/decoder/buffer.hpp"
 #include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp"
 #include "communication/bolt/v1/decoder/decoder.hpp"
 #include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
@@ -13,28 +12,18 @@
 
 namespace communication::bolt {
 
-class ClientException : public utils::BasicException {
+class ClientFatalException : public utils::BasicException {
+ public:
   using utils::BasicException::BasicException;
+  ClientFatalException()
+      : utils::BasicException(
+            "Something went wrong while communicating with the server!") {}
 };
 
-class ClientSocketException : public ClientException {
+class ClientQueryException : public utils::BasicException {
  public:
-  using ClientException::ClientException;
-  ClientSocketException()
-      : ClientException("Couldn't write/read data to/from the socket!") {}
-};
-
-class ClientInvalidDataException : public ClientException {
- public:
-  using ClientException::ClientException;
-  ClientInvalidDataException()
-      : ClientException("The server sent invalid data!") {}
-};
-
-class ClientQueryException : public ClientException {
- public:
-  using ClientException::ClientException;
-  ClientQueryException() : ClientException("Couldn't execute query!") {}
+  using utils::BasicException::BasicException;
+  ClientQueryException() : utils::BasicException("Couldn't execute query!") {}
 };
 
 struct QueryData {
@@ -43,56 +32,67 @@ struct QueryData {
   std::map<std::string, DecodedValue> metadata;
 };
 
-template <typename Socket>
 class Client {
  public:
-  Client(Socket &&socket, const std::string &username,
-         const std::string &password,
-         const std::string &client_name = "memgraph-bolt/0.0.1")
-      : socket_(std::move(socket)) {
-    DLOG(INFO) << "Sending handshake";
-    if (!socket_.Write(kPreamble, sizeof(kPreamble), true)) {
-      throw ClientSocketException();
-    }
-    for (int i = 0; i < 4; ++i) {
-      if (!socket_.Write(kProtocol, sizeof(kProtocol), i != 3)) {
-        throw ClientSocketException();
-      }
-    }
-
-    DLOG(INFO) << "Reading handshake response";
-    if (!GetDataByLen(4)) {
-      throw ClientSocketException();
-    }
-    if (memcmp(kProtocol, buffer_.data(), sizeof(kProtocol)) != 0) {
-      throw ClientException("Server negotiated unsupported protocol version!");
-    }
-    buffer_.Shift(sizeof(kProtocol));
-
-    DLOG(INFO) << "Sending init message";
-    if (!encoder_.MessageInit(client_name, {{"scheme", "basic"},
-                                            {"principal", username},
-                                            {"credentials", password}})) {
-      throw ClientSocketException();
-    }
-
-    DLOG(INFO) << "Reading init message response";
-    Signature signature;
-    DecodedValue metadata;
-    if (!ReadMessage(&signature, &metadata)) {
-      throw ClientException("Couldn't read init message response!");
-    }
-    if (signature != Signature::Success) {
-      throw ClientInvalidDataException();
-    }
-    DLOG(INFO) << "Metadata of init message response: " << metadata;
-  }
+  Client() {}
 
   Client(const Client &) = delete;
   Client(Client &&) = delete;
   Client &operator=(const Client &) = delete;
   Client &operator=(Client &&) = delete;
 
+  bool Connect(const io::network::Endpoint &endpoint,
+               const std::string &username, const std::string &password,
+               const std::string &client_name = "memgraph-bolt/0.0.1") {
+    if (!client_.Connect(endpoint)) {
+      LOG(ERROR) << "Couldn't connect to " << endpoint;
+      return false;
+    }
+
+    if (!client_.Write(kPreamble, sizeof(kPreamble), true)) {
+      LOG(ERROR) << "Couldn't send preamble!";
+      return false;
+    }
+    for (int i = 0; i < 4; ++i) {
+      if (!client_.Write(kProtocol, sizeof(kProtocol), i != 3)) {
+        LOG(ERROR) << "Couldn't send protocol version!";
+        return false;
+      }
+    }
+
+    if (!client_.Read(sizeof(kProtocol))) {
+      LOG(ERROR) << "Couldn't get negotiated protocol version!";
+      return false;
+    }
+    if (memcmp(kProtocol, client_.GetData(), sizeof(kProtocol)) != 0) {
+      LOG(ERROR) << "Server negotiated unsupported protocol version!";
+      return false;
+    }
+    client_.ShiftData(sizeof(kProtocol));
+
+    if (!encoder_.MessageInit(client_name, {{"scheme", "basic"},
+                                            {"principal", username},
+                                            {"credentials", password}})) {
+      LOG(ERROR) << "Couldn't send init message!";
+      return false;
+    }
+
+    Signature signature;
+    DecodedValue metadata;
+    if (!ReadMessage(&signature, &metadata)) {
+      LOG(ERROR) << "Couldn't read init message response!";
+      return false;
+    }
+    if (signature != Signature::Success) {
+      LOG(ERROR) << "Handshake failed!";
+      return false;
+    }
+
+    DLOG(INFO) << "Metadata of init message response: " << metadata;
+
+    return true;
+  }
+
   QueryData Execute(const std::string &query,
                     const std::map<std::string, DecodedValue> &parameters) {
     DLOG(INFO) << "Sending run message with statement: '" << query
@@ -107,10 +107,10 @@ class Client {
     Signature signature;
     DecodedValue fields;
     if (!ReadMessage(&signature, &fields)) {
-      throw ClientInvalidDataException();
+      throw ClientFatalException();
     }
     if (fields.type() != DecodedValue::Type::Map) {
-      throw ClientInvalidDataException();
+      throw ClientFatalException();
     }
 
     if (signature == Signature::Failure) {
@@ -122,7 +122,7 @@ class Client {
       }
       throw ClientQueryException();
     } else if (signature != Signature::Success) {
-      throw ClientInvalidDataException();
+      throw ClientFatalException();
     }
 
     DLOG(INFO) << "Reading pull_all message response";
@@ -130,27 +130,27 @@ class Client {
     DecodedValue metadata;
     std::vector<std::vector<DecodedValue>> records;
     while (true) {
-      if (!GetDataByChunk()) {
-        throw ClientSocketException();
+      if (!GetMessage()) {
+        throw ClientFatalException();
       }
       if (!decoder_.ReadMessageHeader(&signature, &marker)) {
-        throw ClientInvalidDataException();
+        throw ClientFatalException();
       }
       if (signature == Signature::Record) {
         DecodedValue record;
         if (!decoder_.ReadValue(&record, DecodedValue::Type::List)) {
-          throw ClientInvalidDataException();
+          throw ClientFatalException();
         }
         records.push_back(record.ValueList());
       } else if (signature == Signature::Success) {
         if (!decoder_.ReadValue(&metadata)) {
-          throw ClientInvalidDataException();
+          throw ClientFatalException();
         }
         break;
       } else if (signature == Signature::Failure) {
         DecodedValue data;
         if (!decoder_.ReadValue(&data)) {
-          throw ClientInvalidDataException();
+          throw ClientFatalException();
         }
         HandleFailure();
         auto &tmp = data.ValueMap();
@@ -160,28 +160,28 @@ class Client {
         }
         throw ClientQueryException();
       } else {
-        throw ClientInvalidDataException();
+        throw ClientFatalException();
       }
     }
 
     if (metadata.type() != DecodedValue::Type::Map) {
-      throw ClientInvalidDataException();
+      throw ClientFatalException();
     }
 
     QueryData ret{{}, records, metadata.ValueMap()};
 
     auto &header = fields.ValueMap();
     if (header.find("fields") == header.end()) {
-      throw ClientInvalidDataException();
+      throw ClientFatalException();
     }
     if (header["fields"].type() != DecodedValue::Type::List) {
-      throw ClientInvalidDataException();
+      throw ClientFatalException();
     }
     auto &field_vector = header["fields"].ValueList();
 
     for (auto &field_item : field_vector) {
       if (field_item.type() != DecodedValue::Type::String) {
-        throw ClientInvalidDataException();
+        throw ClientFatalException();
       }
       ret.fields.push_back(field_item.ValueString());
     }
@@ -189,44 +189,30 @@ class Client {
     return ret;
   }
 
-  void Close() { socket_.Close(); };
-
-  ~Client() { Close(); }
+  void Close() { client_.Close(); };
 
  private:
-  bool GetDataByLen(uint64_t len) {
-    while (buffer_.size() < len) {
-      auto buff = buffer_.Allocate();
-      int ret = socket_.Read(buff.data, buff.len);
-      if (ret <= 0) return false;
-      buffer_.Written(ret);
-    }
-    return true;
-  }
+  bool GetMessage() {
+    client_.ClearData();
+    while (true) {
+      if (!client_.Read(CHUNK_HEADER_SIZE)) return false;
 
-  bool GetDataByChunk() {
-    ChunkState state;
-    while ((state = decoder_buffer_.GetChunk()) != ChunkState::Done) {
-      if (state == ChunkState::Whole) {
-        // The chunk is whole, no need to read more data.
-        continue;
-      }
-      auto buff = buffer_.Allocate();
-      int ret = socket_.Read(buff.data, buff.len);
-      if (ret <= 0) return false;
-      buffer_.Written(ret);
+      size_t chunk_size = client_.GetData()[0];
+      chunk_size <<= 8;
+      chunk_size += client_.GetData()[1];
+      if (chunk_size == 0) return true;
+
+      if (!client_.Read(chunk_size)) return false;
+      if (decoder_buffer_.GetChunk() != ChunkState::Whole) return false;
+      client_.ClearData();
     }
     return true;
   }
 
   bool ReadMessage(Signature *signature, DecodedValue *ret) {
     Marker marker;
-    if (!GetDataByChunk()) {
-      return false;
-    }
-    if (!decoder_.ReadMessageHeader(signature, &marker)) {
-      return false;
-    }
+    if (!GetMessage()) return false;
+    if (!decoder_.ReadMessageHeader(signature, &marker)) return false;
     return ReadMessageData(marker, ret);
   }
 
@@ -242,32 +228,37 @@ class Client {
 
   void HandleFailure() {
     if (!encoder_.MessageAckFailure()) {
-      throw ClientSocketException();
+      throw ClientFatalException();
     }
     while (true) {
       Signature signature;
       DecodedValue data;
       if (!ReadMessage(&signature, &data)) {
-        throw ClientInvalidDataException();
+        throw ClientFatalException();
       }
       if (signature == Signature::Success) {
         break;
       } else if (signature != Signature::Ignored) {
-        throw ClientInvalidDataException();
+        throw ClientFatalException();
       }
     }
   }
 
-  // socket
-  Socket socket_;
+  // client
+  communication::Client client_;
+  communication::ClientInputStream input_stream_{client_};
+  communication::ClientOutputStream output_stream_{client_};
 
   // decoder objects
-  Buffer<> buffer_;
-  ChunkedDecoderBuffer<Buffer<>> decoder_buffer_{buffer_};
-  Decoder<ChunkedDecoderBuffer<Buffer<>>> decoder_{decoder_buffer_};
+  ChunkedDecoderBuffer<communication::ClientInputStream> decoder_buffer_{
+      input_stream_};
+  Decoder<ChunkedDecoderBuffer<communication::ClientInputStream>> decoder_{
+      decoder_buffer_};
 
   // encoder objects
-  ChunkedEncoderBuffer<Socket> encoder_buffer_{socket_};
-  ClientEncoder<ChunkedEncoderBuffer<Socket>> encoder_{encoder_buffer_};
+  ChunkedEncoderBuffer<communication::ClientOutputStream> encoder_buffer_{
+      output_stream_};
+  ClientEncoder<ChunkedEncoderBuffer<communication::ClientOutputStream>>
+      encoder_{encoder_buffer_};
 };
-}
+}  // namespace communication::bolt
diff --git a/src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp b/src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp
index 460f2e05f..36aaab924 100644
--- a/src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp
+++ b/src/communication/bolt/v1/encoder/chunked_encoder_buffer.hpp
@@ -22,8 +22,8 @@ namespace communication::bolt {
  * Has methods for writing and flushing data into the message buffer.
  *
  * Writing data stores data in the internal buffer and flushing data sends
- * the currently stored data to the Socket. Chunking prepends data length and
- * appends chunk end marker (0x00 0x00).
+ * the currently stored data to the OutputStream. Chunking prepends data length
+ * and appends chunk end marker (0x00 0x00).
  *
  * | chunk header | --- chunk --- | another chunk | -- end marker -- |
  * | ------- whole chunk -------- |  whole chunk  | chunk of size 0  |
@@ -34,12 +34,13 @@ namespace communication::bolt {
  * The current implementation stores the whole message into a single buffer
  * which is std::vector.
  *
- * @tparam Socket the output socket that should be used
+ * @tparam TOutputStream the output stream that should be used
  */
-template <class Socket>
+template <class TOutputStream>
 class ChunkedEncoderBuffer {
  public:
-  ChunkedEncoderBuffer(Socket &socket) : socket_(socket) {}
+  ChunkedEncoderBuffer(TOutputStream &output_stream)
+      : output_stream_(output_stream) {}
 
   /**
    * Writes n values into the buffer. If n is bigger than whole chunk size
@@ -123,7 +124,8 @@ class ChunkedEncoderBuffer {
     if (size_ == 0) return true;
 
     // Flush the whole buffer.
-    if (!socket_.Write(buffer_.data() + offset_, size_ - offset_)) return false;
+    if (!output_stream_.Write(buffer_.data() + offset_, size_ - offset_))
+      return false;
     DLOG(INFO) << "Flushed << " << size_ << " bytes.";
 
     // Cleanup.
@@ -147,7 +149,7 @@ class ChunkedEncoderBuffer {
     if (first_chunk_size_ == -1) return false;
 
     // Flush the first chunk
-    if (!socket_.Write(buffer_.data(), first_chunk_size_)) return false;
+    if (!output_stream_.Write(buffer_.data(), first_chunk_size_)) return false;
     DLOG(INFO) << "Flushed << " << first_chunk_size_ << " bytes.";
 
     // Cleanup.
@@ -180,9 +182,9 @@ class ChunkedEncoderBuffer {
 
  private:
   /**
-   * A client socket.
+   * The output stream used.
    */
-  Socket &socket_;
+  TOutputStream &output_stream_;
 
   /**
    * Buffer for a single chunk.
@@ -214,4 +216,4 @@ class ChunkedEncoderBuffer {
    */
   size_t pos_{CHUNK_HEADER_SIZE};
 };
-}
+}  // namespace communication::bolt
diff --git a/src/communication/bolt/v1/encoder/encoder.hpp b/src/communication/bolt/v1/encoder/encoder.hpp
index fa3483db4..f5eb8bb97 100644
--- a/src/communication/bolt/v1/encoder/encoder.hpp
+++ b/src/communication/bolt/v1/encoder/encoder.hpp
@@ -25,7 +25,7 @@ class Encoder : private BaseEncoder<Buffer> {
 
   /**
    * Writes a Record message. This method only stores data in the Buffer.
-   * It doesn't send the values out to the Socket (Chunk is called at the
+   * It doesn't send the values out to the Buffer (Chunk is called at the
    * end of this method). To send the values Flush method has to be called
    * after this method.
    *
@@ -137,4 +137,4 @@ class Encoder : private BaseEncoder<Buffer> {
     return buffer_.Flush();
   }
 };
-}
+}  // namespace communication::bolt
diff --git a/src/communication/client.hpp b/src/communication/client.hpp
new file mode 100644
index 000000000..771cb464e
--- /dev/null
+++ b/src/communication/client.hpp
@@ -0,0 +1,136 @@
+#pragma once
+
+#include "communication/buffer.hpp"
+#include "io/network/endpoint.hpp"
+#include "io/network/socket.hpp"
+
+namespace communication {
+
+/**
+ * This class implements a generic network Client.
+ * It uses blocking sockets and provides an API that can be used to receive/send
+ * data over the network connection.
+ */
+class Client {
+ public:
+  /**
+   * This function connects to a remote server and returns whether the connect
+   * succeeded.
+   */
+  bool Connect(const io::network::Endpoint &endpoint) {
+    if (!socket_.Connect(endpoint)) return false;
+    socket_.SetKeepAlive();
+    socket_.SetNoDelay();
+    return true;
+  }
+
+  /**
+   * This function returns `true` if the socket is in an error state.
+   */
+  bool ErrorStatus() { return socket_.ErrorStatus(); }
+
+  /**
+   * This function shuts down the socket.
+   */
+  void Shutdown() { socket_.Shutdown(); }
+
+  /**
+   * This function closes the socket.
+   */
+  void Close() { socket_.Close(); }
+
+  /**
+   * This function is used to receive `len` bytes from the socket and stores it
+   * in an internal buffer. It returns `true` if the read succeeded and `false`
+   * if it didn't.
+   */
+  bool Read(size_t len) {
+    size_t received = 0;
+    buffer_.write_end().Resize(buffer_.read_end().size() + len);
+    while (received < len) {
+      auto buff = buffer_.write_end().Allocate();
+      int got = socket_.Read(buff.data, len - received);
+      if (got <= 0) return false;
+      buffer_.write_end().Written(got);
+      received += got;
+    }
+    return true;
+  }
+
+  /**
+   * This function returns a pointer to the read data that is currently stored
+   * in the client.
+   */
+  uint8_t *GetData() { return buffer_.read_end().data(); }
+
+  /**
+   * This function returns the size of the read data that is currently stored in
+   * the client.
+   */
+  size_t GetDataSize() { return buffer_.read_end().size(); }
+
+  /**
+   * This function removes first `len` bytes from the data buffer.
+   */
+  void ShiftData(size_t len) { buffer_.read_end().Shift(len); }
+
+  /**
+   * This function clears the data buffer.
+   */
+  void ClearData() { buffer_.read_end().Clear(); }
+
+  // Write end
+  bool Write(const uint8_t *data, size_t len, bool have_more = false) {
+    return socket_.Write(data, len, have_more);
+  }
+  bool Write(const std::string &str, bool have_more = false) {
+    return Write(reinterpret_cast<const uint8_t *>(str.data()), str.size(),
+                 have_more);
+  }
+
+  const io::network::Endpoint &endpoint() { return socket_.endpoint(); }
+
+ private:
+  io::network::Socket socket_;
+
+  Buffer buffer_;
+};
+
+/**
+ * This class provides a stream-like input side object to the client.
+ */
+class ClientInputStream {
+ public:
+  ClientInputStream(Client &client) : client_(client) {}
+
+  uint8_t *data() { return client_.GetData(); }
+
+  size_t size() const { return client_.GetDataSize(); }
+
+  void Shift(size_t len) { client_.ShiftData(len); }
+
+  void Clear() { client_.ClearData(); }
+
+ private:
+  Client &client_;
+};
+
+/**
+ * This class provides a stream-like output side object to the client.
+ */
+class ClientOutputStream {
+ public:
+  ClientOutputStream(Client &client) : client_(client) {}
+
+  bool Write(const uint8_t *data, size_t len, bool have_more = false) {
+    return client_.Write(data, len, have_more);
+  }
+  bool Write(const std::string &str, bool have_more = false) {
+    return client_.Write(str, have_more);
+  }
+
+ private:
+  Client &client_;
+};
+
+}  // namespace communication
diff --git a/src/communication/rpc/client.cpp b/src/communication/rpc/client.cpp
index 43f83c0f2..aa4498a29 100644
--- a/src/communication/rpc/client.cpp
+++ b/src/communication/rpc/client.cpp
@@ -29,21 +29,18 @@ std::unique_ptr<Message> Client::Call(const Message &request) {
 
   // Check if the connection is broken (if we haven't used the client for a
   // long time the server could have died).
-  if (socket_ && socket_->ErrorStatus()) {
-    socket_ = std::experimental::nullopt;
+  if (client_ && client_->ErrorStatus()) {
+    client_ = std::experimental::nullopt;
   }
 
   // Connect to the remote server.
-  if (!socket_) {
-    socket_.emplace();
-    buffer_.Clear();
-    if (!socket_->Connect(endpoint_)) {
+  if (!client_) {
+    client_.emplace();
+    if (!client_->Connect(endpoint_)) {
       LOG(ERROR) << "Couldn't connect to remote address " << endpoint_;
-      socket_ = std::experimental::nullopt;
+      client_ = std::experimental::nullopt;
       return nullptr;
     }
-
-    socket_->SetKeepAlive();
   }
 
   // Serialize and send request.
@@ -59,64 +56,61 @@ std::unique_ptr<Message> Client::Call(const Message &request) {
   const std::string &request_buffer = request_stream.str();
   CHECK(request_buffer.size() <= std::numeric_limits<MessageSize>::max())
       << fmt::format(
-          "Trying to send message of size {}, max message size is {}",
-          request_buffer.size(), std::numeric_limits<MessageSize>::max());
+             "Trying to send message of size {}, max message size is {}",
+             request_buffer.size(), std::numeric_limits<MessageSize>::max());
 
   MessageSize request_data_size = request_buffer.size();
-  if (!socket_->Write(reinterpret_cast<uint8_t *>(&request_data_size),
+  if (!client_->Write(reinterpret_cast<uint8_t *>(&request_data_size),
                       sizeof(MessageSize), true)) {
-    LOG(ERROR) << "Couldn't send request size to " << socket_->endpoint();
-    socket_ = std::experimental::nullopt;
+    LOG(ERROR) << "Couldn't send request size to " << client_->endpoint();
+    client_ = std::experimental::nullopt;
     return nullptr;
   }
 
-  if (!socket_->Write(request_buffer)) {
-    LOG(ERROR) << "Couldn't send request data to " << socket_->endpoint();
-    socket_ = std::experimental::nullopt;
+  if (!client_->Write(request_buffer)) {
+    LOG(ERROR) << "Couldn't send request data to " << client_->endpoint();
+    client_ = std::experimental::nullopt;
     return nullptr;
   }
 
-  // Receive response.
-  while (true) {
-    auto buff = buffer_.Allocate();
-    auto received = socket_->Read(buff.data, buff.len);
-    if (received <= 0) {
-      LOG(ERROR) << "Couldn't get response from " << socket_->endpoint();
-      socket_ = std::experimental::nullopt;
-      return nullptr;
-    }
-    buffer_.Written(received);
-
-    if (buffer_.size() < sizeof(MessageSize)) continue;
-    MessageSize response_data_size =
-        *reinterpret_cast<MessageSize *>(buffer_.data());
-    size_t response_size = sizeof(MessageSize) + response_data_size;
-    buffer_.Resize(response_size);
-    if (buffer_.size() < response_size) continue;
-
-    std::unique_ptr<Message> response;
-    {
-      std::stringstream response_stream(std::ios_base::in |
-                                        std::ios_base::binary);
-      response_stream.str(std::string(
-          reinterpret_cast<char *>(buffer_.data() + sizeof(MessageSize)),
-          response_data_size));
-      boost::archive::binary_iarchive response_archive(response_stream);
-      response_archive >> response;
-    }
-
-    buffer_.Shift(response_size);
-
-    return response;
+  // Receive response data size.
+  if (!client_->Read(sizeof(MessageSize))) {
+    LOG(ERROR) << "Couldn't get response from " << client_->endpoint();
+    client_ = std::experimental::nullopt;
+    return nullptr;
   }
+  MessageSize response_data_size =
+      *reinterpret_cast<MessageSize *>(client_->GetData());
+  client_->ShiftData(sizeof(MessageSize));
+
+  // Receive response data.
+  if (!client_->Read(response_data_size)) {
+    LOG(ERROR) << "Couldn't get response from " << client_->endpoint();
+    client_ = std::experimental::nullopt;
+    return nullptr;
+  }
+
+  std::unique_ptr<Message> response;
+  {
+    std::stringstream response_stream(std::ios_base::in |
+                                      std::ios_base::binary);
+    response_stream.str(std::string(reinterpret_cast<char *>(client_->GetData()),
+                                    response_data_size));
+    boost::archive::binary_iarchive response_archive(response_stream);
+    response_archive >> response;
+  }
+
+  client_->ShiftData(response_data_size);
+
+  return response;
 }
 
 void Client::Abort() {
-  if (!socket_) return;
-  // We need to call Shutdown on the socket to abort any pending read or
+  if (!client_) return;
+  // We need to call Shutdown on the client to abort any pending read or
   // write operations.
-  socket_->Shutdown();
-  socket_ = std::experimental::nullopt;
+  client_->Shutdown();
+  client_ = std::experimental::nullopt;
 }
 
 }  // namespace communication::rpc
diff --git a/src/communication/rpc/client.hpp b/src/communication/rpc/client.hpp
index 555976a9f..d02ffc8ff 100644
--- a/src/communication/rpc/client.hpp
+++ b/src/communication/rpc/client.hpp
@@ -7,10 +7,9 @@
 
 #include <glog/logging.h>
 
-#include "communication/rpc/buffer.hpp"
+#include "communication/client.hpp"
 #include "communication/rpc/messages.hpp"
 #include "io/network/endpoint.hpp"
-#include "io/network/socket.hpp"
 #include "utils/demangle.hpp"
 
 namespace communication::rpc {
@@ -43,7 +42,7 @@ class Client {
       // Since message_id was checked in private Call function, this means
       // something is very wrong (probably on the server side).
       LOG(ERROR) << "Message response was of unexpected type";
-      socket_ = std::experimental::nullopt;
+      client_ = std::experimental::nullopt;
       return nullptr;
     }
 
@@ -64,9 +63,7 @@ class Client {
   std::unique_ptr<Message> Call(const Message &request);
 
   io::network::Endpoint endpoint_;
-  std::experimental::optional<io::network::Socket> socket_;
-
-  Buffer buffer_;
+  std::experimental::optional<communication::Client> client_;
 
   std::mutex mutex_;
 
diff --git a/src/communication/rpc/protocol.hpp b/src/communication/rpc/protocol.hpp
index 3291ee47f..cbaacc7e3 100644
--- a/src/communication/rpc/protocol.hpp
+++ b/src/communication/rpc/protocol.hpp
@@ -6,9 +6,6 @@
 
 #include "communication/rpc/messages.hpp"
 #include "communication/session.hpp"
-#include "io/network/endpoint.hpp"
-#include "io/network/socket.hpp"
-#include "io/network/stream_buffer.hpp"
 
 /**
  * @brief Protocol
@@ -21,10 +18,6 @@
  */
 namespace communication::rpc {
 
-using Endpoint = io::network::Endpoint;
-using Socket = io::network::Socket;
-using StreamBuffer = io::network::StreamBuffer;
-
 // Forward declaration of class Server
 class Server;
 
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
index 22cef0fef..b36ec03ad 100644
--- a/src/memgraph_bolt.cpp
+++ b/src/memgraph_bolt.cpp
@@ -12,8 +12,6 @@
 #include "distributed/coordination_master.hpp"
 #include "distributed/coordination_worker.hpp"
 #include "io/network/endpoint.hpp"
-#include "io/network/network_error.hpp"
-#include "io/network/socket.hpp"
 #include "stats/stats.hpp"
 #include "utils/flag_validation.hpp"
 #include "utils/on_scope_exit.hpp"
@@ -26,8 +24,6 @@
 
 namespace fs = std::experimental::filesystem;
 using communication::bolt::SessionData;
-using io::network::Endpoint;
-using io::network::Socket;
 using SessionT = communication::bolt::Session<communication::InputStream,
                                               communication::OutputStream>;
 using ServerT = communication::Server<SessionT, SessionData>;
diff --git a/tests/macro_benchmark/clients/bolt_client.hpp b/tests/macro_benchmark/clients/bolt_client.hpp
index 9d9659934..110f7cf23 100644
--- a/tests/macro_benchmark/clients/bolt_client.hpp
+++ b/tests/macro_benchmark/clients/bolt_client.hpp
@@ -1,4 +1,4 @@
-#pragma once 
+#pragma once
 
 #include <fstream>
 #include <vector>
@@ -8,11 +8,9 @@
 #include "communication/bolt/client.hpp"
 #include "communication/bolt/v1/decoder/decoded_value.hpp"
 #include "io/network/endpoint.hpp"
-#include "io/network/socket.hpp"
 
-using SocketT = io::network::Socket;
 using EndpointT = io::network::Endpoint;
-using ClientT = communication::bolt::Client<SocketT>;
+using ClientT = communication::bolt::Client;
 using QueryDataT = communication::bolt::QueryData;
 using communication::bolt::DecodedValue;
 
@@ -21,14 +19,13 @@ class BoltClient {
   BoltClient(const std::string &address, uint16_t port,
              const std::string &username, const std::string &password,
              const std::string & = "") {
-    SocketT socket;
     EndpointT endpoint(address, port);
+    client_ = std::make_unique<ClientT>();
 
-    if (!socket.Connect(endpoint)) {
-      LOG(FATAL) << "Could not connect to: " << address << ":" << port;
+    if (!client_->Connect(endpoint, username, password)) {
+      LOG(FATAL) << "Could not connect to: " << endpoint;
     }
 
-    client_ = std::make_unique<ClientT>(std::move(socket), username, password);
   }
 
   QueryDataT Execute(const std::string &query,
diff --git a/tests/manual/bolt_client.cpp b/tests/manual/bolt_client.cpp
index b1b50e974..ddb9bec84 100644
--- a/tests/manual/bolt_client.cpp
+++ b/tests/manual/bolt_client.cpp
@@ -3,14 +3,9 @@
 
 #include "communication/bolt/client.hpp"
 #include "io/network/endpoint.hpp"
-#include "io/network/socket.hpp"
 #include "utils/network.hpp"
 #include "utils/timer.hpp"
 
-using SocketT = io::network::Socket;
-using EndpointT = io::network::Endpoint;
-using ClientT = communication::bolt::Client<SocketT>;
-
 DEFINE_string(address, "127.0.0.1", "Server address");
 DEFINE_int32(port, 7687, "Server port");
 DEFINE_string(username, "", "Username for the database");
@@ -21,12 +16,11 @@ int main(int argc, char **argv) {
   google::InitGoogleLogging(argv[0]);
 
   // TODO: handle endpoint exception
-  EndpointT endpoint(utils::ResolveHostname(FLAGS_address), FLAGS_port);
-  SocketT socket;
+  io::network::Endpoint endpoint(utils::ResolveHostname(FLAGS_address),
+                                 FLAGS_port);
+  communication::bolt::Client client;
 
-  if (!socket.Connect(endpoint)) return 1;
-
-  ClientT client(std::move(socket), FLAGS_username, FLAGS_password);
+  if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) return 1;
 
   std::cout << "Memgraph bolt client is connected and running." << std::endl;
 
@@ -64,7 +58,5 @@ int main(int argc, char **argv) {
     }
   }
 
-  client.Close();
-
   return 0;
 }
diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp
index ef81a5365..bd7081b00 100644
--- a/tests/stress/long_running.cpp
+++ b/tests/stress/long_running.cpp
@@ -4,13 +4,11 @@
 
 #include "communication/bolt/client.hpp"
 #include "io/network/endpoint.hpp"
-#include "io/network/socket.hpp"
 #include "utils/exceptions.hpp"
 #include "utils/timer.hpp"
 
-using SocketT = io::network::Socket;
 using EndpointT = io::network::Endpoint;
-using ClientT = communication::bolt::Client<SocketT>;
+using ClientT = communication::bolt::Client;
 using DecodedValueT = communication::bolt::DecodedValue;
 using QueryDataT = communication::bolt::QueryData;
 using ExceptionT = communication::bolt::ClientQueryException;
@@ -53,14 +51,11 @@ class GraphSession {
     }
 
     EndpointT endpoint(FLAGS_address, FLAGS_port);
-    SocketT socket;
+    client_ = std::make_unique<ClientT>();
 
-    if (!socket.Connect(endpoint)) {
+    if (!client_->Connect(endpoint, FLAGS_username, FLAGS_password)) {
       throw utils::BasicException("Couldn't connect to server!");
     }
-
-    client_ = std::make_unique<ClientT>(std::move(socket), FLAGS_username,
-                                        FLAGS_password);
   }
 
  private:
@@ -374,11 +369,10 @@ int main(int argc, char **argv) {
 
   // create client
   EndpointT endpoint(FLAGS_address, FLAGS_port);
-  SocketT socket;
-  if (!socket.Connect(endpoint)) {
+  ClientT client;
+  if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
     throw utils::BasicException("Couldn't connect to server!");
   }
-  ClientT client(std::move(socket), FLAGS_username, FLAGS_password);
 
   // cleanup and create indexes
   client.Execute("MATCH (n) DETACH DELETE n", {});