From 71565287b80ee1127babf66b2594a9d8eae7eb9c Mon Sep 17 00:00:00 2001 From: Matej Ferencevic <matej.ferencevic@memgraph.io> Date: Wed, 4 Apr 2018 15:56:36 +0200 Subject: [PATCH] Remove obsolete network buffers Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1336 --- src/CMakeLists.txt | 1 - src/communication/bolt/v1/decoder/buffer.hpp | 94 -------------------- src/communication/rpc/buffer.cpp | 39 -------- src/communication/rpc/buffer.hpp | 86 ------------------ tests/unit/bolt_buffer.cpp | 51 ----------- tests/unit/bolt_chunked_decoder_buffer.cpp | 62 ++++++------- tests/unit/communication_buffer.cpp | 60 +++++++++++++ 7 files changed, 91 insertions(+), 302 deletions(-) delete mode 100644 src/communication/bolt/v1/decoder/buffer.hpp delete mode 100644 src/communication/rpc/buffer.cpp delete mode 100644 src/communication/rpc/buffer.hpp delete mode 100644 tests/unit/bolt_buffer.cpp create mode 100644 tests/unit/communication_buffer.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 16971a4fa..37c6209f6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,7 +4,6 @@ set(memgraph_src_files communication/buffer.cpp communication/bolt/v1/decoder/decoded_value.cpp - communication/rpc/buffer.cpp communication/rpc/client.cpp communication/rpc/protocol.cpp communication/rpc/server.cpp diff --git a/src/communication/bolt/v1/decoder/buffer.hpp b/src/communication/bolt/v1/decoder/buffer.hpp deleted file mode 100644 index abd92834d..000000000 --- a/src/communication/bolt/v1/decoder/buffer.hpp +++ /dev/null @@ -1,94 +0,0 @@ -#pragma once - -#include <algorithm> -#include <cstring> -#include <memory> -#include <vector> - -#include "glog/logging.h" - -#include "communication/bolt/v1/constants.hpp" -#include "io/network/stream_buffer.hpp" -#include "utils/bswap.hpp" - -namespace communication::bolt { - -/** - * @brief Buffer - * - * Has methods for writing and reading raw data. - * - * Allocating, writing and written stores data in the buffer. The stored - * data can then be read using the pointer returned with the data function. - * The current implementation stores data in a single fixed length buffer. - * - * @tparam Size the size of the internal byte array, defaults to the maximum - * size of a chunk in the Bolt protocol - */ -template <size_t Size = WHOLE_CHUNK_SIZE> -class Buffer { - private: - using StreamBufferT = io::network::StreamBuffer; - - public: - Buffer() = default; - - /** - * Allocates a new StreamBuffer from the internal buffer. - * This function returns a pointer to the first currently free memory - * location in the internal buffer. Also, it returns the size of the - * available memory. - */ - StreamBufferT Allocate() { - return StreamBufferT{&data_[size_], Size - size_}; - } - - /** - * This method is used to notify the buffer that the data has been written. - * To write data to this buffer you should do this: - * Call Allocate(), then write to the returned data pointer. - * IMPORTANT: Don't write more data then the returned size, you will cause - * a memory overflow. Then call Written(size) with the length of data that - * you have written into the buffer. - * - * @param len the size of data that has been written into the buffer - */ - void Written(size_t len) { - size_ += len; - DCHECK(size_ <= Size) << "Written more than storage has space!"; - } - - /** - * This method shifts the available data for len. It is used when you read - * some data from the buffer and you want to remove it from the buffer. - * - * @param len the length of data that has to be removed from the start of - * the buffer - */ - void Shift(size_t len) { - DCHECK(len <= size_) << "Tried to shift more data than the buffer has!"; - memmove(data_, data_ + len, size_ - len); - size_ -= len; - } - - /** - * This method clears the buffer. - */ - void Clear() { size_ = 0; } - - /** - * This function returns a pointer to the internal buffer. It is used for - * reading data from the buffer. - */ - uint8_t *data() { return data_; } - - /** - * This function returns the size of available data for reading. - */ - size_t size() { return size_; } - - private: - uint8_t data_[Size]; - size_t size_{0}; -}; -} // namespace communication::bolt diff --git a/src/communication/rpc/buffer.cpp b/src/communication/rpc/buffer.cpp deleted file mode 100644 index e77de0012..000000000 --- a/src/communication/rpc/buffer.cpp +++ /dev/null @@ -1,39 +0,0 @@ -#include "glog/logging.h" - -#include "communication/rpc/buffer.hpp" - -namespace communication::rpc { - -Buffer::Buffer() : data_(kBufferInitialSize, 0) {} - -io::network::StreamBuffer Buffer::Allocate() { - return {data_.data() + have_, data_.size() - have_}; -} - -void Buffer::Written(size_t len) { - have_ += len; - DCHECK(have_ <= data_.size()) << "Written more than storage has space!"; -} - -void Buffer::Shift(size_t len) { - DCHECK(len <= have_) << "Tried to shift more data than the buffer has!"; - if (len == have_) { - have_ = 0; - } else { - data_.erase(data_.begin(), data_.begin() + len); - have_ -= len; - } -} - -void Buffer::Resize(size_t len) { - if (len < data_.size()) return; - data_.resize(len, 0); -} - -void Buffer::Clear() { have_ = 0; } - -uint8_t *Buffer::data() { return data_.data(); } - -size_t Buffer::size() { return have_; } - -} // namespace communication::rpc diff --git a/src/communication/rpc/buffer.hpp b/src/communication/rpc/buffer.hpp deleted file mode 100644 index 2dc03a17a..000000000 --- a/src/communication/rpc/buffer.hpp +++ /dev/null @@ -1,86 +0,0 @@ -#pragma once - -#include <vector> - -#include "io/network/stream_buffer.hpp" - -namespace communication::rpc { - -// Initial capacity of the internal buffer. -const size_t kBufferInitialSize = 65536; - -/** - * @brief Buffer - * - * Has methods for writing and reading raw data. - * - * Allocating, writing and written stores data in the buffer. The stored - * data can then be read using the pointer returned with the data function. - * This implementation stores data in a variable sized array (a vector). - * The internal array can only grow in size. - */ -class Buffer { - public: - Buffer(); - - /** - * Allocates a new StreamBuffer from the internal buffer. - * This function returns a pointer to the first currently free memory - * location in the internal buffer. Also, it returns the size of the - * available memory. - */ - io::network::StreamBuffer Allocate(); - - /** - * This method is used to notify the buffer that the data has been written. - * To write data to this buffer you should do this: - * Call Allocate(), then write to the returned data pointer. - * IMPORTANT: Don't write more data then the returned size, you will cause - * a memory overflow. Then call Written(size) with the length of data that - * you have written into the buffer. - * - * @param len the size of data that has been written into the buffer - */ - void Written(size_t len); - - /** - * This method shifts the available data for len. It is used when you read - * some data from the buffer and you want to remove it from the buffer. - * - * @param len the length of data that has to be removed from the start of - * the buffer - */ - void Shift(size_t len); - - /** - * This method resizes the internal data buffer. - * It is used to notify the buffer of the incoming message size. - * If the requested size is larger than the buffer size then the buffer is - * resized, if the requested size is smaller than the buffer size then - * nothing is done. - * - * @param len the desired size of the buffer - */ - void Resize(size_t len); - - /** - * This method clears the buffer. - */ - void Clear(); - - /** - * This function returns a pointer to the internal buffer. It is used for - * reading data from the buffer. - */ - uint8_t *data(); - - /** - * This function returns the size of available data for reading. - */ - size_t size(); - - private: - std::vector<uint8_t> data_; - size_t have_{0}; -}; -} // namespace communication::rpc diff --git a/tests/unit/bolt_buffer.cpp b/tests/unit/bolt_buffer.cpp deleted file mode 100644 index d46578c0b..000000000 --- a/tests/unit/bolt_buffer.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include "bolt_common.hpp" -#include "communication/bolt/v1/decoder/buffer.hpp" - -constexpr const int SIZE = 4096; -uint8_t data[SIZE]; - -using BufferT = communication::bolt::Buffer<>; -using StreamBufferT = io::network::StreamBuffer; - -TEST(BoltBuffer, AllocateAndWritten) { - BufferT buffer; - StreamBufferT sb = buffer.Allocate(); - - memcpy(sb.data, data, 1000); - buffer.Written(1000); - - ASSERT_EQ(buffer.size(), 1000); - - uint8_t *tmp = buffer.data(); - for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); -} - -TEST(BoltBuffer, Shift) { - BufferT buffer; - StreamBufferT sb = buffer.Allocate(); - - memcpy(sb.data, data, 1000); - buffer.Written(1000); - - sb = buffer.Allocate(); - memcpy(sb.data, data + 1000, 1000); - buffer.Written(1000); - - ASSERT_EQ(buffer.size(), 2000); - - uint8_t *tmp = buffer.data(); - for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); - - buffer.Shift(1000); - ASSERT_EQ(buffer.size(), 1000); - tmp = buffer.data(); - - for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1000], tmp[i]); -} - -int main(int argc, char **argv) { - InitializeData(data, SIZE); - google::InitGoogleLogging(argv[0]); - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/tests/unit/bolt_chunked_decoder_buffer.cpp b/tests/unit/bolt_chunked_decoder_buffer.cpp index 2124e563f..67db3e922 100644 --- a/tests/unit/bolt_chunked_decoder_buffer.cpp +++ b/tests/unit/bolt_chunked_decoder_buffer.cpp @@ -1,27 +1,27 @@ #include "bolt_common.hpp" -#include "communication/bolt/v1/decoder/buffer.hpp" +#include "communication/buffer.hpp" #include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp" constexpr const int SIZE = 131072; uint8_t data[SIZE]; -using BufferT = communication::bolt::Buffer<>; +using BufferT = communication::Buffer; using StreamBufferT = io::network::StreamBuffer; -using DecoderBufferT = communication::bolt::ChunkedDecoderBuffer<BufferT>; +using DecoderBufferT = communication::bolt::ChunkedDecoderBuffer<BufferT::ReadEnd>; using ChunkStateT = communication::bolt::ChunkState; TEST(BoltBuffer, CorrectChunk) { uint8_t tmp[2000]; BufferT buffer; - DecoderBufferT decoder_buffer(buffer); - StreamBufferT sb = buffer.Allocate(); + DecoderBufferT decoder_buffer(buffer.read_end()); + StreamBufferT sb = buffer.write_end().Allocate(); sb.data[0] = 0x03; sb.data[1] = 0xe8; memcpy(sb.data + 2, data, 1000); sb.data[1002] = 0; sb.data[1003] = 0; - buffer.Written(1004); + buffer.write_end().Written(1004); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done); @@ -29,21 +29,21 @@ TEST(BoltBuffer, CorrectChunk) { ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); - ASSERT_EQ(buffer.size(), 0); + ASSERT_EQ(buffer.read_end().size(), 0); } TEST(BoltBuffer, CorrectChunkTrailingData) { uint8_t tmp[2000]; BufferT buffer; - DecoderBufferT decoder_buffer(buffer); - StreamBufferT sb = buffer.Allocate(); + DecoderBufferT decoder_buffer(buffer.read_end()); + StreamBufferT sb = buffer.write_end().Allocate(); sb.data[0] = 0x03; sb.data[1] = 0xe8; memcpy(sb.data + 2, data, 2002); sb.data[1002] = 0; sb.data[1003] = 0; - buffer.Written(2004); + buffer.write_end().Written(2004); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done); @@ -51,66 +51,66 @@ TEST(BoltBuffer, CorrectChunkTrailingData) { ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); - uint8_t *leftover = buffer.data(); - ASSERT_EQ(buffer.size(), 1000); + uint8_t *leftover = buffer.read_end().data(); + ASSERT_EQ(buffer.read_end().size(), 1000); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1002], leftover[i]); } TEST(BoltBuffer, GraduallyPopulatedChunk) { uint8_t tmp[2000]; BufferT buffer; - DecoderBufferT decoder_buffer(buffer); - StreamBufferT sb = buffer.Allocate(); + DecoderBufferT decoder_buffer(buffer.read_end()); + StreamBufferT sb = buffer.write_end().Allocate(); sb.data[0] = 0x03; sb.data[1] = 0xe8; - buffer.Written(2); + buffer.write_end().Written(2); for (int i = 0; i < 5; ++i) { ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial); - sb = buffer.Allocate(); + sb = buffer.write_end().Allocate(); memcpy(sb.data, data + 200 * i, 200); - buffer.Written(200); + buffer.write_end().Written(200); } - sb = buffer.Allocate(); + sb = buffer.write_end().Allocate(); sb.data[0] = 0; sb.data[1] = 0; - buffer.Written(2); + buffer.write_end().Written(2); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done); ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); - ASSERT_EQ(buffer.size(), 0); + ASSERT_EQ(buffer.read_end().size(), 0); } TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) { uint8_t tmp[2000]; BufferT buffer; - DecoderBufferT decoder_buffer(buffer); - StreamBufferT sb = buffer.Allocate(); + DecoderBufferT decoder_buffer(buffer.read_end()); + StreamBufferT sb = buffer.write_end().Allocate(); sb.data[0] = 0x03; sb.data[1] = 0xe8; - buffer.Written(2); + buffer.write_end().Written(2); for (int i = 0; i < 5; ++i) { ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial); - sb = buffer.Allocate(); + sb = buffer.write_end().Allocate(); memcpy(sb.data, data + 200 * i, 200); - buffer.Written(200); + buffer.write_end().Written(200); } - sb = buffer.Allocate(); + sb = buffer.write_end().Allocate(); sb.data[0] = 0; sb.data[1] = 0; - buffer.Written(2); + buffer.write_end().Written(2); - sb = buffer.Allocate(); + sb = buffer.write_end().Allocate(); memcpy(sb.data, data, 1000); - buffer.Written(1000); + buffer.write_end().Written(1000); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done); @@ -118,8 +118,8 @@ TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) { ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); - uint8_t *leftover = buffer.data(); - ASSERT_EQ(buffer.size(), 1000); + uint8_t *leftover = buffer.read_end().data(); + ASSERT_EQ(buffer.read_end().size(), 1000); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], leftover[i]); } diff --git a/tests/unit/communication_buffer.cpp b/tests/unit/communication_buffer.cpp new file mode 100644 index 000000000..64dda6bd1 --- /dev/null +++ b/tests/unit/communication_buffer.cpp @@ -0,0 +1,60 @@ +#include "bolt_common.hpp" +#include "communication/buffer.hpp" + +constexpr const int SIZE = 4096; +uint8_t data[SIZE]; + +using communication::Buffer; + +TEST(CommunicationBuffer, AllocateAndWritten) { + Buffer buffer; + auto sb = buffer.write_end().Allocate(); + + memcpy(sb.data, data, 1000); + buffer.write_end().Written(1000); + + ASSERT_EQ(buffer.read_end().size(), 1000); + + uint8_t *tmp = buffer.read_end().data(); + for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); +} + +TEST(CommunicationBuffer, Shift) { + Buffer buffer; + auto sb = buffer.write_end().Allocate(); + + memcpy(sb.data, data, 1000); + buffer.write_end().Written(1000); + + sb = buffer.write_end().Allocate(); + memcpy(sb.data, data + 1000, 1000); + buffer.write_end().Written(1000); + + ASSERT_EQ(buffer.read_end().size(), 2000); + + uint8_t *tmp = buffer.read_end().data(); + for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); + + buffer.read_end().Shift(1000); + ASSERT_EQ(buffer.read_end().size(), 1000); + tmp = buffer.read_end().data(); + + for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1000], tmp[i]); +} + +TEST(CommunicationBuffer, Resize) { + Buffer buffer; + auto sb = buffer.write_end().Allocate(); + + buffer.read_end().Resize(sb.len + 1000); + + auto sbn = buffer.write_end().Allocate(); + ASSERT_EQ(sb.len + 1000, sbn.len); +} + +int main(int argc, char **argv) { + InitializeData(data, SIZE); + google::InitGoogleLogging(argv[0]); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}