Fixed bolt decoder chunked buffer implementation.

Summary:
Bolt client now uses new chunked decoder.

Fixed bolt session to use new chunked decoder.

Fixed chunked decoder buffer test.

Fixed bolt session test.

Removed debug message from client.

Fixed bolt encoder to comply with specification.

Reviewers: buda, mislav.bradac

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D751
This commit is contained in:
Matej Ferencevic 2017-09-06 12:51:56 +02:00
parent 4975743f35
commit 8cf69738bf
10 changed files with 106 additions and 141 deletions

View File

@ -14,6 +14,7 @@
### Bug Fixes and Other Changes ### Bug Fixes and Other Changes
* Keywords appearing in header (named expressions) keep original case. * Keywords appearing in header (named expressions) keep original case.
* Our Bolt protocol implementation is now completely compatible with the protocol version 1 specification. (https://boltprotocol.org/v1/)
## v0.7.0 ## v0.7.0

View File

@ -75,14 +75,11 @@ class Client {
DLOG(INFO) << "Sending init message"; DLOG(INFO) << "Sending init message";
if (!encoder_.MessageInit(client_name, {{"scheme", "basic"}, if (!encoder_.MessageInit(client_name, {{"scheme", "basic"},
{"principal", username}, {"principal", username},
{"secret", password}})) { {"credentials", password}})) {
throw ClientSocketException(); throw ClientSocketException();
} }
DLOG(INFO) << "Reading init message response"; DLOG(INFO) << "Reading init message response";
if (!GetDataByChunk()) {
throw ClientSocketException();
}
Signature signature; Signature signature;
DecodedValue metadata; DecodedValue metadata;
if (!ReadMessage(&signature, &metadata)) { if (!ReadMessage(&signature, &metadata)) {
@ -91,6 +88,7 @@ class Client {
if (signature != Signature::Success) { if (signature != Signature::Success) {
throw ClientInvalidDataException(); throw ClientInvalidDataException();
} }
DLOG(INFO) << "Metadata of init message response: " << metadata;
} }
Client(const Client &) = delete; Client(const Client &) = delete;
@ -109,9 +107,6 @@ class Client {
encoder_.MessagePullAll(); encoder_.MessagePullAll();
DLOG(INFO) << "Reading run message response"; DLOG(INFO) << "Reading run message response";
if (!GetDataByChunk()) {
throw ClientSocketException();
}
Signature signature; Signature signature;
DecodedValue fields; DecodedValue fields;
if (!ReadMessage(&signature, &fields)) { if (!ReadMessage(&signature, &fields)) {
@ -213,25 +208,25 @@ class Client {
} }
bool GetDataByChunk() { bool GetDataByChunk() {
// If there is more data in the buffer then don't read data.
if (decoder_buffer_.Size() > 0) return true;
ChunkState state; ChunkState state;
while ((state = decoder_buffer_.GetChunk()) == ChunkState::Partial) { while ((state = decoder_buffer_.GetChunk()) != ChunkState::Done) {
if (state == ChunkState::Whole) {
// The chunk is whole, no need to read more data.
continue;
}
auto buff = buffer_.Allocate(); auto buff = buffer_.Allocate();
int ret = socket_.Read(buff.data, buff.len); int ret = socket_.Read(buff.data, buff.len);
if (ret == -1) return false; if (ret == -1) return false;
buffer_.Written(ret); buffer_.Written(ret);
} }
return true;
if (state == ChunkState::Whole) {
return true;
}
return false;
} }
bool ReadMessage(Signature *signature, DecodedValue *ret) { bool ReadMessage(Signature *signature, DecodedValue *ret) {
Marker marker; Marker marker;
if (!GetDataByChunk()) {
return false;
}
if (!decoder_.ReadMessageHeader(signature, &marker)) { if (!decoder_.ReadMessageHeader(signature, &marker)) {
return false; return false;
} }
@ -255,9 +250,6 @@ class Client {
while (true) { while (true) {
Signature signature; Signature signature;
DecodedValue data; DecodedValue data;
if (!GetDataByChunk()) {
throw ClientInvalidDataException();
}
if (!ReadMessage(&signature, &data)) { if (!ReadMessage(&signature, &data)) {
throw ClientInvalidDataException(); throw ClientInvalidDataException();
} }

View File

@ -10,8 +10,7 @@ namespace communication::bolt {
static constexpr size_t CHUNK_HEADER_SIZE = 2; static constexpr size_t CHUNK_HEADER_SIZE = 2;
static constexpr size_t MAX_CHUNK_SIZE = 65535; static constexpr size_t MAX_CHUNK_SIZE = 65535;
static constexpr size_t CHUNK_END_MARKER_SIZE = 2; static constexpr size_t CHUNK_END_MARKER_SIZE = 2;
static constexpr size_t WHOLE_CHUNK_SIZE = static constexpr size_t WHOLE_CHUNK_SIZE = CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE;
CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE + CHUNK_END_MARKER_SIZE;
/** /**
* Handshake size defined in the Bolt protocol. * Handshake size defined in the Bolt protocol.

View File

@ -22,11 +22,11 @@ enum class ChunkState : uint8_t {
// The chunk isn't complete, we have to read more data // The chunk isn't complete, we have to read more data
Partial, Partial,
// The chunk is invalid, it's tail isn't 0x00 0x00
Invalid,
// The chunk is whole and correct and has been loaded into the buffer // The chunk is whole and correct and has been loaded into the buffer
Whole Whole,
// The chunk size is 0 meaning that the message is done
Done
}; };
/** /**
@ -44,7 +44,9 @@ class ChunkedDecoderBuffer {
using StreamBufferT = io::network::StreamBuffer; using StreamBufferT = io::network::StreamBuffer;
public: public:
ChunkedDecoderBuffer(Buffer<> &buffer) : buffer_(buffer) {} ChunkedDecoderBuffer(Buffer<> &buffer) : buffer_(buffer) {
data_.reserve(MAX_CHUNK_SIZE);
}
/** /**
* Reads data from the internal buffer. * Reads data from the internal buffer.
@ -55,9 +57,13 @@ class ChunkedDecoderBuffer {
* false otherwise * false otherwise
*/ */
bool Read(uint8_t *data, size_t len) { bool Read(uint8_t *data, size_t len) {
if (len > size_ - pos_) return false; if (len > Size()) return false;
memcpy(data, &data_[pos_], len); memcpy(data, &data_[pos_], len);
pos_ += len; pos_ += len;
if (Size() == 0) {
pos_ = 0;
data_.clear();
}
return true; return true;
} }
@ -72,25 +78,23 @@ class ChunkedDecoderBuffer {
* false otherwise * false otherwise
*/ */
bool Peek(uint8_t *data, size_t len, size_t offset = 0) { bool Peek(uint8_t *data, size_t len, size_t offset = 0) {
if (len + offset > size_ - pos_) return false; if (len + offset > Size()) return false;
memcpy(data, &data_[pos_ + offset], len); memcpy(data, &data_[pos_ + offset], len);
return true; return true;
} }
/** /**
* Gets a chunk from the underlying raw data buffer. * Gets a chunk from the underlying raw data buffer.
* When getting a chunk this function validates the chunk.
* If the chunk isn't yet finished the function just returns false.
* If the chunk is finished (all data has been read) and the chunk isn't
* valid, then the function automatically deletes the invalid chunk
* from the underlying buffer and returns false.
* *
* @returns true if a chunk was successfully copied into the internal * @returns ChunkState::Partial if the chunk isn't whole
* buffer, false otherwise * ChunkState::Whole if the chunk is whole
* ChunkState::Done if the chunk size is 0 (that signals that the
* message is whole)
*/ */
ChunkState GetChunk() { ChunkState GetChunk() {
uint8_t *data = buffer_.data(); uint8_t *data = buffer_.data();
size_t size = buffer_.size(); size_t size = buffer_.size();
if (size < 2) { if (size < 2) {
DLOG(WARNING) << "Size < 2"; DLOG(WARNING) << "Size < 2";
return ChunkState::Partial; return ChunkState::Partial;
@ -99,22 +103,22 @@ class ChunkedDecoderBuffer {
size_t chunk_size = data[0]; size_t chunk_size = data[0];
chunk_size <<= 8; chunk_size <<= 8;
chunk_size += data[1]; chunk_size += data[1];
if (size < chunk_size + 4) {
if (chunk_size == 0) {
// The message is done.
buffer_.Shift(2);
return ChunkState::Done;
}
if (size < chunk_size + 2) {
DLOG(WARNING) << fmt::format( DLOG(WARNING) << fmt::format(
"Chunk size is {} but only have {} data bytes.", chunk_size, size); "Chunk size is {} but only have {} data bytes.", chunk_size, size);
return ChunkState::Partial; return ChunkState::Partial;
} }
if (data[chunk_size + 2] != 0 || data[chunk_size + 3] != 0) { data_.reserve(data_.size() + chunk_size);
DLOG(WARNING) << "Invalid chunk!"; std::copy(data + 2, data + chunk_size + 2, std::back_inserter(data_));
buffer_.Shift(chunk_size + 4); buffer_.Shift(chunk_size + 2);
return ChunkState::Invalid;
}
pos_ = 0;
size_ = chunk_size;
memcpy(data_, data + 2, size - 4);
buffer_.Shift(chunk_size + 4);
return ChunkState::Whole; return ChunkState::Whole;
} }
@ -124,12 +128,11 @@ class ChunkedDecoderBuffer {
* *
* @returns size of available data * @returns size of available data
*/ */
size_t Size() { return size_ - pos_; } size_t Size() { return data_.size() - pos_; }
private: private:
Buffer<> &buffer_; Buffer<> &buffer_;
uint8_t data_[MAX_CHUNK_SIZE]; std::vector<uint8_t> data_;
size_t size_{0};
size_t pos_{0}; size_t pos_{0};
}; };
} }

View File

@ -26,11 +26,11 @@ namespace communication::bolt {
* the currently stored data to the Socket. Chunking prepends data length and * the currently stored data to the Socket. Chunking prepends data length and
* appends chunk end marker (0x00 0x00). * appends chunk end marker (0x00 0x00).
* *
* | chunk header | --- chunk --- | end marker | ---------- another chunk ... | * | chunk header | --- chunk --- | another chunk | -- end marker -- |
* | ------------- whole chunk ----------------| ---------- another chunk ... | * | ------- whole chunk -------- | whole chunk | chunk of size 0 |
* *
* | ------------------------ message --------------------------------------- | * | --------------------------- message --------------------------- |
* | ------------------------ buffer --------------------------------------- | * | --------------------------- buffer --------------------------- |
* *
* The current implementation stores the whole message into a single buffer * The current implementation stores the whole message into a single buffer
* which is std::vector. * which is std::vector.
@ -53,9 +53,7 @@ class ChunkedEncoderBuffer {
while (n > 0) { while (n > 0) {
// Define number of bytes which will be copied into chunk because // Define number of bytes which will be copied into chunk because
// chunk is a fixed length array. // chunk is a fixed length array.
auto size = n < MAX_CHUNK_SIZE + CHUNK_HEADER_SIZE - pos_ auto size = n < WHOLE_CHUNK_SIZE - pos_ ? n : WHOLE_CHUNK_SIZE - pos_;
? n
: MAX_CHUNK_SIZE + CHUNK_HEADER_SIZE - pos_;
// Copy size values to chunk array. // Copy size values to chunk array.
std::memcpy(chunk_.data() + pos_, values, size); std::memcpy(chunk_.data() + pos_, values, size);
@ -67,15 +65,19 @@ class ChunkedEncoderBuffer {
// If chunk is full copy it into the message buffer and make space for // If chunk is full copy it into the message buffer and make space for
// other incomming values that are left in the values array. // other incomming values that are left in the values array.
if (pos_ == CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE) Chunk(); if (pos_ == WHOLE_CHUNK_SIZE) Chunk(false);
} }
} }
/** /**
* Wrap the data from chunk array (append header and end marker) and put * Wrap the data from chunk array (append header and end marker) and put
* the whole chunk into the buffer. * the whole chunk into the buffer.
*
* @param message_done if set to true then chunk appends an end message
* marker to the chunk, should always be set to true
* (the default value), false is used only internally
*/ */
void Chunk() { void Chunk(bool message_done = true) {
// 1. Write the size of the chunk (CHUNK HEADER). // 1. Write the size of the chunk (CHUNK HEADER).
uint16_t size = pos_ - CHUNK_HEADER_SIZE; uint16_t size = pos_ - CHUNK_HEADER_SIZE;
// Write the higher byte. // Write the higher byte.
@ -83,24 +85,25 @@ class ChunkedEncoderBuffer {
// Write the lower byte. // Write the lower byte.
chunk_[1] = size & 0xFF; chunk_[1] = size & 0xFF;
// 2. Write last two bytes in the whole chunk (CHUNK END MARKER). // 2. Determine the final size for the end marker.
// The last two bytes are always 0x00 0x00. if (message_done) size_ += 2;
chunk_[pos_++] = 0x00;
chunk_[pos_++] = 0x00;
debug_assert(pos_ <= WHOLE_CHUNK_SIZE, // 3. Copy whole chunk into the buffer.
"Internal variable pos_ is bigger than the whole chunk size.");
// 3. Remember first chunk size.
if (first_chunk_size_ == -1) first_chunk_size_ = pos_;
// 4. Copy whole chunk into the buffer.
size_ += pos_; size_ += pos_;
buffer_.reserve(size_); buffer_.reserve(size_);
std::copy(chunk_.begin(), chunk_.begin() + pos_, std::copy(chunk_.begin(), chunk_.begin() + pos_,
std::back_inserter(buffer_)); std::back_inserter(buffer_));
// 5. Cleanup. // 4. Insert message end marker.
if (message_done) {
buffer_.push_back(0);
buffer_.push_back(0);
}
// 5. Remember first chunk size.
if (first_chunk_size_ == -1) first_chunk_size_ = size_;
// 6. Cleanup.
// * pos_ has to be reset to the size of chunk header (reserved // * pos_ has to be reset to the size of chunk header (reserved
// space for the chunk size) // space for the chunk size)
pos_ = CHUNK_HEADER_SIZE; pos_ = CHUNK_HEADER_SIZE;

View File

@ -79,34 +79,31 @@ class Session {
* Goes through the bolt states in order to execute commands from the client. * Goes through the bolt states in order to execute commands from the client.
*/ */
void Execute() { void Execute() {
// while there is data in the buffers if (UNLIKELY(!handshake_done_)) {
while (buffer_.size() > 0 || decoder_buffer_.Size() > 0) { if (buffer_.size() < HANDSHAKE_SIZE) {
if (LIKELY(connected_)) {
DLOG(INFO) << fmt::format("Decoding chunk of size {}", buffer_.size());
auto chunk_state = decoder_buffer_.GetChunk();
if (chunk_state == ChunkState::Partial) {
DLOG(WARNING) << "Chunk isn't complete!";
return;
} else if (chunk_state == ChunkState::Invalid) {
DLOG(WARNING) << "Chunk is invalid!";
ClientFailureInvalidData();
return;
}
// if chunk_state == ChunkState::Whole then we continue with
// execution of the select below
} else if (buffer_.size() < HANDSHAKE_SIZE) {
DLOG(WARNING) << fmt::format("Received partial handshake of size {}", DLOG(WARNING) << fmt::format("Received partial handshake of size {}",
buffer_.size()); buffer_.size());
return; return;
} else { }
DLOG(WARNING) << fmt::format("Decoding handshake of size {}", DLOG(WARNING) << fmt::format("Decoding handshake of size {}",
buffer_.size()); buffer_.size());
state_ = StateHandshakeRun(*this);
if (UNLIKELY(state_ == State::Close)) {
ClientFailureInvalidData();
return;
}
handshake_done_ = true;
}
ChunkState chunk_state;
while ((chunk_state = decoder_buffer_.GetChunk()) != ChunkState::Partial) {
if (chunk_state == ChunkState::Whole) {
// The chunk is whole, we need to read one more chunk
// (the 0x00 0x00 end marker).
continue;
} }
switch (state_) { switch (state_) {
case State::Handshake:
state_ = StateHandshakeRun(*this);
break;
case State::Init: case State::Init:
state_ = StateInitRun(*this); state_ = StateInitRun(*this);
break; break;
@ -119,15 +116,16 @@ class Session {
case State::ErrorWaitForRollback: case State::ErrorWaitForRollback:
state_ = StateErrorRun(*this, state_); state_ = StateErrorRun(*this, state_);
break; break;
case State::Close: default:
// This state is handled below // State::Handshake is handled above
// State::Close is handled below
break; break;
} }
// State::Close is handled here because we always want to check for // State::Close is handled here because we always want to check for
// it after the above select. If any of the states above return a // it after the above select. If any of the states above return a
// State::Close then the connection should be terminated immediately. // State::Close then the connection should be terminated immediately.
if (state_ == State::Close) { if (UNLIKELY(state_ == State::Close)) {
ClientFailureInvalidData(); ClientFailureInvalidData();
return; return;
} }
@ -200,7 +198,7 @@ class Session {
Decoder<ChunkedDecoderBuffer> decoder_{decoder_buffer_}; Decoder<ChunkedDecoderBuffer> decoder_{decoder_buffer_};
io::network::Epoll::Event event_; io::network::Epoll::Event event_;
bool connected_{false}; bool handshake_done_{false};
State state_{State::Handshake}; State state_{State::Handshake};
// GraphDbAccessor of active transaction in the session, can be null if there // GraphDbAccessor of active transaction in the session, can be null if there
// is no associated transaction. // is no associated transaction.

View File

@ -28,7 +28,6 @@ State StateHandshakeRun(Session &session) {
DLOG(WARNING) << "Couldn't write handshake response!"; DLOG(WARNING) << "Couldn't write handshake response!";
return State::Close; return State::Close;
} }
session.connected_ = true;
// Delete data from buffer. It is guaranteed that there will more than, or // Delete data from buffer. It is guaranteed that there will more than, or
// equal to 20 bytes (HANDSHAKE_SIZE) in the buffer. // equal to 20 bytes (HANDSHAKE_SIZE) in the buffer.

View File

@ -24,6 +24,7 @@ TEST(BoltBuffer, CorrectChunk) {
buffer.Written(1004); buffer.Written(1004);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
@ -45,6 +46,7 @@ TEST(BoltBuffer, CorrectChunkTrailingData) {
buffer.Written(2004); buffer.Written(2004);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
@ -54,26 +56,6 @@ TEST(BoltBuffer, CorrectChunkTrailingData) {
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1002], leftover[i]); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1002], leftover[i]);
} }
TEST(BoltBuffer, InvalidChunk) {
BufferT buffer;
DecoderBufferT decoder_buffer(buffer);
StreamBufferT sb = buffer.Allocate();
sb.data[0] = 0x03;
sb.data[1] = 0xe8;
memcpy(sb.data + 2, data, 2002);
sb.data[1002] = 1;
sb.data[1003] = 1;
buffer.Written(2004);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Invalid);
ASSERT_EQ(buffer.size(), 1000);
uint8_t *tmp = buffer.data();
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i + 1002], tmp[i]);
}
TEST(BoltBuffer, GraduallyPopulatedChunk) { TEST(BoltBuffer, GraduallyPopulatedChunk) {
uint8_t tmp[2000]; uint8_t tmp[2000];
BufferT buffer; BufferT buffer;
@ -83,13 +65,12 @@ TEST(BoltBuffer, GraduallyPopulatedChunk) {
sb.data[0] = 0x03; sb.data[0] = 0x03;
sb.data[1] = 0xe8; sb.data[1] = 0xe8;
buffer.Written(2); buffer.Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
sb = buffer.Allocate(); sb = buffer.Allocate();
memcpy(sb.data, data + 200 * i, 200); memcpy(sb.data, data + 200 * i, 200);
buffer.Written(200); buffer.Written(200);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
} }
sb = buffer.Allocate(); sb = buffer.Allocate();
@ -97,6 +78,7 @@ TEST(BoltBuffer, GraduallyPopulatedChunk) {
sb.data[1] = 0; sb.data[1] = 0;
buffer.Written(2); buffer.Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);
@ -113,13 +95,12 @@ TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) {
sb.data[0] = 0x03; sb.data[0] = 0x03;
sb.data[1] = 0xe8; sb.data[1] = 0xe8;
buffer.Written(2); buffer.Written(2);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
sb = buffer.Allocate(); sb = buffer.Allocate();
memcpy(sb.data, data + 200 * i, 200); memcpy(sb.data, data + 200 * i, 200);
buffer.Written(200); buffer.Written(200);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Partial);
} }
sb = buffer.Allocate(); sb = buffer.Allocate();
@ -132,6 +113,7 @@ TEST(BoltBuffer, GraduallyPopulatedChunkTrailingData) {
buffer.Written(1000); buffer.Written(1000);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole); ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Whole);
ASSERT_EQ(decoder_buffer.GetChunk(), ChunkStateT::Done);
ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true); ASSERT_EQ(decoder_buffer.Read(tmp, 1000), true);
for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]); for (int i = 0; i < 1000; ++i) EXPECT_EQ(data[i], tmp[i]);

View File

@ -21,8 +21,10 @@ static constexpr auto WCS = communication::bolt::WHOLE_CHUNK_SIZE;
* @param size of data array * @param size of data array
* @param element expected element in all positions of chunk data array * @param element expected element in all positions of chunk data array
* (all data bytes in tested chunk should be equal to element) * (all data bytes in tested chunk should be equal to element)
* @param final_chunk if set to true then check for 0x00 0x00 after the chunk
*/ */
void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element) { void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element,
bool final_chunk = true) {
// first two bytes are size (big endian) // first two bytes are size (big endian)
uint8_t lower_byte = size & 0xFF; uint8_t lower_byte = size & 0xFF;
uint8_t higher_byte = (size & 0xFF00) >> 8; uint8_t higher_byte = (size & 0xFF00) >> 8;
@ -37,8 +39,10 @@ void VerifyChunkOfOnes(uint8_t *data, int size, uint8_t element) {
// last two bytes should be zeros // last two bytes should be zeros
// next to header and data // next to header and data
ASSERT_EQ(*(data + CHS + size), 0x00); if (final_chunk) {
ASSERT_EQ(*(data + CHS + size + 1), 0x00); ASSERT_EQ(*(data + CHS + size), 0x00);
ASSERT_EQ(*(data + CHS + size + 1), 0x00);
}
} }
TEST(BoltChunkedEncoderBuffer, OneSmallChunk) { TEST(BoltChunkedEncoderBuffer, OneSmallChunk) {
@ -105,7 +109,7 @@ TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) {
// the output array should look like this: // the output array should look like this:
// [0xFF, 0xFF, 1, 1, ... , 1, 0, 0, 0x86, 0xA1, 1, 1, ... , 1, 0, 0] // [0xFF, 0xFF, 1, 1, ... , 1, 0, 0, 0x86, 0xA1, 1, 1, ... , 1, 0, 0]
auto output = socket.output.data(); auto output = socket.output.data();
VerifyChunkOfOnes(output, MCS, element); VerifyChunkOfOnes(output, MCS, element, false);
VerifyChunkOfOnes(output + WCS, size - MCS, element); VerifyChunkOfOnes(output + WCS, size - MCS, element);
} }

View File

@ -614,22 +614,6 @@ TEST(BoltSession, PartialChunk) {
PrintOutput(output); PrintOutput(output);
} }
TEST(BoltSession, InvalidChunk) {
INIT_VARS;
ExecuteHandshake(session, output);
ExecuteInit(session, output);
// this will write 0x00 0x02 0x00 0x02 0x00 0x02
// that is a chunk of good size, but it's invalid because the last
// two bytes are 0x00 0x02 and they should be 0x00 0x00
for (int i = 0; i < 3; ++i) WriteChunkHeader(session, 2);
session.Execute();
ASSERT_EQ(session.state_, StateT::Close);
ASSERT_FALSE(session.socket_.IsOpen());
CheckFailureMessage(output);
}
TEST(BoltSession, ExplicitTransactionValidQueries) { TEST(BoltSession, ExplicitTransactionValidQueries) {
// It is not really easy to check if we commited or aborted transaction except // It is not really easy to check if we commited or aborted transaction except
// by faking GraphDb/TxEngine... // by faking GraphDb/TxEngine...