Refactor network clients to use a single client implementation
Summary: All network clients (Bolt & RPC) now use a wrapper. Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1324
This commit is contained in:
parent
02520ca3a8
commit
c4a0da6054
@ -2,7 +2,6 @@
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/v1/decoder/buffer.hpp"
|
||||
#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoder.hpp"
|
||||
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
|
||||
@ -13,28 +12,18 @@
|
||||
|
||||
namespace communication::bolt {
|
||||
|
||||
class ClientException : public utils::BasicException {
|
||||
class ClientFatalException : public utils::BasicException {
|
||||
public:
|
||||
using utils::BasicException::BasicException;
|
||||
ClientFatalException()
|
||||
: utils::BasicException(
|
||||
"Something went wrong while communicating with the server!") {}
|
||||
};
|
||||
|
||||
class ClientSocketException : public ClientException {
|
||||
class ClientQueryException : public utils::BasicException {
|
||||
public:
|
||||
using ClientException::ClientException;
|
||||
ClientSocketException()
|
||||
: ClientException("Couldn't write/read data to/from the socket!") {}
|
||||
};
|
||||
|
||||
class ClientInvalidDataException : public ClientException {
|
||||
public:
|
||||
using ClientException::ClientException;
|
||||
ClientInvalidDataException()
|
||||
: ClientException("The server sent invalid data!") {}
|
||||
};
|
||||
|
||||
class ClientQueryException : public ClientException {
|
||||
public:
|
||||
using ClientException::ClientException;
|
||||
ClientQueryException() : ClientException("Couldn't execute query!") {}
|
||||
using utils::BasicException::BasicException;
|
||||
ClientQueryException() : utils::BasicException("Couldn't execute query!") {}
|
||||
};
|
||||
|
||||
struct QueryData {
|
||||
@ -43,56 +32,67 @@ struct QueryData {
|
||||
std::map<std::string, DecodedValue> metadata;
|
||||
};
|
||||
|
||||
template <typename Socket>
|
||||
class Client {
|
||||
public:
|
||||
Client(Socket &&socket, const std::string &username,
|
||||
const std::string &password,
|
||||
const std::string &client_name = "memgraph-bolt/0.0.1")
|
||||
: socket_(std::move(socket)) {
|
||||
DLOG(INFO) << "Sending handshake";
|
||||
if (!socket_.Write(kPreamble, sizeof(kPreamble), true)) {
|
||||
throw ClientSocketException();
|
||||
}
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (!socket_.Write(kProtocol, sizeof(kProtocol), i != 3)) {
|
||||
throw ClientSocketException();
|
||||
}
|
||||
}
|
||||
|
||||
DLOG(INFO) << "Reading handshake response";
|
||||
if (!GetDataByLen(4)) {
|
||||
throw ClientSocketException();
|
||||
}
|
||||
if (memcmp(kProtocol, buffer_.data(), sizeof(kProtocol)) != 0) {
|
||||
throw ClientException("Server negotiated unsupported protocol version!");
|
||||
}
|
||||
buffer_.Shift(sizeof(kProtocol));
|
||||
|
||||
DLOG(INFO) << "Sending init message";
|
||||
if (!encoder_.MessageInit(client_name, {{"scheme", "basic"},
|
||||
{"principal", username},
|
||||
{"credentials", password}})) {
|
||||
throw ClientSocketException();
|
||||
}
|
||||
|
||||
DLOG(INFO) << "Reading init message response";
|
||||
Signature signature;
|
||||
DecodedValue metadata;
|
||||
if (!ReadMessage(&signature, &metadata)) {
|
||||
throw ClientException("Couldn't read init message response!");
|
||||
}
|
||||
if (signature != Signature::Success) {
|
||||
throw ClientInvalidDataException();
|
||||
}
|
||||
DLOG(INFO) << "Metadata of init message response: " << metadata;
|
||||
}
|
||||
Client() {}
|
||||
|
||||
Client(const Client &) = delete;
|
||||
Client(Client &&) = delete;
|
||||
Client &operator=(const Client &) = delete;
|
||||
Client &operator=(Client &&) = delete;
|
||||
|
||||
bool Connect(const io::network::Endpoint &endpoint,
|
||||
const std::string &username, const std::string &password,
|
||||
const std::string &client_name = "memgraph-bolt/0.0.1") {
|
||||
if (!client_.Connect(endpoint)) {
|
||||
LOG(ERROR) << "Couldn't connect to " << endpoint;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!client_.Write(kPreamble, sizeof(kPreamble), true)) {
|
||||
LOG(ERROR) << "Couldn't send preamble!";
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (!client_.Write(kProtocol, sizeof(kProtocol), i != 3)) {
|
||||
LOG(ERROR) << "Couldn't send protocol version!";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!client_.Read(sizeof(kProtocol))) {
|
||||
LOG(ERROR) << "Couldn't get negotiated protocol version!";
|
||||
return false;
|
||||
}
|
||||
if (memcmp(kProtocol, client_.GetData(), sizeof(kProtocol)) != 0) {
|
||||
LOG(ERROR) << "Server negotiated unsupported protocol version!";
|
||||
return false;
|
||||
}
|
||||
client_.ShiftData(sizeof(kProtocol));
|
||||
|
||||
if (!encoder_.MessageInit(client_name, {{"scheme", "basic"},
|
||||
{"principal", username},
|
||||
{"credentials", password}})) {
|
||||
LOG(ERROR) << "Couldn't send init message!";
|
||||
return false;
|
||||
}
|
||||
|
||||
Signature signature;
|
||||
DecodedValue metadata;
|
||||
if (!ReadMessage(&signature, &metadata)) {
|
||||
LOG(ERROR) << "Couldn't read init message response!";
|
||||
return false;
|
||||
}
|
||||
if (signature != Signature::Success) {
|
||||
LOG(ERROR) << "Handshake failed!";
|
||||
return false;
|
||||
}
|
||||
|
||||
DLOG(INFO) << "Metadata of init message response: " << metadata;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
QueryData Execute(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶meters) {
|
||||
DLOG(INFO) << "Sending run message with statement: '" << query
|
||||
@ -107,10 +107,10 @@ class Client {
|
||||
Signature signature;
|
||||
DecodedValue fields;
|
||||
if (!ReadMessage(&signature, &fields)) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
if (fields.type() != DecodedValue::Type::Map) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
|
||||
if (signature == Signature::Failure) {
|
||||
@ -122,7 +122,7 @@ class Client {
|
||||
}
|
||||
throw ClientQueryException();
|
||||
} else if (signature != Signature::Success) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
|
||||
DLOG(INFO) << "Reading pull_all message response";
|
||||
@ -130,27 +130,27 @@ class Client {
|
||||
DecodedValue metadata;
|
||||
std::vector<std::vector<DecodedValue>> records;
|
||||
while (true) {
|
||||
if (!GetDataByChunk()) {
|
||||
throw ClientSocketException();
|
||||
if (!GetMessage()) {
|
||||
throw ClientFatalException();
|
||||
}
|
||||
if (!decoder_.ReadMessageHeader(&signature, &marker)) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
if (signature == Signature::Record) {
|
||||
DecodedValue record;
|
||||
if (!decoder_.ReadValue(&record, DecodedValue::Type::List)) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
records.push_back(record.ValueList());
|
||||
} else if (signature == Signature::Success) {
|
||||
if (!decoder_.ReadValue(&metadata)) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
break;
|
||||
} else if (signature == Signature::Failure) {
|
||||
DecodedValue data;
|
||||
if (!decoder_.ReadValue(&data)) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
HandleFailure();
|
||||
auto &tmp = data.ValueMap();
|
||||
@ -160,28 +160,28 @@ class Client {
|
||||
}
|
||||
throw ClientQueryException();
|
||||
} else {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
}
|
||||
|
||||
if (metadata.type() != DecodedValue::Type::Map) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
|
||||
QueryData ret{{}, records, metadata.ValueMap()};
|
||||
|
||||
auto &header = fields.ValueMap();
|
||||
if (header.find("fields") == header.end()) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
if (header["fields"].type() != DecodedValue::Type::List) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
auto &field_vector = header["fields"].ValueList();
|
||||
|
||||
for (auto &field_item : field_vector) {
|
||||
if (field_item.type() != DecodedValue::Type::String) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
ret.fields.push_back(field_item.ValueString());
|
||||
}
|
||||
@ -189,44 +189,30 @@ class Client {
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Close() { socket_.Close(); };
|
||||
|
||||
~Client() { Close(); }
|
||||
void Close() { client_.Close(); };
|
||||
|
||||
private:
|
||||
bool GetDataByLen(uint64_t len) {
|
||||
while (buffer_.size() < len) {
|
||||
auto buff = buffer_.Allocate();
|
||||
int ret = socket_.Read(buff.data, buff.len);
|
||||
if (ret <= 0) return false;
|
||||
buffer_.Written(ret);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
bool GetMessage() {
|
||||
client_.ClearData();
|
||||
while (true) {
|
||||
if (!client_.Read(CHUNK_HEADER_SIZE)) return false;
|
||||
|
||||
bool GetDataByChunk() {
|
||||
ChunkState state;
|
||||
while ((state = decoder_buffer_.GetChunk()) != ChunkState::Done) {
|
||||
if (state == ChunkState::Whole) {
|
||||
// The chunk is whole, no need to read more data.
|
||||
continue;
|
||||
}
|
||||
auto buff = buffer_.Allocate();
|
||||
int ret = socket_.Read(buff.data, buff.len);
|
||||
if (ret <= 0) return false;
|
||||
buffer_.Written(ret);
|
||||
size_t chunk_size = client_.GetData()[0];
|
||||
chunk_size <<= 8;
|
||||
chunk_size += client_.GetData()[1];
|
||||
if (chunk_size == 0) return true;
|
||||
|
||||
if (!client_.Read(chunk_size)) return false;
|
||||
if (decoder_buffer_.GetChunk() != ChunkState::Whole) return false;
|
||||
client_.ClearData();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadMessage(Signature *signature, DecodedValue *ret) {
|
||||
Marker marker;
|
||||
if (!GetDataByChunk()) {
|
||||
return false;
|
||||
}
|
||||
if (!decoder_.ReadMessageHeader(signature, &marker)) {
|
||||
return false;
|
||||
}
|
||||
if (!GetMessage()) return false;
|
||||
if (!decoder_.ReadMessageHeader(signature, &marker)) return false;
|
||||
return ReadMessageData(marker, ret);
|
||||
}
|
||||
|
||||
@ -242,32 +228,37 @@ class Client {
|
||||
|
||||
void HandleFailure() {
|
||||
if (!encoder_.MessageAckFailure()) {
|
||||
throw ClientSocketException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
while (true) {
|
||||
Signature signature;
|
||||
DecodedValue data;
|
||||
if (!ReadMessage(&signature, &data)) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
if (signature == Signature::Success) {
|
||||
break;
|
||||
} else if (signature != Signature::Ignored) {
|
||||
throw ClientInvalidDataException();
|
||||
throw ClientFatalException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// socket
|
||||
Socket socket_;
|
||||
// client
|
||||
communication::Client client_;
|
||||
communication::ClientInputStream input_stream_{client_};
|
||||
communication::ClientOutputStream output_stream_{client_};
|
||||
|
||||
// decoder objects
|
||||
Buffer<> buffer_;
|
||||
ChunkedDecoderBuffer<Buffer<>> decoder_buffer_{buffer_};
|
||||
Decoder<ChunkedDecoderBuffer<Buffer<>>> decoder_{decoder_buffer_};
|
||||
ChunkedDecoderBuffer<communication::ClientInputStream> decoder_buffer_{
|
||||
input_stream_};
|
||||
Decoder<ChunkedDecoderBuffer<communication::ClientInputStream>> decoder_{
|
||||
decoder_buffer_};
|
||||
|
||||
// encoder objects
|
||||
ChunkedEncoderBuffer<Socket> encoder_buffer_{socket_};
|
||||
ClientEncoder<ChunkedEncoderBuffer<Socket>> encoder_{encoder_buffer_};
|
||||
ChunkedEncoderBuffer<communication::ClientOutputStream> encoder_buffer_{
|
||||
output_stream_};
|
||||
ClientEncoder<ChunkedEncoderBuffer<communication::ClientOutputStream>>
|
||||
encoder_{encoder_buffer_};
|
||||
};
|
||||
}
|
||||
} // namespace communication::bolt
|
||||
|
@ -22,8 +22,8 @@ namespace communication::bolt {
|
||||
* Has methods for writing and flushing data into the message buffer.
|
||||
*
|
||||
* Writing data stores data in the internal buffer and flushing data sends
|
||||
* the currently stored data to the Socket. Chunking prepends data length and
|
||||
* appends chunk end marker (0x00 0x00).
|
||||
* the currently stored data to the OutputStream. Chunking prepends data length
|
||||
* and appends chunk end marker (0x00 0x00).
|
||||
*
|
||||
* | chunk header | --- chunk --- | another chunk | -- end marker -- |
|
||||
* | ------- whole chunk -------- | whole chunk | chunk of size 0 |
|
||||
@ -34,12 +34,13 @@ namespace communication::bolt {
|
||||
* The current implementation stores the whole message into a single buffer
|
||||
* which is std::vector.
|
||||
*
|
||||
* @tparam Socket the output socket that should be used
|
||||
* @tparam TOutputStream the output stream that should be used
|
||||
*/
|
||||
template <class Socket>
|
||||
template <class TOutputStream>
|
||||
class ChunkedEncoderBuffer {
|
||||
public:
|
||||
ChunkedEncoderBuffer(Socket &socket) : socket_(socket) {}
|
||||
ChunkedEncoderBuffer(TOutputStream &output_stream)
|
||||
: output_stream_(output_stream) {}
|
||||
|
||||
/**
|
||||
* Writes n values into the buffer. If n is bigger than whole chunk size
|
||||
@ -123,7 +124,8 @@ class ChunkedEncoderBuffer {
|
||||
if (size_ == 0) return true;
|
||||
|
||||
// Flush the whole buffer.
|
||||
if (!socket_.Write(buffer_.data() + offset_, size_ - offset_)) return false;
|
||||
if (!output_stream_.Write(buffer_.data() + offset_, size_ - offset_))
|
||||
return false;
|
||||
DLOG(INFO) << "Flushed << " << size_ << " bytes.";
|
||||
|
||||
// Cleanup.
|
||||
@ -147,7 +149,7 @@ class ChunkedEncoderBuffer {
|
||||
if (first_chunk_size_ == -1) return false;
|
||||
|
||||
// Flush the first chunk
|
||||
if (!socket_.Write(buffer_.data(), first_chunk_size_)) return false;
|
||||
if (!output_stream_.Write(buffer_.data(), first_chunk_size_)) return false;
|
||||
DLOG(INFO) << "Flushed << " << first_chunk_size_ << " bytes.";
|
||||
|
||||
// Cleanup.
|
||||
@ -180,9 +182,9 @@ class ChunkedEncoderBuffer {
|
||||
|
||||
private:
|
||||
/**
|
||||
* A client socket.
|
||||
* The output stream used.
|
||||
*/
|
||||
Socket &socket_;
|
||||
TOutputStream &output_stream_;
|
||||
|
||||
/**
|
||||
* Buffer for a single chunk.
|
||||
@ -214,4 +216,4 @@ class ChunkedEncoderBuffer {
|
||||
*/
|
||||
size_t pos_{CHUNK_HEADER_SIZE};
|
||||
};
|
||||
}
|
||||
} // namespace communication::bolt
|
||||
|
@ -25,7 +25,7 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
|
||||
/**
|
||||
* Writes a Record message. This method only stores data in the Buffer.
|
||||
* It doesn't send the values out to the Socket (Chunk is called at the
|
||||
* It doesn't send the values out to the Buffer (Chunk is called at the
|
||||
* end of this method). To send the values Flush method has to be called
|
||||
* after this method.
|
||||
*
|
||||
@ -137,4 +137,4 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
return buffer_.Flush();
|
||||
}
|
||||
};
|
||||
}
|
||||
} // namespace communication::bolt
|
||||
|
136
src/communication/client.hpp
Normal file
136
src/communication/client.hpp
Normal file
@ -0,0 +1,136 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/buffer.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
|
||||
namespace communication {
|
||||
|
||||
/**
|
||||
* This class implements a generic network Client.
|
||||
* It uses blocking sockets and provides an API that can be used to receive/send
|
||||
* data over the network connection.
|
||||
*/
|
||||
class Client {
|
||||
public:
|
||||
/**
|
||||
* This function connects to a remote server and returns whether the connect
|
||||
* succeeded.
|
||||
*/
|
||||
bool Connect(const io::network::Endpoint &endpoint) {
|
||||
if (!socket_.Connect(endpoint)) return false;
|
||||
socket_.SetKeepAlive();
|
||||
socket_.SetNoDelay();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function returns `true` if the socket is in an error state.
|
||||
*/
|
||||
bool ErrorStatus() { return socket_.ErrorStatus(); }
|
||||
|
||||
/**
|
||||
* This function shuts down the socket.
|
||||
*/
|
||||
void Shutdown() { socket_.Shutdown(); }
|
||||
|
||||
/**
|
||||
* This function closes the socket.
|
||||
*/
|
||||
void Close() { socket_.Close(); }
|
||||
|
||||
/**
|
||||
* This function is used to receive `len` bytes from the socket and stores it
|
||||
* in an internal buffer. It returns `true` if the read succeeded and `false`
|
||||
* if it didn't.
|
||||
*/
|
||||
bool Read(size_t len) {
|
||||
size_t received = 0;
|
||||
buffer_.write_end().Resize(buffer_.read_end().size() + len);
|
||||
while (received < len) {
|
||||
auto buff = buffer_.write_end().Allocate();
|
||||
int got = socket_.Read(buff.data, len - received);
|
||||
if (got <= 0) return false;
|
||||
buffer_.write_end().Written(got);
|
||||
received += got;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function returns a pointer to the read data that is currently stored
|
||||
* in the client.
|
||||
*/
|
||||
uint8_t *GetData() { return buffer_.read_end().data(); }
|
||||
|
||||
/**
|
||||
* This function returns the size of the read data that is currently stored in
|
||||
* the client.
|
||||
*/
|
||||
size_t GetDataSize() { return buffer_.read_end().size(); }
|
||||
|
||||
/**
|
||||
* This function removes first `len` bytes from the data buffer.
|
||||
*/
|
||||
void ShiftData(size_t len) { buffer_.read_end().Shift(len); }
|
||||
|
||||
/**
|
||||
* This function clears the data buffer.
|
||||
*/
|
||||
void ClearData() { buffer_.read_end().Clear(); }
|
||||
|
||||
// Write end
|
||||
bool Write(const uint8_t *data, size_t len, bool have_more = false) {
|
||||
return socket_.Write(data, len, have_more);
|
||||
}
|
||||
bool Write(const std::string &str, bool have_more = false) {
|
||||
return Write(reinterpret_cast<const uint8_t *>(str.data()), str.size(),
|
||||
have_more);
|
||||
}
|
||||
|
||||
const io::network::Endpoint &endpoint() { return socket_.endpoint(); }
|
||||
|
||||
private:
|
||||
io::network::Socket socket_;
|
||||
|
||||
Buffer buffer_;
|
||||
};
|
||||
|
||||
/**
|
||||
* This class provides a stream-like input side object to the client.
|
||||
*/
|
||||
class ClientInputStream {
|
||||
public:
|
||||
ClientInputStream(Client &client) : client_(client) {}
|
||||
|
||||
uint8_t *data() { return client_.GetData(); }
|
||||
|
||||
size_t size() const { return client_.GetDataSize(); }
|
||||
|
||||
void Shift(size_t len) { client_.ShiftData(len); }
|
||||
|
||||
void Clear() { client_.ClearData(); }
|
||||
|
||||
private:
|
||||
Client &client_;
|
||||
};
|
||||
|
||||
/**
|
||||
* This class provides a stream-like output side object to the client.
|
||||
*/
|
||||
class ClientOutputStream {
|
||||
public:
|
||||
ClientOutputStream(Client &client) : client_(client) {}
|
||||
|
||||
bool Write(const uint8_t *data, size_t len, bool have_more = false) {
|
||||
return client_.Write(data, len, have_more);
|
||||
}
|
||||
bool Write(const std::string &str, bool have_more = false) {
|
||||
return client_.Write(str, have_more);
|
||||
}
|
||||
|
||||
private:
|
||||
Client &client_;
|
||||
};
|
||||
|
||||
} // namespace communication
|
@ -29,21 +29,18 @@ std::unique_ptr<Message> Client::Call(const Message &request) {
|
||||
|
||||
// Check if the connection is broken (if we haven't used the client for a
|
||||
// long time the server could have died).
|
||||
if (socket_ && socket_->ErrorStatus()) {
|
||||
socket_ = std::experimental::nullopt;
|
||||
if (client_ && client_->ErrorStatus()) {
|
||||
client_ = std::experimental::nullopt;
|
||||
}
|
||||
|
||||
// Connect to the remote server.
|
||||
if (!socket_) {
|
||||
socket_.emplace();
|
||||
buffer_.Clear();
|
||||
if (!socket_->Connect(endpoint_)) {
|
||||
if (!client_) {
|
||||
client_.emplace();
|
||||
if (!client_->Connect(endpoint_)) {
|
||||
LOG(ERROR) << "Couldn't connect to remote address " << endpoint_;
|
||||
socket_ = std::experimental::nullopt;
|
||||
client_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
socket_->SetKeepAlive();
|
||||
}
|
||||
|
||||
// Serialize and send request.
|
||||
@ -59,64 +56,61 @@ std::unique_ptr<Message> Client::Call(const Message &request) {
|
||||
const std::string &request_buffer = request_stream.str();
|
||||
CHECK(request_buffer.size() <= std::numeric_limits<MessageSize>::max())
|
||||
<< fmt::format(
|
||||
"Trying to send message of size {}, max message size is {}",
|
||||
request_buffer.size(), std::numeric_limits<MessageSize>::max());
|
||||
"Trying to send message of size {}, max message size is {}",
|
||||
request_buffer.size(), std::numeric_limits<MessageSize>::max());
|
||||
|
||||
MessageSize request_data_size = request_buffer.size();
|
||||
if (!socket_->Write(reinterpret_cast<uint8_t *>(&request_data_size),
|
||||
if (!client_->Write(reinterpret_cast<uint8_t *>(&request_data_size),
|
||||
sizeof(MessageSize), true)) {
|
||||
LOG(ERROR) << "Couldn't send request size to " << socket_->endpoint();
|
||||
socket_ = std::experimental::nullopt;
|
||||
LOG(ERROR) << "Couldn't send request size to " << client_->endpoint();
|
||||
client_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!socket_->Write(request_buffer)) {
|
||||
LOG(ERROR) << "Couldn't send request data to " << socket_->endpoint();
|
||||
socket_ = std::experimental::nullopt;
|
||||
if (!client_->Write(request_buffer)) {
|
||||
LOG(ERROR) << "Couldn't send request data to " << client_->endpoint();
|
||||
client_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Receive response.
|
||||
while (true) {
|
||||
auto buff = buffer_.Allocate();
|
||||
auto received = socket_->Read(buff.data, buff.len);
|
||||
if (received <= 0) {
|
||||
LOG(ERROR) << "Couldn't get response from " << socket_->endpoint();
|
||||
socket_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
buffer_.Written(received);
|
||||
|
||||
if (buffer_.size() < sizeof(MessageSize)) continue;
|
||||
MessageSize response_data_size =
|
||||
*reinterpret_cast<MessageSize *>(buffer_.data());
|
||||
size_t response_size = sizeof(MessageSize) + response_data_size;
|
||||
buffer_.Resize(response_size);
|
||||
if (buffer_.size() < response_size) continue;
|
||||
|
||||
std::unique_ptr<Message> response;
|
||||
{
|
||||
std::stringstream response_stream(std::ios_base::in |
|
||||
std::ios_base::binary);
|
||||
response_stream.str(std::string(
|
||||
reinterpret_cast<char *>(buffer_.data() + sizeof(MessageSize)),
|
||||
response_data_size));
|
||||
boost::archive::binary_iarchive response_archive(response_stream);
|
||||
response_archive >> response;
|
||||
}
|
||||
|
||||
buffer_.Shift(response_size);
|
||||
|
||||
return response;
|
||||
// Receive response data size.
|
||||
if (!client_->Read(sizeof(MessageSize))) {
|
||||
LOG(ERROR) << "Couldn't get response from " << client_->endpoint();
|
||||
client_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
MessageSize response_data_size =
|
||||
*reinterpret_cast<MessageSize *>(client_->GetData());
|
||||
client_->ShiftData(sizeof(MessageSize));
|
||||
|
||||
// Receive response data.
|
||||
if (!client_->Read(response_data_size)) {
|
||||
LOG(ERROR) << "Couldn't get response from " << client_->endpoint();
|
||||
client_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::unique_ptr<Message> response;
|
||||
{
|
||||
std::stringstream response_stream(std::ios_base::in |
|
||||
std::ios_base::binary);
|
||||
response_stream.str(std::string(reinterpret_cast<char *>(client_->GetData()),
|
||||
response_data_size));
|
||||
boost::archive::binary_iarchive response_archive(response_stream);
|
||||
response_archive >> response;
|
||||
}
|
||||
|
||||
client_->ShiftData(response_data_size);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
void Client::Abort() {
|
||||
if (!socket_) return;
|
||||
// We need to call Shutdown on the socket to abort any pending read or
|
||||
if (!client_) return;
|
||||
// We need to call Shutdown on the client to abort any pending read or
|
||||
// write operations.
|
||||
socket_->Shutdown();
|
||||
socket_ = std::experimental::nullopt;
|
||||
client_->Shutdown();
|
||||
client_ = std::experimental::nullopt;
|
||||
}
|
||||
|
||||
} // namespace communication::rpc
|
||||
|
@ -7,10 +7,9 @@
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/rpc/buffer.hpp"
|
||||
#include "communication/client.hpp"
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "utils/demangle.hpp"
|
||||
|
||||
namespace communication::rpc {
|
||||
@ -43,7 +42,7 @@ class Client {
|
||||
// Since message_id was checked in private Call function, this means
|
||||
// something is very wrong (probably on the server side).
|
||||
LOG(ERROR) << "Message response was of unexpected type";
|
||||
socket_ = std::experimental::nullopt;
|
||||
client_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -64,9 +63,7 @@ class Client {
|
||||
std::unique_ptr<Message> Call(const Message &request);
|
||||
|
||||
io::network::Endpoint endpoint_;
|
||||
std::experimental::optional<io::network::Socket> socket_;
|
||||
|
||||
Buffer buffer_;
|
||||
std::experimental::optional<communication::Client> client_;
|
||||
|
||||
std::mutex mutex_;
|
||||
|
||||
|
@ -6,9 +6,6 @@
|
||||
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "communication/session.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "io/network/stream_buffer.hpp"
|
||||
|
||||
/**
|
||||
* @brief Protocol
|
||||
@ -21,10 +18,6 @@
|
||||
*/
|
||||
namespace communication::rpc {
|
||||
|
||||
using Endpoint = io::network::Endpoint;
|
||||
using Socket = io::network::Socket;
|
||||
using StreamBuffer = io::network::StreamBuffer;
|
||||
|
||||
// Forward declaration of class Server
|
||||
class Server;
|
||||
|
||||
|
@ -12,8 +12,6 @@
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/network_error.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
@ -26,8 +24,6 @@
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
using communication::bolt::SessionData;
|
||||
using io::network::Endpoint;
|
||||
using io::network::Socket;
|
||||
using SessionT = communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>;
|
||||
using ServerT = communication::Server<SessionT, SessionData>;
|
||||
|
@ -1,4 +1,4 @@
|
||||
#pragma once
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <vector>
|
||||
@ -8,11 +8,9 @@
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
|
||||
using SocketT = io::network::Socket;
|
||||
using EndpointT = io::network::Endpoint;
|
||||
using ClientT = communication::bolt::Client<SocketT>;
|
||||
using ClientT = communication::bolt::Client;
|
||||
using QueryDataT = communication::bolt::QueryData;
|
||||
using communication::bolt::DecodedValue;
|
||||
|
||||
@ -21,14 +19,13 @@ class BoltClient {
|
||||
BoltClient(const std::string &address, uint16_t port,
|
||||
const std::string &username, const std::string &password,
|
||||
const std::string & = "") {
|
||||
SocketT socket;
|
||||
EndpointT endpoint(address, port);
|
||||
client_ = std::make_unique<ClientT>();
|
||||
|
||||
if (!socket.Connect(endpoint)) {
|
||||
LOG(FATAL) << "Could not connect to: " << address << ":" << port;
|
||||
if (!client_->Connect(endpoint, username, password)) {
|
||||
LOG(FATAL) << "Could not connect to: " << endpoint;
|
||||
}
|
||||
|
||||
client_ = std::make_unique<ClientT>(std::move(socket), username, password);
|
||||
}
|
||||
|
||||
QueryDataT Execute(const std::string &query,
|
||||
|
@ -3,14 +3,9 @@
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "utils/network.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using SocketT = io::network::Socket;
|
||||
using EndpointT = io::network::Endpoint;
|
||||
using ClientT = communication::bolt::Client<SocketT>;
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
@ -21,12 +16,11 @@ int main(int argc, char **argv) {
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
// TODO: handle endpoint exception
|
||||
EndpointT endpoint(utils::ResolveHostname(FLAGS_address), FLAGS_port);
|
||||
SocketT socket;
|
||||
io::network::Endpoint endpoint(utils::ResolveHostname(FLAGS_address),
|
||||
FLAGS_port);
|
||||
communication::bolt::Client client;
|
||||
|
||||
if (!socket.Connect(endpoint)) return 1;
|
||||
|
||||
ClientT client(std::move(socket), FLAGS_username, FLAGS_password);
|
||||
if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) return 1;
|
||||
|
||||
std::cout << "Memgraph bolt client is connected and running." << std::endl;
|
||||
|
||||
@ -64,7 +58,5 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
}
|
||||
|
||||
client.Close();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -4,13 +4,11 @@
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using SocketT = io::network::Socket;
|
||||
using EndpointT = io::network::Endpoint;
|
||||
using ClientT = communication::bolt::Client<SocketT>;
|
||||
using ClientT = communication::bolt::Client;
|
||||
using DecodedValueT = communication::bolt::DecodedValue;
|
||||
using QueryDataT = communication::bolt::QueryData;
|
||||
using ExceptionT = communication::bolt::ClientQueryException;
|
||||
@ -53,14 +51,11 @@ class GraphSession {
|
||||
}
|
||||
|
||||
EndpointT endpoint(FLAGS_address, FLAGS_port);
|
||||
SocketT socket;
|
||||
client_ = std::make_unique<ClientT>();
|
||||
|
||||
if (!socket.Connect(endpoint)) {
|
||||
if (!client_->Connect(endpoint, FLAGS_username, FLAGS_password)) {
|
||||
throw utils::BasicException("Couldn't connect to server!");
|
||||
}
|
||||
|
||||
client_ = std::make_unique<ClientT>(std::move(socket), FLAGS_username,
|
||||
FLAGS_password);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -374,11 +369,10 @@ int main(int argc, char **argv) {
|
||||
|
||||
// create client
|
||||
EndpointT endpoint(FLAGS_address, FLAGS_port);
|
||||
SocketT socket;
|
||||
if (!socket.Connect(endpoint)) {
|
||||
ClientT client;
|
||||
if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
|
||||
throw utils::BasicException("Couldn't connect to server!");
|
||||
}
|
||||
ClientT client(std::move(socket), FLAGS_username, FLAGS_password);
|
||||
|
||||
// cleanup and create indexes
|
||||
client.Execute("MATCH (n) DETACH DELETE n", {});
|
||||
|
Loading…
Reference in New Issue
Block a user