Fixed bolt chunked encoder buffer for big data.
Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D756
This commit is contained in:
parent
642c2f07bb
commit
f848394e5d
src/communication/bolt/v1/encoder
tests/unit
@ -50,16 +50,19 @@ class ChunkedEncoderBuffer {
|
||||
* @param n is the number of bytes
|
||||
*/
|
||||
void Write(const uint8_t *values, size_t n) {
|
||||
int written = 0;
|
||||
|
||||
while (n > 0) {
|
||||
// Define number of bytes which will be copied into chunk because
|
||||
// chunk is a fixed length array.
|
||||
auto size = n < WHOLE_CHUNK_SIZE - pos_ ? n : WHOLE_CHUNK_SIZE - pos_;
|
||||
|
||||
// Copy size values to chunk array.
|
||||
std::memcpy(chunk_.data() + pos_, values, size);
|
||||
std::memcpy(chunk_.data() + pos_, values + written, size);
|
||||
|
||||
// Update positions. Position pointer and incomming size have to be
|
||||
// updated because all incomming values have to be processed.
|
||||
written += size;
|
||||
pos_ += size;
|
||||
n -= size;
|
||||
|
||||
|
@ -5,26 +5,30 @@
|
||||
using SocketT = TestSocket;
|
||||
using BufferT = communication::bolt::ChunkedEncoderBuffer<SocketT>;
|
||||
|
||||
// "alias" constants
|
||||
static constexpr auto CHS = communication::bolt::CHUNK_HEADER_SIZE;
|
||||
static constexpr auto CEMS = communication::bolt::CHUNK_END_MARKER_SIZE;
|
||||
static constexpr auto MCS = communication::bolt::MAX_CHUNK_SIZE;
|
||||
static constexpr auto WCS = communication::bolt::WHOLE_CHUNK_SIZE;
|
||||
// constants
|
||||
using communication::bolt::CHUNK_HEADER_SIZE;
|
||||
using communication::bolt::CHUNK_END_MARKER_SIZE;
|
||||
using communication::bolt::MAX_CHUNK_SIZE;
|
||||
using communication::bolt::WHOLE_CHUNK_SIZE;
|
||||
|
||||
// test data
|
||||
constexpr const int TEST_DATA_SIZE = 100000;
|
||||
uint8_t test_data[TEST_DATA_SIZE];
|
||||
|
||||
/**
|
||||
* Verifies a single chunk. The chunk should be constructed from header
|
||||
* (chunk size), data and end marker. The header is two bytes long number
|
||||
* written in big endian format. Data is array of elements which max size is
|
||||
* 0xFFFF. The end marker is always two bytes long array of two zeros.
|
||||
* written in big endian format. Data is array of elements from test_data
|
||||
* which max size is 0xFFFF. The end marker is always two bytes long array of
|
||||
* two zeros.
|
||||
*
|
||||
* @param data pointer on data array (array of bytes)
|
||||
* @param size of data array
|
||||
* @param element expected element in all positions of chunk data array
|
||||
* (all data bytes in tested chunk should be equal to element)
|
||||
* @param offset offset from the begining of the test data
|
||||
* @param final_chunk if set to true then check for 0x00 0x00 after the chunk
|
||||
*/
|
||||
void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element,
|
||||
bool final_chunk = true) {
|
||||
void VerifyChunkOfTestData(uint8_t *data, int size, uint64_t offset = 0,
|
||||
bool final_chunk = true) {
|
||||
// first two bytes are size (big endian)
|
||||
uint8_t lower_byte = size & 0xFF;
|
||||
uint8_t higher_byte = (size & 0xFF00) >> 8;
|
||||
@ -33,87 +37,79 @@ void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element,
|
||||
|
||||
// in the data array should be size number of ones
|
||||
// the header is skipped
|
||||
for (auto i = CHS; i < size + CHS; ++i) {
|
||||
ASSERT_EQ(*(data + i), element);
|
||||
for (auto i = 0; i < size; ++i) {
|
||||
ASSERT_EQ(data[i + CHUNK_HEADER_SIZE], test_data[i + offset]);
|
||||
}
|
||||
|
||||
// last two bytes should be zeros
|
||||
// next to header and data
|
||||
if (final_chunk) {
|
||||
ASSERT_EQ(*(data + CHS + size), 0x00);
|
||||
ASSERT_EQ(*(data + CHS + size + 1), 0x00);
|
||||
ASSERT_EQ(data[CHUNK_HEADER_SIZE + size], 0x00);
|
||||
ASSERT_EQ(data[CHUNK_HEADER_SIZE + size + 1], 0x00);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(BoltChunkedEncoderBuffer, OneSmallChunk) {
|
||||
// initialize array of 100 ones (small chunk)
|
||||
int size = 100;
|
||||
uint8_t element = '1';
|
||||
std::vector<uint8_t> data(100, element);
|
||||
|
||||
// initialize tested buffer
|
||||
SocketT socket(10);
|
||||
BufferT buffer(socket);
|
||||
|
||||
// write into buffer
|
||||
buffer.Write(data.data(), size);
|
||||
buffer.Write(test_data, size);
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the array should look like: [0, 100, 1, 1, ... , 1, 0, 0]
|
||||
VerifyChunkOfOnes(socket.output.data(), size, element);
|
||||
// the array should look like: [0, 100, first 100 bytes of test data, 0, 0]
|
||||
VerifyChunkOfTestData(socket.output.data(), size);
|
||||
}
|
||||
|
||||
TEST(BoltChunkedEncoderBuffer, TwoSmallChunks) {
|
||||
// initialize the small arrays
|
||||
int size1 = 100;
|
||||
uint8_t element1 = '1';
|
||||
std::vector<uint8_t> data1(size1, element1);
|
||||
int size2 = 200;
|
||||
uint8_t element2 = '2';
|
||||
std::vector<uint8_t> data2(size2, element2);
|
||||
|
||||
// initialize tested buffer
|
||||
SocketT socket(10);
|
||||
BufferT buffer(socket);
|
||||
|
||||
// write into buffer
|
||||
buffer.Write(data1.data(), size1);
|
||||
buffer.Write(test_data, size1);
|
||||
buffer.Chunk();
|
||||
buffer.Write(data2.data(), size2);
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the output array should look like this: [0, 100, 1, 1, ... , 1, 0, 0] +
|
||||
// [0, 100, 2, 2, ...... , 2, 0, 0]
|
||||
auto data = socket.output.data();
|
||||
VerifyChunkOfOnes(data, size1, element1);
|
||||
VerifyChunkOfOnes(data + CHS + size1 + CEMS, size2, element2);
|
||||
}
|
||||
|
||||
TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) {
|
||||
// initialize a big chunk
|
||||
int size = 100000;
|
||||
uint8_t element = '1';
|
||||
std::vector<uint8_t> data(size, element);
|
||||
|
||||
// initialize tested buffer
|
||||
SocketT socket(10);
|
||||
BufferT buffer(socket);
|
||||
|
||||
// write into buffer
|
||||
buffer.Write(data.data(), size);
|
||||
buffer.Write(test_data + size1, size2);
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the output array should look like this:
|
||||
// [0xFF, 0xFF, 1, 1, ... , 1, 0, 0, 0x86, 0xA1, 1, 1, ... , 1, 0, 0]
|
||||
// [0, 100, first 100 bytes of test data, 0, 0] +
|
||||
// [0, 100, second 100 bytes of test data, 0, 0]
|
||||
auto data = socket.output.data();
|
||||
VerifyChunkOfTestData(data, size1);
|
||||
VerifyChunkOfTestData(
|
||||
data + CHUNK_HEADER_SIZE + size1 + CHUNK_END_MARKER_SIZE, size2, size1);
|
||||
}
|
||||
|
||||
TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) {
|
||||
// initialize tested buffer
|
||||
SocketT socket(10);
|
||||
BufferT buffer(socket);
|
||||
|
||||
// write into buffer
|
||||
buffer.Write(test_data, TEST_DATA_SIZE);
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the output array should look like this:
|
||||
// [0xFF, 0xFF, first 65535 bytes of test data,
|
||||
// 0x86, 0xA1, 34465 bytes of test data after the first 65535 bytes, 0, 0]
|
||||
auto output = socket.output.data();
|
||||
VerifyChunkOfOnes(output, MCS, element, false);
|
||||
VerifyChunkOfOnes(output + WCS, size - MCS, element);
|
||||
VerifyChunkOfTestData(output, MAX_CHUNK_SIZE, 0, false);
|
||||
VerifyChunkOfTestData(output + WHOLE_CHUNK_SIZE,
|
||||
TEST_DATA_SIZE - MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
InitializeData(test_data, TEST_DATA_SIZE);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
|
Loading…
Reference in New Issue
Block a user