Destroy archives to force flushing binary data

Summary:
It is possible that boost's archive didn't write all data to a stream
before the archive is destroyed. To make sure everything works, archives
are now wrapped in a nested block.

Additionally, there may be some issues if the stream isn't set for
binary reading/writing due to new line character conversion. This
shouldn't pose problems on Linux, but just to make sure it doesn't bite
us in the a$% down the line.

Reviewers: mferencevic, mculinovic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1194
This commit is contained in:
Teon Banek 2018-02-13 15:56:50 +01:00
parent 0616eb0aa4
commit cc6f6f4b35
2 changed files with 31 additions and 20 deletions

View File

@ -62,9 +62,12 @@ std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
} }
// Serialize and send request. // Serialize and send request.
std::stringstream request_stream; std::stringstream request_stream(std::ios_base::out | std::ios_base::binary);
{
boost::archive::binary_oarchive request_archive(request_stream); boost::archive::binary_oarchive request_archive(request_stream);
request_archive << request; request_archive << request;
// Archive destructor ensures everything is written.
}
const std::string &request_buffer = request_stream.str(); const std::string &request_buffer = request_stream.str();
MessageSize request_data_size = request_buffer.size(); MessageSize request_data_size = request_buffer.size();
@ -104,14 +107,17 @@ std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
sizeof(uint32_t) + sizeof(MessageSize) + response_data_size; sizeof(uint32_t) + sizeof(MessageSize) + response_data_size;
if (received_bytes_ < response_size) continue; if (received_bytes_ < response_size) continue;
std::stringstream response_stream; std::unique_ptr<Message> response;
response_stream.str( {
std::string(reinterpret_cast<char *>(buffer_.data() + sizeof(uint32_t) + std::stringstream response_stream(std::ios_base::in |
std::ios_base::binary);
response_stream.str(std::string(
reinterpret_cast<char *>(buffer_.data() + sizeof(uint32_t) +
sizeof(MessageSize)), sizeof(MessageSize)),
response_data_size)); response_data_size));
boost::archive::binary_iarchive response_archive(response_stream); boost::archive::binary_iarchive response_archive(response_stream);
std::unique_ptr<Message> response;
response_archive >> response; response_archive >> response;
}
std::copy(buffer_.begin() + response_size, std::copy(buffer_.begin() + response_size,
buffer_.begin() + received_bytes_, buffer_.begin()); buffer_.begin() + received_bytes_, buffer_.begin());

View File

@ -38,14 +38,16 @@ void Session::Execute() {
return; return;
// TODO (mferencevic): check for exceptions // TODO (mferencevic): check for exceptions
std::stringstream stream; std::unique_ptr<Message> message;
{
std::stringstream stream(std::ios_base::in | std::ios_base::binary);
stream.str( stream.str(
std::string(reinterpret_cast<char *>(buffer_.data() + sizeof(uint32_t) + std::string(reinterpret_cast<char *>(buffer_.data() + sizeof(uint32_t) +
sizeof(MessageSize)), sizeof(MessageSize)),
message_len)); message_len));
boost::archive::binary_iarchive archive(stream); boost::archive::binary_iarchive archive(stream);
std::unique_ptr<Message> message;
archive >> message; archive >> message;
}
buffer_.Shift(sizeof(uint32_t) + sizeof(MessageSize) + message_len); buffer_.Shift(sizeof(uint32_t) + sizeof(MessageSize) + message_len);
system_.AddTask(socket_, service_name_, message_id, std::move(message)); system_.AddTask(socket_, service_name_, message_id, std::move(message));
@ -68,9 +70,12 @@ void SendMessage(Socket &socket, uint32_t message_id,
CHECK(message) << "Trying to send nullptr instead of message"; CHECK(message) << "Trying to send nullptr instead of message";
// Serialize and send message // Serialize and send message
std::stringstream stream; std::stringstream stream(std::ios_base::out | std::ios_base::binary);
{
boost::archive::binary_oarchive archive(stream); boost::archive::binary_oarchive archive(stream);
archive << message; archive << message;
// Archive destructor ensures everything is written.
}
const std::string &buffer = stream.str(); const std::string &buffer = stream.str();
uint64_t message_size = sizeof(MessageSize) + buffer.size(); uint64_t message_size = sizeof(MessageSize) + buffer.size();