Remove obsolete network buffers

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1336
This commit is contained in:
Matej Ferencevic 2018-04-04 15:56:36 +02:00
parent b20e31e800
commit 71565287b8
7 changed files with 91 additions and 302 deletions

View File

@ -4,7 +4,6 @@
set(memgraph_src_files
communication/buffer.cpp
communication/bolt/v1/decoder/decoded_value.cpp
communication/rpc/buffer.cpp
communication/rpc/client.cpp
communication/rpc/protocol.cpp
communication/rpc/server.cpp

View File

@ -1,94 +0,0 @@
#pragma once
#include <algorithm>
#include <cstring>
#include <memory>
#include <vector>
#include "glog/logging.h"
#include "communication/bolt/v1/constants.hpp"
#include "io/network/stream_buffer.hpp"
#include "utils/bswap.hpp"
namespace communication::bolt {
/**
* @brief Buffer
*
* Has methods for writing and reading raw data.
*
* Allocating, writing and written stores data in the buffer. The stored
* data can then be read using the pointer returned with the data function.
* The current implementation stores data in a single fixed length buffer.
*
* @tparam Size the size of the internal byte array, defaults to the maximum
* size of a chunk in the Bolt protocol
*/
template <size_t Size = WHOLE_CHUNK_SIZE>
class Buffer {
private:
using StreamBufferT = io::network::StreamBuffer;
public:
Buffer() = default;
/**
* Allocates a new StreamBuffer from the internal buffer.
* This function returns a pointer to the first currently free memory
* location in the internal buffer. Also, it returns the size of the
* available memory.
*/
StreamBufferT Allocate() {
return StreamBufferT{&data_[size_], Size - size_};
}
/**
* This method is used to notify the buffer that the data has been written.
* To write data to this buffer you should do this:
* Call Allocate(), then write to the returned data pointer.
* IMPORTANT: Don't write more data then the returned size, you will cause
* a memory overflow. Then call Written(size) with the length of data that
* you have written into the buffer.
*
* @param len the size of data that has been written into the buffer
*/
void Written(size_t len) {
size_ += len;
DCHECK(size_ <= Size) << "Written more than storage has space!";
}
/**
* This method shifts the available data for len. It is used when you read
* some data from the buffer and you want to remove it from the buffer.
*
* @param len the length of data that has to be removed from the start of
* the buffer
*/
void Shift(size_t len) {
DCHECK(len <= size_) << "Tried to shift more data than the buffer has!";
memmove(data_, data_ + len, size_ - len);
size_ -= len;
}
/**
* This method clears the buffer.
*/
void Clear() { size_ = 0; }
/**
* This function returns a pointer to the internal buffer. It is used for
* reading data from the buffer.
*/
uint8_t *data() { return data_; }
/**
* This function returns the size of available data for reading.
*/
size_t size() { return size_; }
private:
uint8_t data_[Size];
size_t size_{0};
};
} // namespace communication::bolt

View File

@ -1,39 +0,0 @@
#include "glog/logging.h"
#include "communication/rpc/buffer.hpp"
namespace communication::rpc {
Buffer::Buffer() : data_(kBufferInitialSize, 0) {}
io::network::StreamBuffer Buffer::Allocate() {
return {data_.data() + have_, data_.size() - have_};
}
void Buffer::Written(size_t len) {
have_ += len;
DCHECK(have_ <= data_.size()) << "Written more than storage has space!";
}
void Buffer::Shift(size_t len) {
DCHECK(len <= have_) << "Tried to shift more data than the buffer has!";
if (len == have_) {
have_ = 0;
} else {
data_.erase(data_.begin(), data_.begin() + len);
have_ -= len;
}
}
void Buffer::Resize(size_t len) {
if (len < data_.size()) return;
data_.resize(len, 0);
}
void Buffer::Clear() { have_ = 0; }
uint8_t *Buffer::data() { return data_.data(); }
size_t Buffer::size() { return have_; }
} // namespace communication::rpc

View File

@ -1,86 +0,0 @@
#pragma once
#include <vector>
#include "io/network/stream_buffer.hpp"
namespace communication::rpc {
// Initial capacity of the internal buffer.
const size_t kBufferInitialSize = 65536;
/**
* @brief Buffer
*
* Has methods for writing and reading raw data.
*
* Allocating, writing and written stores data in the buffer. The stored
* data can then be read using the pointer returned with the data function.
* This implementation stores data in a variable sized array (a vector).
* The internal array can only grow in size.
*/
class Buffer {
public:
Buffer();
/**
* Allocates a new StreamBuffer from the internal buffer.
* This function returns a pointer to the first currently free memory
* location in the internal buffer. Also, it returns the size of the
* available memory.
*/
io::network::StreamBuffer Allocate();
/**
* This method is used to notify the buffer that the data has been written.
* To write data to this buffer you should do this:
* Call Allocate(), then write to the returned data pointer.
* IMPORTANT: Don't write more data then the returned size, you will cause
* a memory overflow. Then call Written(size) with the length of data that
* you have written into the buffer.
*
* @param len the size of data that has been written into the buffer
*/
void Written(size_t len);
/**
* This method shifts the available data for len. It is used when you read
* some data from the buffer and you want to remove it from the buffer.
*
* @param len the length of data that has to be removed from the start of
* the buffer
*/
void Shift(size_t len);
/**
* This method resizes the internal data buffer.
* It is used to notify the buffer of the incoming message size.
* If the requested size is larger than the buffer size then the buffer is
* resized, if the requested size is smaller than the buffer size then
* nothing is done.
*
* @param len the desired size of the buffer
*/
void Resize(size_t len);
/**
* This method clears the buffer.
*/
void Clear();
/**
* This function returns a pointer to the internal buffer. It is used for
* reading data from the buffer.
*/
uint8_t *data();
/**
* This function returns the size of available data for reading.
*/
size_t size();
private:
std::vector<uint8_t> data_;
size_t have_{0};
};
} // namespace communication::rpc

View File

@ -1,51 +0,0 @@
#include "bolt_common.hpp"
#include "communication/bolt/v1/decoder/buffer.hpp"
constexpr const int SIZE = 4096;
uint8_t data[SIZE];
using BufferT = communication::bolt::Buffer<>;
using StreamBufferT = io::network::StreamBuffer;
TEST(BoltBuffer, AllocateAndWritten) {
BufferT buffer;
StreamBufferT sb = buffer.Allocate();
memcpy(sb.data, data, 1000);
buffer.Written(1000);
ASSERT_EQ(buffer.size(), 1000);
uint8_t *tmp = buffer.data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
}
TEST(BoltBuffer, Shift) {
BufferT buffer;
StreamBufferT sb = buffer.Allocate();
memcpy(sb.data, data, 1000);
buffer.Written(1000);
sb = buffer.Allocate();
memcpy(sb.data, data + 1000, 1000);
buffer.Written(1000);
ASSERT_EQ(buffer.size(), 2000);
uint8_t *tmp = buffer.data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
buffer.Shift(1000);
ASSERT_EQ(buffer.size(), 1000);
tmp = buffer.data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1000], tmp[i]);
}
int main(int argc, char **argv) {
InitializeData(data, SIZE);
google::InitGoogleLogging(argv[0]);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1,27 +1,27 @@
#include "bolt_common.hpp"
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "communication/buffer.hpp"
#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp"
constexpr const int SIZE = 131072;
uint8_t data[SIZE];
using BufferT = communication::bolt::Buffer<>;
using BufferT = communication::Buffer;
using StreamBufferT = io::network::StreamBuffer;
using DecoderBufferT = communication::bolt::ChunkedDecoderBuffer<BufferT>;
using DecoderBufferT = communication::bolt::ChunkedDecoderBuffer<BufferT::ReadEnd>;
using ChunkStateT = communication::bolt::ChunkState;
TEST(BoltBuffer, CorrectChunk) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
DecoderBufferT decoder_buffer(buffer.read_end());
StreamBufferT sb = buffer.write_end().Allocate();
sb.data[0] = 0x03;
sb.data[1] = 0xe8;
memcpy(sb.data + 2, data, 1000);
sb.data[1002] = 0;
sb.data[1003] = 0;
buffer.Written(1004);
buffer.write_end().Written(1004);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
@ -29,21 +29,21 @@ TEST(BoltBuffer, CorrectChunk) {
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
ASSERT_EQ(buffer.size(), 0);
ASSERT_EQ(buffer.read_end().size(), 0);
}
TEST(BoltBuffer, CorrectChunkTrailingData) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
DecoderBufferT decoder_buffer(buffer.read_end());
StreamBufferT sb = buffer.write_end().Allocate();
sb.data[0] = 0x03;
sb.data[1] = 0xe8;
memcpy(sb.data + 2, data, 2002);
sb.data[1002] = 0;
sb.data[1003] = 0;
buffer.Written(2004);
buffer.write_end().Written(2004);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
@ -51,66 +51,66 @@ TEST(BoltBuffer, CorrectChunkTrailingData) {
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
uint8_t *leftover = buffer.data();
ASSERT_EQ(buffer.size(), 1000);
uint8_t *leftover = buffer.read_end().data();
ASSERT_EQ(buffer.read_end().size(), 1000);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1002], leftover[i]);
}
TEST(BoltBuffer, GraduallyPopulatedChunk) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
DecoderBufferT decoder_buffer(buffer.read_end());
StreamBufferT sb = buffer.write_end().Allocate();
sb.data[0] = 0x03;
sb.data[1] = 0xe8;
buffer.Written(2);
buffer.write_end().Written(2);
for (int i = 0; i < 5; ++i) {
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
sb = buffer.Allocate();
sb = buffer.write_end().Allocate();
memcpy(sb.data, data + 200 * i, 200);
buffer.Written(200);
buffer.write_end().Written(200);
}
sb = buffer.Allocate();
sb = buffer.write_end().Allocate();
sb.data[0] = 0;
sb.data[1] = 0;
buffer.Written(2);
buffer.write_end().Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
ASSERT_EQ(buffer.size(), 0);
ASSERT_EQ(buffer.read_end().size(), 0);
}
TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) {
uint8_t tmp[2000];
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
DecoderBufferT decoder_buffer(buffer.read_end());
StreamBufferT sb = buffer.write_end().Allocate();
sb.data[0] = 0x03;
sb.data[1] = 0xe8;
buffer.Written(2);
buffer.write_end().Written(2);
for (int i = 0; i < 5; ++i) {
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
sb = buffer.Allocate();
sb = buffer.write_end().Allocate();
memcpy(sb.data, data + 200 * i, 200);
buffer.Written(200);
buffer.write_end().Written(200);
}
sb = buffer.Allocate();
sb = buffer.write_end().Allocate();
sb.data[0] = 0;
sb.data[1] = 0;
buffer.Written(2);
buffer.write_end().Written(2);
sb = buffer.Allocate();
sb = buffer.write_end().Allocate();
memcpy(sb.data, data, 1000);
buffer.Written(1000);
buffer.write_end().Written(1000);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
@ -118,8 +118,8 @@ TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) {
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
uint8_t *leftover = buffer.data();
ASSERT_EQ(buffer.size(), 1000);
uint8_t *leftover = buffer.read_end().data();
ASSERT_EQ(buffer.read_end().size(), 1000);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], leftover[i]);
}

View File

@ -0,0 +1,60 @@
#include "bolt_common.hpp"
#include "communication/buffer.hpp"
constexpr const int SIZE = 4096;
uint8_t data[SIZE];
using communication::Buffer;
TEST(CommunicationBuffer, AllocateAndWritten) {
Buffer buffer;
auto sb = buffer.write_end().Allocate();
memcpy(sb.data, data, 1000);
buffer.write_end().Written(1000);
ASSERT_EQ(buffer.read_end().size(), 1000);
uint8_t *tmp = buffer.read_end().data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
}
TEST(CommunicationBuffer, Shift) {
Buffer buffer;
auto sb = buffer.write_end().Allocate();
memcpy(sb.data, data, 1000);
buffer.write_end().Written(1000);
sb = buffer.write_end().Allocate();
memcpy(sb.data, data + 1000, 1000);
buffer.write_end().Written(1000);
ASSERT_EQ(buffer.read_end().size(), 2000);
uint8_t *tmp = buffer.read_end().data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
buffer.read_end().Shift(1000);
ASSERT_EQ(buffer.read_end().size(), 1000);
tmp = buffer.read_end().data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1000], tmp[i]);
}
TEST(CommunicationBuffer, Resize) {
Buffer buffer;
auto sb = buffer.write_end().Allocate();
buffer.read_end().Resize(sb.len + 1000);
auto sbn = buffer.write_end().Allocate();
ASSERT_EQ(sb.len + 1000, sbn.len);
}
int main(int argc, char **argv) {
InitializeData(data, SIZE);
google::InitGoogleLogging(argv[0]);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}