Support streaming of Bolt results
Summary: Previously, our implementation of the Bolt protocol buffered all results in memory before sending them out to the client. This implementation immediately streams the results to the client to avoid any memory allocations. Also, this implementation splits the interpretation and pulling logic into two. Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1495
This commit is contained in:
parent
475d285224
commit
4f417e1f5d
@ -99,7 +99,7 @@ class Client final {
|
||||
DLOG(INFO) << "Sending run message with statement: '" << query
|
||||
<< "'; parameters: " << parameters;
|
||||
|
||||
encoder_.MessageRun(query, parameters, false);
|
||||
encoder_.MessageRun(query, parameters);
|
||||
encoder_.MessagePullAll();
|
||||
|
||||
DLOG(INFO) << "Reading run message response";
|
||||
@ -194,7 +194,7 @@ class Client final {
|
||||
bool GetMessage() {
|
||||
client_.ClearData();
|
||||
while (true) {
|
||||
if (!client_.Read(CHUNK_HEADER_SIZE)) return false;
|
||||
if (!client_.Read(kChunkHeaderSize)) return false;
|
||||
|
||||
size_t chunk_size = client_.GetData()[0];
|
||||
chunk_size <<= 8;
|
||||
|
@ -7,13 +7,12 @@ namespace communication::bolt {
|
||||
/**
|
||||
* Sizes related to the chunk defined in Bolt protocol.
|
||||
*/
|
||||
static constexpr size_t CHUNK_HEADER_SIZE = 2;
|
||||
static constexpr size_t MAX_CHUNK_SIZE = 65535;
|
||||
static constexpr size_t CHUNK_END_MARKER_SIZE = 2;
|
||||
static constexpr size_t WHOLE_CHUNK_SIZE = CHUNK_HEADER_SIZE + MAX_CHUNK_SIZE;
|
||||
static constexpr size_t kChunkHeaderSize = 2;
|
||||
static constexpr size_t kChunkMaxDataSize = 65535;
|
||||
static constexpr size_t kChunkWholeSize = kChunkHeaderSize + kChunkMaxDataSize;
|
||||
|
||||
/**
|
||||
* Handshake size defined in the Bolt protocol.
|
||||
*/
|
||||
static constexpr size_t HANDSHAKE_SIZE = 20;
|
||||
}
|
||||
static constexpr size_t kHandshakeSize = 20;
|
||||
} // namespace communication::bolt
|
||||
|
@ -42,7 +42,7 @@ template <typename TBuffer>
|
||||
class ChunkedDecoderBuffer {
|
||||
public:
|
||||
ChunkedDecoderBuffer(TBuffer &buffer) : buffer_(buffer) {
|
||||
data_.reserve(MAX_CHUNK_SIZE);
|
||||
data_.reserve(kChunkMaxDataSize);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -12,18 +12,14 @@
|
||||
|
||||
namespace communication::bolt {
|
||||
|
||||
// TODO: implement a better flushing strategy + optimize memory allocations!
|
||||
// TODO: see how bolt splits message over more TCP packets
|
||||
// -> test for more TCP packets!
|
||||
|
||||
/**
|
||||
* @brief ChunkedEncoderBuffer
|
||||
*
|
||||
* Has methods for writing and flushing data into the message buffer.
|
||||
* Has methods for writing data into the chunk buffer.
|
||||
*
|
||||
* Writing data stores data in the internal buffer and flushing data sends
|
||||
* the currently stored data to the OutputStream. Chunking prepends data length
|
||||
* and appends chunk end marker (0x00 0x00).
|
||||
* Writing data stores data in the internal buffer and chunking data sends
|
||||
* the currently stored data to the OutputStream. Flushing prepends the data
|
||||
* length to each chunk.
|
||||
*
|
||||
* | chunk header | --- chunk --- | another chunk | -- end marker -- |
|
||||
* | ------- whole chunk -------- | whole chunk | chunk of size 0 |
|
||||
@ -31,8 +27,13 @@ namespace communication::bolt {
|
||||
* | --------------------------- message --------------------------- |
|
||||
* | --------------------------- buffer --------------------------- |
|
||||
*
|
||||
* The current implementation stores the whole message into a single buffer
|
||||
* which is std::vector.
|
||||
* NOTE: To send a message end marker (chunk of size 0) it is necessary to
|
||||
* explicitly call the `Flush` method on an empty buffer. In that way the user
|
||||
* can control when the message is over and the whole message isn't
|
||||
* unnecessarily buffered in memory.
|
||||
*
|
||||
* The current implementation stores only a single chunk into memory and sends
|
||||
* it immediately to the output stream when new data arrives.
|
||||
*
|
||||
* @tparam TOutputStream the output stream that should be used
|
||||
*/
|
||||
@ -44,174 +45,78 @@ class ChunkedEncoderBuffer {
|
||||
|
||||
/**
|
||||
* Writes n values into the buffer. If n is bigger than whole chunk size
|
||||
* values are automatically chunked.
|
||||
* values are automatically chunked and sent to the output buffer.
|
||||
*
|
||||
* @param values data array of bytes
|
||||
* @param n is the number of bytes
|
||||
*/
|
||||
void Write(const uint8_t *values, size_t n) {
|
||||
int written = 0;
|
||||
size_t written = 0;
|
||||
|
||||
while (n > 0) {
|
||||
// Define number of bytes which will be copied into chunk because
|
||||
// chunk is a fixed length array.
|
||||
auto size = n < WHOLE_CHUNK_SIZE - pos_ ? n : WHOLE_CHUNK_SIZE - pos_;
|
||||
// Define the number of bytes which will be copied into the chunk because
|
||||
// the internal storage is a fixed length array.
|
||||
size_t size =
|
||||
n < kChunkMaxDataSize - have_ ? n : kChunkMaxDataSize - have_;
|
||||
|
||||
// Copy size values to chunk array.
|
||||
std::memcpy(chunk_.data() + pos_, values + written, size);
|
||||
// Copy `size` values to the chunk array.
|
||||
std::memcpy(chunk_.data() + kChunkHeaderSize + have_, values + written,
|
||||
size);
|
||||
|
||||
// Update positions. Position pointer and incomming size have to be
|
||||
// updated because all incomming values have to be processed.
|
||||
// Update positions. The position pointer and incoming size have to be
|
||||
// updated because all incoming values have to be processed.
|
||||
written += size;
|
||||
pos_ += size;
|
||||
have_ += size;
|
||||
n -= size;
|
||||
|
||||
// If chunk is full copy it into the message buffer and make space for
|
||||
// other incomming values that are left in the values array.
|
||||
if (pos_ == WHOLE_CHUNK_SIZE) Chunk(false);
|
||||
// If the chunk is full, send it to the output stream and clear the
|
||||
// internal storage to make space for other incoming values that are left
|
||||
// in the values array.
|
||||
if (have_ == kChunkMaxDataSize) Flush(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap the data from chunk array (append header and end marker) and put
|
||||
* the whole chunk into the buffer.
|
||||
* Wrap the data from the chunk array (append the size header) and send
|
||||
* the whole chunk into the output stream.
|
||||
*
|
||||
* @param message_done if set to true then chunk appends an end message
|
||||
* marker to the chunk, should always be set to true
|
||||
* (the default value), false is used only internally
|
||||
* @param have_more this parameter is passed to the underlying output stream
|
||||
* `Write` method to indicate wether we have more data
|
||||
* waiting to be sent (in order to optimize network packets)
|
||||
*/
|
||||
void Chunk(bool message_done = true) {
|
||||
// 1. Write the size of the chunk (CHUNK HEADER).
|
||||
uint16_t size = pos_ - CHUNK_HEADER_SIZE;
|
||||
// Write the higher byte.
|
||||
chunk_[0] = size >> 8;
|
||||
// Write the lower byte.
|
||||
chunk_[1] = size & 0xFF;
|
||||
bool Flush(bool have_more = false) {
|
||||
// Write the size of the chunk.
|
||||
chunk_[0] = have_ >> 8;
|
||||
chunk_[1] = have_ & 0xFF;
|
||||
|
||||
// 2. Determine the final size for the end marker.
|
||||
if (message_done) size_ += 2;
|
||||
|
||||
// 3. Copy whole chunk into the buffer.
|
||||
size_ += pos_;
|
||||
std::copy(chunk_.begin(), chunk_.begin() + pos_,
|
||||
std::back_inserter(buffer_));
|
||||
|
||||
// 4. Insert message end marker.
|
||||
if (message_done) {
|
||||
buffer_.push_back(0);
|
||||
buffer_.push_back(0);
|
||||
}
|
||||
|
||||
// 5. Remember first chunk size.
|
||||
if (first_chunk_size_ == -1) first_chunk_size_ = size_;
|
||||
|
||||
// 6. Cleanup.
|
||||
// * pos_ has to be reset to the size of chunk header (reserved
|
||||
// space for the chunk size)
|
||||
pos_ = CHUNK_HEADER_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the whole buffer(message) to the client.
|
||||
* @returns true if the data was successfully sent to the client
|
||||
* false otherwise
|
||||
*/
|
||||
bool Flush() {
|
||||
// Call chunk if is hasn't been called.
|
||||
if (pos_ > CHUNK_HEADER_SIZE) Chunk();
|
||||
|
||||
// Early return if buffer is empty because there is nothing to write.
|
||||
if (size_ == 0) return true;
|
||||
|
||||
// Flush the whole buffer.
|
||||
if (!output_stream_.Write(buffer_.data() + offset_, size_ - offset_))
|
||||
return false;
|
||||
// Write the data to the stream.
|
||||
auto ret = output_stream_.Write(chunk_.data(), kChunkHeaderSize + have_,
|
||||
have_more);
|
||||
|
||||
// Cleanup.
|
||||
Clear();
|
||||
return true;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends only the first message chunk in the buffer to the client.
|
||||
* @returns true if the data was successfully sent to the client
|
||||
* false otherwise
|
||||
*/
|
||||
bool FlushFirstChunk() {
|
||||
// Call chunk if is hasn't been called.
|
||||
if (pos_ > CHUNK_HEADER_SIZE) Chunk();
|
||||
|
||||
// Early return if buffer is empty because there is nothing to write.
|
||||
if (size_ == 0) return false;
|
||||
|
||||
// Early return if there is no first chunk
|
||||
if (first_chunk_size_ == -1) return false;
|
||||
|
||||
// Flush the first chunk
|
||||
if (!output_stream_.Write(buffer_.data(), first_chunk_size_)) return false;
|
||||
|
||||
// Cleanup.
|
||||
// Here we use offset as a method of deleting from the front of the
|
||||
// data vector. Because the first chunk will always be relatively
|
||||
// small comparing to the rest of the data it is more optimal just to
|
||||
// skip the first part of the data than to shift everything in the
|
||||
// vector buffer.
|
||||
offset_ = first_chunk_size_;
|
||||
first_chunk_size_ = -1;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the internal buffers.
|
||||
*/
|
||||
void Clear() {
|
||||
buffer_.clear();
|
||||
size_ = 0;
|
||||
first_chunk_size_ = -1;
|
||||
offset_ = 0;
|
||||
}
|
||||
/** Clears the internal buffers. */
|
||||
void Clear() { have_ = 0; }
|
||||
|
||||
/**
|
||||
* Returns a boolean indicating whether there is data in the buffer.
|
||||
* @returns true if there is data in the buffer,
|
||||
* false otherwise
|
||||
*/
|
||||
bool HasData() { return buffer_.size() > 0 || size_ > 0; }
|
||||
bool HasData() { return have_ > 0; }
|
||||
|
||||
private:
|
||||
/**
|
||||
* The output stream used.
|
||||
*/
|
||||
// The output stream used.
|
||||
TOutputStream &output_stream_;
|
||||
|
||||
/**
|
||||
* Buffer for a single chunk.
|
||||
*/
|
||||
std::array<uint8_t, WHOLE_CHUNK_SIZE> chunk_;
|
||||
// Buffer for a single chunk.
|
||||
std::array<uint8_t, kChunkWholeSize> chunk_;
|
||||
|
||||
/**
|
||||
* Buffer for the message which will be sent to a client.
|
||||
*/
|
||||
std::vector<uint8_t> buffer_;
|
||||
|
||||
/**
|
||||
* Size of the message.
|
||||
*/
|
||||
size_t size_{0};
|
||||
|
||||
/**
|
||||
* Size of first chunk in the buffer.
|
||||
*/
|
||||
int32_t first_chunk_size_{-1};
|
||||
|
||||
/**
|
||||
* Offset from the start of the buffer.
|
||||
*/
|
||||
size_t offset_{0};
|
||||
|
||||
/**
|
||||
* Current position in chunk array.
|
||||
*/
|
||||
size_t pos_{CHUNK_HEADER_SIZE};
|
||||
// Amount of data in chunk array.
|
||||
size_t have_{0};
|
||||
};
|
||||
} // namespace communication::bolt
|
||||
|
@ -44,6 +44,10 @@ class ClientEncoder : private BaseEncoder<Buffer> {
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Init));
|
||||
WriteString(client_name);
|
||||
WriteMap(auth_token);
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
@ -63,18 +67,18 @@ class ClientEncoder : private BaseEncoder<Buffer> {
|
||||
*/
|
||||
bool MessageRun(const std::string &statement,
|
||||
const std::map<std::string, DecodedValue> ¶meters,
|
||||
bool flush = true) {
|
||||
bool have_more = true) {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct2));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Run));
|
||||
WriteString(statement);
|
||||
WriteMap(parameters);
|
||||
if (flush) {
|
||||
return buffer_.Flush();
|
||||
} else {
|
||||
buffer_.Chunk();
|
||||
// Chunk always succeeds, so return true
|
||||
return true;
|
||||
}
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done. Here we
|
||||
// forward the `have_more` flag to indicate if there is more data that will
|
||||
// be sent.
|
||||
return buffer_.Flush(have_more);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -90,6 +94,10 @@ class ClientEncoder : private BaseEncoder<Buffer> {
|
||||
bool MessageDiscardAll() {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::DiscardAll));
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
@ -106,6 +114,10 @@ class ClientEncoder : private BaseEncoder<Buffer> {
|
||||
bool MessagePullAll() {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::PullAll));
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
@ -122,6 +134,10 @@ class ClientEncoder : private BaseEncoder<Buffer> {
|
||||
bool MessageAckFailure() {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::AckFailure));
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
@ -138,7 +154,11 @@ class ClientEncoder : private BaseEncoder<Buffer> {
|
||||
bool MessageReset() {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Reset));
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
};
|
||||
}
|
||||
} // namespace communication::bolt
|
||||
|
@ -24,10 +24,7 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
Encoder(Buffer &buffer) : BaseEncoder<Buffer>(buffer) {}
|
||||
|
||||
/**
|
||||
* Writes a Record message. This method only stores data in the Buffer.
|
||||
* It doesn't send the values out to the Buffer (Chunk is called at the
|
||||
* end of this method). To send the values Flush method has to be called
|
||||
* after this method.
|
||||
* Sends a Record message.
|
||||
*
|
||||
* From the Bolt v1 documentation:
|
||||
* RecordMessage (signature=0x71) {
|
||||
@ -36,11 +33,18 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
*
|
||||
* @param values the fields list object that should be sent
|
||||
*/
|
||||
void MessageRecord(const std::vector<DecodedValue> &values) {
|
||||
bool MessageRecord(const std::vector<DecodedValue> &values) {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Record));
|
||||
WriteList(values);
|
||||
buffer_.Chunk();
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done. Here we tell
|
||||
// the buffer that there will be more data because this is a Record message
|
||||
// and it will surely be followed by either a Record, Success or Failure
|
||||
// message.
|
||||
return buffer_.Flush(true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -52,22 +56,18 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
* }
|
||||
*
|
||||
* @param metadata the metadata map object that should be sent
|
||||
* @param flush should method flush the socket
|
||||
* @returns true if the data was successfully sent to the client
|
||||
* when flushing, false otherwise
|
||||
*/
|
||||
bool MessageSuccess(const std::map<std::string, DecodedValue> &metadata,
|
||||
bool flush = true) {
|
||||
bool MessageSuccess(const std::map<std::string, DecodedValue> &metadata) {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Success));
|
||||
WriteMap(metadata);
|
||||
if (flush) {
|
||||
return buffer_.Flush();
|
||||
} else {
|
||||
buffer_.Chunk();
|
||||
// Chunk always succeeds, so return true
|
||||
return true;
|
||||
}
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -99,6 +99,10 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Failure));
|
||||
WriteMap(metadata);
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
@ -118,6 +122,10 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct1));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Ignored));
|
||||
WriteMap(metadata);
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
|
||||
@ -132,6 +140,10 @@ class Encoder : private BaseEncoder<Buffer> {
|
||||
bool MessageIgnored() {
|
||||
WriteRAW(utils::UnderlyingCast(Marker::TinyStruct));
|
||||
WriteRAW(utils::UnderlyingCast(Signature::Ignored));
|
||||
// Try to flush all remaining data in the buffer, but tell it that we will
|
||||
// send more data (the end of message chunk).
|
||||
if (!buffer_.Flush(true)) return false;
|
||||
// Flush an empty chunk to indicate that the message is done.
|
||||
return buffer_.Flush();
|
||||
}
|
||||
};
|
||||
|
@ -1,74 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
|
||||
#include "communication/bolt/v1/encoder/encoder.hpp"
|
||||
|
||||
namespace communication::bolt {
|
||||
|
||||
/**
|
||||
* A high level API for streaming a Bolt response. Exposes
|
||||
* functionalities used by the compiler and query plans (which
|
||||
* should not use any lower level API).
|
||||
*
|
||||
* @tparam Encoder Encoder used.
|
||||
*/
|
||||
template <typename Encoder>
|
||||
class ResultStream {
|
||||
public:
|
||||
ResultStream(Encoder &encoder) : encoder_(encoder) {}
|
||||
|
||||
/**
|
||||
* Writes a header. Typically a header is something like:
|
||||
* [ "Header1", "Header2", "Header3" ]
|
||||
*
|
||||
* @param fields the header fields that should be sent.
|
||||
*/
|
||||
void Header(const std::vector<std::string> &fields) {
|
||||
std::vector<DecodedValue> vec;
|
||||
std::map<std::string, DecodedValue> data;
|
||||
for (auto &i : fields) vec.push_back(DecodedValue(i));
|
||||
data.insert(std::make_pair(std::string("fields"), DecodedValue(vec)));
|
||||
// this message shouldn't send directly to the client because if an error
|
||||
// happened the client will receive two messages (success and failure)
|
||||
// instead of only one
|
||||
encoder_.MessageSuccess(data, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a result. Typically a result is something like:
|
||||
* [
|
||||
* Value1,
|
||||
* Value2,
|
||||
* Value3
|
||||
* ]
|
||||
* NOTE: The result fields should be in the same ordering that the header
|
||||
* fields were sent in.
|
||||
*
|
||||
* @param values the values that should be sent
|
||||
*/
|
||||
void Result(const std::vector<DecodedValue> &values) {
|
||||
encoder_.MessageRecord(values);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a summary. Typically a summary is something like:
|
||||
* {
|
||||
* "type" : "r" | "rw" | ...,
|
||||
* "stats": {
|
||||
* "nodes_created": 12,
|
||||
* "nodes_deleted": 0
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* @param summary the summary map object that should be sent
|
||||
*/
|
||||
void Summary(const std::map<std::string, DecodedValue> &summary) {
|
||||
// at this point message should not flush the socket so
|
||||
// here is false because chunk has to be called instead of flush
|
||||
encoder_.MessageSuccess(summary, false);
|
||||
}
|
||||
|
||||
private:
|
||||
Encoder &encoder_;
|
||||
};
|
||||
}
|
@ -7,8 +7,8 @@
|
||||
#include "communication/bolt/v1/constants.hpp"
|
||||
#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoder.hpp"
|
||||
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
|
||||
#include "communication/bolt/v1/encoder/encoder.hpp"
|
||||
#include "communication/bolt/v1/encoder/result_stream.hpp"
|
||||
#include "communication/bolt/v1/state.hpp"
|
||||
#include "communication/bolt/v1/states/error.hpp"
|
||||
#include "communication/bolt/v1/states/executing.hpp"
|
||||
@ -39,8 +39,7 @@ class SessionException : public utils::BasicException {
|
||||
template <typename TInputStream, typename TOutputStream>
|
||||
class Session {
|
||||
public:
|
||||
using ResultStreamT =
|
||||
ResultStream<Encoder<ChunkedEncoderBuffer<TOutputStream>>>;
|
||||
using TEncoder = Encoder<ChunkedEncoderBuffer<TOutputStream>>;
|
||||
|
||||
Session(TInputStream &input_stream, TOutputStream &output_stream)
|
||||
: input_stream_(input_stream), output_stream_(output_stream) {}
|
||||
@ -48,21 +47,19 @@ class Session {
|
||||
virtual ~Session() {}
|
||||
|
||||
/**
|
||||
* Put results in the `result_stream` by processing the given `query` with
|
||||
* `params`.
|
||||
* Process the given `query` with `params`.
|
||||
*/
|
||||
virtual void PullAll(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms,
|
||||
ResultStreamT *result_stream) = 0;
|
||||
virtual std::vector<std::string> Interpret(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms) = 0;
|
||||
|
||||
/**
|
||||
* Put results of the processed query in the `encoder`.
|
||||
*/
|
||||
virtual std::map<std::string, DecodedValue> PullAll(TEncoder *encoder) = 0;
|
||||
|
||||
/** Aborts currently running query. */
|
||||
virtual void Abort() = 0;
|
||||
|
||||
void PullAll(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms) {
|
||||
return PullAll(query, params, &result_stream_);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the session after data has been read into the buffer.
|
||||
* Goes through the bolt states in order to execute commands from the client.
|
||||
@ -71,10 +68,10 @@ class Session {
|
||||
if (UNLIKELY(!handshake_done_)) {
|
||||
// Resize the input buffer to ensure that a whole chunk can fit into it.
|
||||
// This can be done only once because the buffer holds its size.
|
||||
input_stream_.Resize(WHOLE_CHUNK_SIZE);
|
||||
input_stream_.Resize(kChunkWholeSize);
|
||||
|
||||
// Receive the handshake.
|
||||
if (input_stream_.size() < HANDSHAKE_SIZE) {
|
||||
if (input_stream_.size() < kHandshakeSize) {
|
||||
DLOG(WARNING) << fmt::format("Received partial handshake of size {}",
|
||||
input_stream_.size());
|
||||
return;
|
||||
@ -128,8 +125,7 @@ class Session {
|
||||
TOutputStream &output_stream_;
|
||||
|
||||
ChunkedEncoderBuffer<TOutputStream> encoder_buffer_{output_stream_};
|
||||
Encoder<ChunkedEncoderBuffer<TOutputStream>> encoder_{encoder_buffer_};
|
||||
ResultStreamT result_stream_{encoder_};
|
||||
TEncoder encoder_{encoder_buffer_};
|
||||
|
||||
ChunkedDecoderBuffer<TInputStream> decoder_buffer_{input_stream_};
|
||||
Decoder<ChunkedDecoderBuffer<TInputStream>> decoder_{decoder_buffer_};
|
||||
|
@ -26,6 +26,49 @@ class ClientError : public utils::BasicException {
|
||||
using utils::BasicException::BasicException;
|
||||
};
|
||||
|
||||
// TODO (mferencevic): revise these error messages
|
||||
inline std::pair<std::string, std::string> ExceptionToErrorMessage(
|
||||
const std::exception &e) {
|
||||
if (dynamic_cast<const ClientError *>(&e)) {
|
||||
// Clients expect 4 strings separated by dots. First being database name
|
||||
// (for example: Neo, Memgraph...), second being either ClientError,
|
||||
// TransientError or DatabaseError (or ClientNotification for warnings).
|
||||
// ClientError means wrong query, do not retry. DatabaseError means
|
||||
// something wrong in database, do not retry. TransientError means query
|
||||
// failed, but if retried it may succeed, retry it.
|
||||
//
|
||||
// Third and fourth strings being namespace and specific error name.
|
||||
// It is not really important what we put there since we don't expect
|
||||
// any special handling of specific exceptions on client side, but we
|
||||
// need to make sure that we don't accidentally return some exception
|
||||
// name which clients handle in a special way. For example, if client
|
||||
// receives *.TransientError.Transaction.Terminate it will not rerun
|
||||
// query even though TransientError was returned, because of Neo's
|
||||
// semantics of that error.
|
||||
return {"Memgraph.ClientError.MemgraphError.MemgraphError", e.what()};
|
||||
}
|
||||
if (dynamic_cast<const utils::BasicException *>(&e)) {
|
||||
// Exception not derived from QueryException was thrown which means that
|
||||
// database probably aborted transaction because of some timeout,
|
||||
// deadlock, serialization error or something similar. We return
|
||||
// TransientError since retry of same transaction could succeed.
|
||||
return {"Memgraph.TransientError.MemgraphError.MemgraphError", e.what()};
|
||||
}
|
||||
if (dynamic_cast<const std::bad_alloc *>(&e)) {
|
||||
// std::bad_alloc was thrown, God knows in which state is database ->
|
||||
// terminate.
|
||||
LOG(FATAL) << "Memgraph is out of memory";
|
||||
}
|
||||
// All exceptions used in memgraph are derived from BasicException. Since
|
||||
// we caught some other exception we don't know what is going on. Return
|
||||
// DatabaseError, log real message and return generic string.
|
||||
LOG(ERROR) << "Unknown exception occurred during query execution "
|
||||
<< e.what();
|
||||
return {"Memgraph.DatabaseError.MemgraphError.MemgraphError",
|
||||
"An unknown exception occurred, this is unexpected. Real message "
|
||||
"should be in database logs."};
|
||||
}
|
||||
|
||||
template <typename TSession>
|
||||
State HandleRun(TSession &session, State state, Marker marker) {
|
||||
const std::map<std::string, DecodedValue> kEmptyFields = {
|
||||
@ -63,68 +106,27 @@ State HandleRun(TSession &session, State state, Marker marker) {
|
||||
DLOG(INFO) << fmt::format("[Run] '{}'", query.ValueString());
|
||||
|
||||
try {
|
||||
// PullAll can throw.
|
||||
session.PullAll(query.ValueString(), params.ValueMap());
|
||||
|
||||
// The query engine has already stored all query data in the buffer.
|
||||
// We should only send the first chunk now which is the success
|
||||
// message which contains header data. The rest of this data (records
|
||||
// and summary) will be sent after a PULL_ALL command from the client.
|
||||
if (!session.encoder_buffer_.FlushFirstChunk()) {
|
||||
DLOG(WARNING) << "Couldn't flush header data from the buffer!";
|
||||
// Interpret can throw.
|
||||
auto header = session.Interpret(query.ValueString(), params.ValueMap());
|
||||
// Convert std::string to DecodedValue
|
||||
std::vector<DecodedValue> vec;
|
||||
std::map<std::string, DecodedValue> data;
|
||||
vec.reserve(header.size());
|
||||
for (auto &i : header) vec.push_back(DecodedValue(i));
|
||||
data.insert(std::make_pair(std::string("fields"), DecodedValue(vec)));
|
||||
// Send the header.
|
||||
if (!session.encoder_.MessageSuccess(data)) {
|
||||
DLOG(WARNING) << "Couldn't send query header!";
|
||||
return State::Close;
|
||||
}
|
||||
return State::Result;
|
||||
} catch (const std::exception &e) {
|
||||
// clear header success message
|
||||
session.encoder_buffer_.Clear();
|
||||
DLOG(WARNING) << fmt::format("Error message: {}", e.what());
|
||||
if (const auto *p = dynamic_cast<const utils::StacktraceException *>(&e)) {
|
||||
DLOG(WARNING) << fmt::format("Error trace: {}", p->trace());
|
||||
}
|
||||
|
||||
auto code_message = [&e]() -> std::pair<std::string, std::string> {
|
||||
if (dynamic_cast<const ClientError *>(&e)) {
|
||||
// Clients expect 4 strings separated by dots. First being database name
|
||||
// (for example: Neo, Memgraph...), second being either ClientError,
|
||||
// TransientError or DatabaseError (or ClientNotification for warnings).
|
||||
// ClientError means wrong query, do not retry. DatabaseError means
|
||||
// something wrong in database, do not retry. TransientError means query
|
||||
// failed, but if retried it may succeed, retry it.
|
||||
//
|
||||
// Third and fourth strings being namespace and specific error name.
|
||||
// It is not really important what we put there since we don't expect
|
||||
// any special handling of specific exceptions on client side, but we
|
||||
// need to make sure that we don't accidentally return some exception
|
||||
// name which clients handle in a special way. For example, if client
|
||||
// receives *.TransientError.Transaction.Terminate it will not rerun
|
||||
// query even though TransientError was returned, because of Neo's
|
||||
// semantics of that error.
|
||||
return {"Memgraph.ClientError.MemgraphError.MemgraphError", e.what()};
|
||||
}
|
||||
if (dynamic_cast<const utils::BasicException *>(&e)) {
|
||||
// Exception not derived from QueryException was thrown which means that
|
||||
// database probably aborted transaction because of some timeout,
|
||||
// deadlock, serialization error or something similar. We return
|
||||
// TransientError since retry of same transaction could succeed.
|
||||
return {"Memgraph.TransientError.MemgraphError.MemgraphError",
|
||||
e.what()};
|
||||
}
|
||||
if (dynamic_cast<const std::bad_alloc *>(&e)) {
|
||||
// std::bad_alloc was thrown, God knows in which state is database ->
|
||||
// terminate.
|
||||
LOG(FATAL) << "Memgraph is out of memory";
|
||||
}
|
||||
// All exceptions used in memgraph are derived from BasicException. Since
|
||||
// we caught some other exception we don't know what is going on. Return
|
||||
// DatabaseError, log real message and return generic string.
|
||||
LOG(ERROR) << "Unknown exception occurred during query execution "
|
||||
<< e.what();
|
||||
return {"Memgraph.DatabaseError.MemgraphError.MemgraphError",
|
||||
"An unknown exception occurred, this is unexpected. Real message "
|
||||
"should be in database logs."};
|
||||
}();
|
||||
|
||||
session.encoder_buffer_.Clear();
|
||||
auto code_message = ExceptionToErrorMessage(e);
|
||||
bool fail_sent = session.encoder_.MessageFailure(
|
||||
{{"code", code_message.first}, {"message", code_message.second}});
|
||||
if (!fail_sent) {
|
||||
@ -135,7 +137,6 @@ State HandleRun(TSession &session, State state, Marker marker) {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Get rid of duplications in PullAll/DiscardAll functions.
|
||||
template <typename Session>
|
||||
State HandlePullAll(Session &session, State state, Marker marker) {
|
||||
if (marker != Marker::TinyStruct) {
|
||||
@ -150,13 +151,30 @@ State HandlePullAll(Session &session, State state, Marker marker) {
|
||||
// Same as `unexpected RUN` case.
|
||||
return State::Close;
|
||||
}
|
||||
// Flush pending data to the client, the success message is streamed
|
||||
// from the query engine, it contains statistics from the query run.
|
||||
if (!session.encoder_buffer_.Flush()) {
|
||||
DLOG(WARNING) << "Couldn't flush data from the buffer!";
|
||||
return State::Close;
|
||||
|
||||
try {
|
||||
// PullAll can throw.
|
||||
auto summary = session.PullAll(&session.encoder_);
|
||||
if (!session.encoder_.MessageSuccess(summary)) {
|
||||
DLOG(WARNING) << "Couldn't send query summary!";
|
||||
return State::Close;
|
||||
}
|
||||
return State::Idle;
|
||||
} catch (const std::exception &e) {
|
||||
DLOG(WARNING) << fmt::format("Error message: {}", e.what());
|
||||
if (const auto *p = dynamic_cast<const utils::StacktraceException *>(&e)) {
|
||||
DLOG(WARNING) << fmt::format("Error trace: {}", p->trace());
|
||||
}
|
||||
session.encoder_buffer_.Clear();
|
||||
auto code_message = ExceptionToErrorMessage(e);
|
||||
bool fail_sent = session.encoder_.MessageFailure(
|
||||
{{"code", code_message.first}, {"message", code_message.second}});
|
||||
if (!fail_sent) {
|
||||
DLOG(WARNING) << "Couldn't send failure message!";
|
||||
return State::Close;
|
||||
}
|
||||
return State::Error;
|
||||
}
|
||||
return State::Idle;
|
||||
}
|
||||
|
||||
template <typename Session>
|
||||
@ -173,12 +191,14 @@ State HandleDiscardAll(Session &session, State state, Marker marker) {
|
||||
// Same as `unexpected RUN` case.
|
||||
return State::Close;
|
||||
}
|
||||
|
||||
// Clear all pending data and send a success message.
|
||||
session.encoder_buffer_.Clear();
|
||||
if (!session.encoder_.MessageSuccess()) {
|
||||
DLOG(WARNING) << "Couldn't send success message!";
|
||||
return State::Close;
|
||||
}
|
||||
|
||||
return State::Idle;
|
||||
}
|
||||
|
||||
@ -199,13 +219,16 @@ State HandleReset(Session &session, State, Marker marker) {
|
||||
utils::UnderlyingCast(marker));
|
||||
return State::Close;
|
||||
}
|
||||
// clear all pending data and send a success message
|
||||
|
||||
// Clear all pending data and send a success message.
|
||||
session.encoder_buffer_.Clear();
|
||||
if (!session.encoder_.MessageSuccess()) {
|
||||
DLOG(WARNING) << "Couldn't send success message!";
|
||||
return State::Close;
|
||||
}
|
||||
|
||||
session.Abort();
|
||||
|
||||
return State::Idle;
|
||||
}
|
||||
|
||||
|
@ -32,8 +32,8 @@ State StateHandshakeRun(TSession &session) {
|
||||
}
|
||||
|
||||
// Delete data from the input stream. It is guaranteed that there will more
|
||||
// than, or equal to 20 bytes (HANDSHAKE_SIZE) in the buffer.
|
||||
session.input_stream_.Shift(HANDSHAKE_SIZE);
|
||||
// than, or equal to 20 bytes (kHandshakeSize) in the buffer.
|
||||
session.input_stream_.Shift(kHandshakeSize);
|
||||
|
||||
return State::Init;
|
||||
}
|
||||
|
@ -76,19 +76,35 @@ class BoltSession final
|
||||
input_stream, output_stream),
|
||||
transaction_engine_(data.db, data.interpreter) {}
|
||||
|
||||
using communication::bolt::Session<
|
||||
communication::InputStream, communication::OutputStream>::ResultStreamT;
|
||||
using communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
void PullAll(
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::DecodedValue> ¶ms,
|
||||
ResultStreamT *result_stream) override {
|
||||
const std::map<std::string, communication::bolt::DecodedValue> ¶ms)
|
||||
override {
|
||||
std::map<std::string, query::TypedValue> params_tv;
|
||||
for (const auto &kv : params)
|
||||
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
||||
try {
|
||||
TypedValueResultStream stream(result_stream);
|
||||
transaction_engine_.PullAll(query, params_tv, &stream);
|
||||
return transaction_engine_.Interpret(query, params_tv);
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, communication::bolt::DecodedValue> PullAll(
|
||||
TEncoder *encoder) override {
|
||||
try {
|
||||
TypedValueResultStream stream(encoder);
|
||||
const auto &summary = transaction_engine_.PullAll(&stream);
|
||||
std::map<std::string, communication::bolt::DecodedValue> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
decoded_summary.emplace(kv.first, glue::ToDecodedValue(kv.second));
|
||||
}
|
||||
return decoded_summary;
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
@ -99,16 +115,11 @@ class BoltSession final
|
||||
void Abort() override { transaction_engine_.Abort(); }
|
||||
|
||||
private:
|
||||
// Wrapper around ResultStreamT which converts TypedValue to DecodedValue
|
||||
// before forwarding the calls to original ResultStreamT.
|
||||
// Wrapper around TEncoder which converts TypedValue to DecodedValue
|
||||
// before forwarding the calls to original TEncoder.
|
||||
class TypedValueResultStream {
|
||||
public:
|
||||
TypedValueResultStream(ResultStreamT *result_stream)
|
||||
: result_stream_(result_stream) {}
|
||||
|
||||
void Header(const std::vector<std::string> &fields) {
|
||||
return result_stream_->Header(fields);
|
||||
}
|
||||
TypedValueResultStream(TEncoder *encoder) : encoder_(encoder) {}
|
||||
|
||||
void Result(const std::vector<query::TypedValue> &values) {
|
||||
std::vector<communication::bolt::DecodedValue> decoded_values;
|
||||
@ -116,19 +127,11 @@ class BoltSession final
|
||||
for (const auto &v : values) {
|
||||
decoded_values.push_back(glue::ToDecodedValue(v));
|
||||
}
|
||||
return result_stream_->Result(decoded_values);
|
||||
}
|
||||
|
||||
void Summary(const std::map<std::string, query::TypedValue> &summary) {
|
||||
std::map<std::string, communication::bolt::DecodedValue> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
decoded_summary.emplace(kv.first, glue::ToDecodedValue(kv.second));
|
||||
}
|
||||
return result_stream_->Summary(decoded_summary);
|
||||
encoder_->MessageRecord(decoded_values);
|
||||
}
|
||||
|
||||
private:
|
||||
ResultStreamT *result_stream_;
|
||||
TEncoder *encoder_;
|
||||
};
|
||||
|
||||
query::TransactionEngine transaction_engine_;
|
||||
|
@ -94,13 +94,8 @@ class Interpreter {
|
||||
*/
|
||||
template <typename TStream>
|
||||
bool Pull(TStream &stream) {
|
||||
if (!header_written_) {
|
||||
stream.Header(header_);
|
||||
header_written_ = true;
|
||||
}
|
||||
|
||||
utils::Timer timer;
|
||||
bool return_value = cursor_->Pull(frame_, ctx_);
|
||||
|
||||
if (return_value && !output_symbols_.empty()) {
|
||||
std::vector<TypedValue> values;
|
||||
values.reserve(output_symbols_.size());
|
||||
@ -109,11 +104,10 @@ class Interpreter {
|
||||
}
|
||||
stream.Result(values);
|
||||
}
|
||||
execution_time_ += timer.Elapsed().count();
|
||||
|
||||
if (!return_value) {
|
||||
auto execution_time = execution_timer_.Elapsed();
|
||||
summary_["plan_execution_time"] = execution_time.count();
|
||||
stream.Summary(summary_);
|
||||
summary_["plan_execution_time"] = execution_time_;
|
||||
|
||||
if (ctx_.is_index_created_) {
|
||||
auto access = plan_cache_.access();
|
||||
@ -132,6 +126,9 @@ class Interpreter {
|
||||
while (Pull(stream)) continue;
|
||||
}
|
||||
|
||||
const std::vector<std::string> &header() { return header_; }
|
||||
const std::map<std::string, TypedValue> &summary() { return summary_; }
|
||||
|
||||
private:
|
||||
Context ctx_;
|
||||
std::shared_ptr<CachedPlan> plan_;
|
||||
@ -139,11 +136,10 @@ class Interpreter {
|
||||
Frame frame_;
|
||||
std::vector<Symbol> output_symbols_;
|
||||
|
||||
bool header_written_{false};
|
||||
std::vector<std::string> header_;
|
||||
std::map<std::string, TypedValue> summary_;
|
||||
|
||||
utils::Timer execution_timer_;
|
||||
double execution_time_{0};
|
||||
// Gets invalidated after if an index has been built.
|
||||
PlanCacheT &plan_cache_;
|
||||
};
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/likely.hpp"
|
||||
|
||||
namespace query {
|
||||
|
||||
@ -15,20 +16,21 @@ class TransactionEngine final {
|
||||
|
||||
~TransactionEngine() { Abort(); }
|
||||
|
||||
template <typename TStream>
|
||||
void PullAll(const std::string &query,
|
||||
const std::map<std::string, TypedValue> ¶ms,
|
||||
TStream *result_stream) {
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, TypedValue> ¶ms) {
|
||||
// Clear pending results.
|
||||
results_ = std::experimental::nullopt;
|
||||
|
||||
// Check the query for transaction commands.
|
||||
auto query_upper = utils::Trim(utils::ToUpperCase(query));
|
||||
if (query_upper == "BEGIN") {
|
||||
if (in_explicit_transaction_) {
|
||||
throw QueryException("Nested transactions are not supported.");
|
||||
}
|
||||
result_stream->Header({});
|
||||
result_stream->Summary({});
|
||||
in_explicit_transaction_ = true;
|
||||
expect_rollback_ = false;
|
||||
return;
|
||||
return {};
|
||||
} else if (query_upper == "COMMIT") {
|
||||
if (!in_explicit_transaction_) {
|
||||
throw QueryException("No current transaction to commit.");
|
||||
@ -39,21 +41,17 @@ class TransactionEngine final {
|
||||
"error. Please invoke a rollback instead.");
|
||||
}
|
||||
Commit();
|
||||
result_stream->Header({});
|
||||
result_stream->Summary({});
|
||||
expect_rollback_ = false;
|
||||
in_explicit_transaction_ = false;
|
||||
return;
|
||||
return {};
|
||||
} else if (query_upper == "ROLLBACK") {
|
||||
if (!in_explicit_transaction_) {
|
||||
throw QueryException("No current transaction to rollback.");
|
||||
}
|
||||
Abort();
|
||||
result_stream->Header({});
|
||||
result_stream->Summary({});
|
||||
expect_rollback_ = false;
|
||||
in_explicit_transaction_ = false;
|
||||
return;
|
||||
return {};
|
||||
}
|
||||
|
||||
// Any other query in an explicit transaction block advances the command.
|
||||
@ -63,11 +61,28 @@ class TransactionEngine final {
|
||||
if (!db_accessor_)
|
||||
db_accessor_ = std::make_unique<database::GraphDbAccessor>(db_);
|
||||
|
||||
// Interpret the query and stream the results
|
||||
// Interpret the query and return the headers.
|
||||
try {
|
||||
interpreter_(query, *db_accessor_, params, in_explicit_transaction_)
|
||||
.PullAll(*result_stream);
|
||||
results_.emplace(
|
||||
interpreter_(query, *db_accessor_, params, in_explicit_transaction_));
|
||||
return results_->header();
|
||||
} catch (const utils::BasicException &) {
|
||||
AbortCommand();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TStream>
|
||||
std::map<std::string, TypedValue> PullAll(TStream *result_stream) {
|
||||
// If we don't have any results (eg. a transaction command preceeded),
|
||||
// return an empty summary.
|
||||
if (UNLIKELY(!results_)) return {};
|
||||
|
||||
// Stream all results and return the summary.
|
||||
try {
|
||||
results_->PullAll(*result_stream);
|
||||
if (!in_explicit_transaction_) Commit();
|
||||
return results_->summary();
|
||||
} catch (const utils::BasicException &) {
|
||||
AbortCommand();
|
||||
throw;
|
||||
@ -83,9 +98,8 @@ class TransactionEngine final {
|
||||
private:
|
||||
database::MasterBase &db_;
|
||||
Interpreter &interpreter_;
|
||||
// GraphDbAccessor of active transaction in the session, can be null if
|
||||
// there is no associated transaction.
|
||||
std::unique_ptr<database::GraphDbAccessor> db_accessor_;
|
||||
std::experimental::optional<query::Interpreter::Results> results_;
|
||||
bool in_explicit_transaction_{false};
|
||||
bool expect_rollback_{false};
|
||||
|
||||
|
@ -240,9 +240,6 @@ target_link_libraries(${test_prefix}bolt_chunked_encoder_buffer mg-communication
|
||||
add_unit_test(bolt_decoder.cpp)
|
||||
target_link_libraries(${test_prefix}bolt_decoder mg-communication)
|
||||
|
||||
add_unit_test(bolt_result_stream.cpp)
|
||||
target_link_libraries(${test_prefix}bolt_result_stream mg-communication)
|
||||
|
||||
add_unit_test(bolt_session.cpp)
|
||||
target_link_libraries(${test_prefix}bolt_session mg-communication)
|
||||
|
||||
|
@ -5,29 +5,25 @@
|
||||
using BufferT = communication::bolt::ChunkedEncoderBuffer<TestOutputStream>;
|
||||
|
||||
// constants
|
||||
using communication::bolt::CHUNK_END_MARKER_SIZE;
|
||||
using communication::bolt::CHUNK_HEADER_SIZE;
|
||||
using communication::bolt::MAX_CHUNK_SIZE;
|
||||
using communication::bolt::WHOLE_CHUNK_SIZE;
|
||||
using communication::bolt::kChunkHeaderSize;
|
||||
using communication::bolt::kChunkMaxDataSize;
|
||||
using communication::bolt::kChunkWholeSize;
|
||||
|
||||
// test data
|
||||
constexpr const int TEST_DATA_SIZE = 100000;
|
||||
uint8_t test_data[TEST_DATA_SIZE];
|
||||
constexpr const int kTestDataSize = 100000;
|
||||
uint8_t test_data[kTestDataSize];
|
||||
|
||||
/**
|
||||
* Verifies a single chunk. The chunk should be constructed from header
|
||||
* (chunk size), data and end marker. The header is two bytes long number
|
||||
* written in big endian format. Data is array of elements from test_data
|
||||
* which max size is 0xFFFF. The end marker is always two bytes long array of
|
||||
* two zeros.
|
||||
* Verifies a single chunk. The chunk should be constructed from a header
|
||||
* (chunk size) and data. The header is a two byte long number written in big
|
||||
* endian format. Data is an array of elements from test_data whose max size is
|
||||
* 0xFFFF.
|
||||
*
|
||||
* @param data pointer on data array (array of bytes)
|
||||
* @param size of data array
|
||||
* @param offset offset from the begining of the test data
|
||||
* @param final_chunk if set to true then check for 0x00 0x00 after the chunk
|
||||
*/
|
||||
void VerifyChunkOfTestData(uint8_t *data, int size, uint64_t offset = 0,
|
||||
bool final_chunk = true) {
|
||||
void VerifyChunkOfTestData(uint8_t *data, int size, uint64_t offset = 0) {
|
||||
// first two bytes are size (big endian)
|
||||
uint8_t lower_byte = size & 0xFF;
|
||||
uint8_t higher_byte = (size & 0xFF00) >> 8;
|
||||
@ -37,14 +33,7 @@ void VerifyChunkOfTestData(uint8_t *data, int size, uint64_t offset = 0,
|
||||
// in the data array should be size number of ones
|
||||
// the header is skipped
|
||||
for (auto i = 0; i < size; ++i) {
|
||||
ASSERT_EQ(data[i + CHUNK_HEADER_SIZE], test_data[i + offset]);
|
||||
}
|
||||
|
||||
// last two bytes should be zeros
|
||||
// next to header and data
|
||||
if (final_chunk) {
|
||||
ASSERT_EQ(data[CHUNK_HEADER_SIZE + size], 0x00);
|
||||
ASSERT_EQ(data[CHUNK_HEADER_SIZE + size + 1], 0x00);
|
||||
ASSERT_EQ(data[i + kChunkHeaderSize], test_data[i + offset]);
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,7 +49,7 @@ TEST(BoltChunkedEncoderBuffer, OneSmallChunk) {
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the array should look like: [0, 100, first 100 bytes of test data, 0, 0]
|
||||
// the array should look like: [0, 100, first 100 bytes of test data]
|
||||
VerifyChunkOfTestData(output_stream.output.data(), size);
|
||||
}
|
||||
|
||||
@ -74,18 +63,17 @@ TEST(BoltChunkedEncoderBuffer, TwoSmallChunks) {
|
||||
|
||||
// write into buffer
|
||||
buffer.Write(test_data, size1);
|
||||
buffer.Chunk();
|
||||
buffer.Flush();
|
||||
buffer.Write(test_data + size1, size2);
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the output array should look like this:
|
||||
// [0, 100, first 100 bytes of test data, 0, 0] +
|
||||
// [0, 100, second 100 bytes of test data, 0, 0]
|
||||
// [0, 100, first 100 bytes of test data] +
|
||||
// [0, 100, second 100 bytes of test data]
|
||||
auto data = output_stream.output.data();
|
||||
VerifyChunkOfTestData(data, size1);
|
||||
VerifyChunkOfTestData(
|
||||
data + CHUNK_HEADER_SIZE + size1 + CHUNK_END_MARKER_SIZE, size2, size1);
|
||||
VerifyChunkOfTestData(data + kChunkHeaderSize + size1, size2, size1);
|
||||
}
|
||||
|
||||
TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) {
|
||||
@ -94,21 +82,21 @@ TEST(BoltChunkedEncoderBuffer, OneAndAHalfOfMaxChunk) {
|
||||
BufferT buffer(output_stream);
|
||||
|
||||
// write into buffer
|
||||
buffer.Write(test_data, TEST_DATA_SIZE);
|
||||
buffer.Write(test_data, kTestDataSize);
|
||||
buffer.Flush();
|
||||
|
||||
// check the output array
|
||||
// the output array should look like this:
|
||||
// [0xFF, 0xFF, first 65535 bytes of test data,
|
||||
// 0x86, 0xA1, 34465 bytes of test data after the first 65535 bytes, 0, 0]
|
||||
// 0x86, 0xA1, 34465 bytes of test data after the first 65535 bytes]
|
||||
auto output = output_stream.output.data();
|
||||
VerifyChunkOfTestData(output, MAX_CHUNK_SIZE, 0, false);
|
||||
VerifyChunkOfTestData(output + WHOLE_CHUNK_SIZE,
|
||||
TEST_DATA_SIZE - MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
|
||||
VerifyChunkOfTestData(output, kChunkMaxDataSize);
|
||||
VerifyChunkOfTestData(output + kChunkWholeSize,
|
||||
kTestDataSize - kChunkMaxDataSize, kChunkMaxDataSize);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
InitializeData(test_data, TEST_DATA_SIZE);
|
||||
InitializeData(test_data, kTestDataSize);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
|
@ -68,8 +68,7 @@ class TestBuffer {
|
||||
: output_stream_(output_stream) {}
|
||||
|
||||
void Write(const uint8_t *data, size_t n) { output_stream_.Write(data, n); }
|
||||
void Chunk() {}
|
||||
bool Flush() { return true; }
|
||||
bool Flush(bool have_more = false) { return true; }
|
||||
|
||||
private:
|
||||
TestOutputStream &output_stream_;
|
||||
|
@ -1,57 +0,0 @@
|
||||
#include "bolt_common.hpp"
|
||||
|
||||
#include "communication/bolt/v1/encoder/chunked_encoder_buffer.hpp"
|
||||
#include "communication/bolt/v1/encoder/encoder.hpp"
|
||||
#include "communication/bolt/v1/encoder/result_stream.hpp"
|
||||
|
||||
using communication::bolt::DecodedValue;
|
||||
|
||||
using BufferT = communication::bolt::ChunkedEncoderBuffer<TestOutputStream>;
|
||||
using EncoderT = communication::bolt::Encoder<BufferT>;
|
||||
using ResultStreamT = communication::bolt::ResultStream<EncoderT>;
|
||||
|
||||
const uint8_t header_output[] =
|
||||
"\x00\x29\xB1\x70\xA1\x86\x66\x69\x65\x6C\x64\x73\x9A\x82\x61\x61\x82\x62"
|
||||
"\x62\x82\x63\x63\x82\x64\x64\x82\x65\x65\x82\x66\x66\x82\x67\x67\x82\x68"
|
||||
"\x68\x82\x69\x69\x82\x6A\x6A\x00\x00";
|
||||
const uint8_t result_output[] =
|
||||
"\x00\x0A\xB1\x71\x92\x05\x85\x68\x65\x6C\x6C\x6F\x00\x00";
|
||||
const uint8_t summary_output[] =
|
||||
"\x00\x0C\xB1\x70\xA1\x87\x63\x68\x61\x6E\x67\x65\x64\x0A\x00\x00";
|
||||
|
||||
TEST(Bolt, ResultStream) {
|
||||
TestOutputStream output_stream;
|
||||
BufferT buffer(output_stream);
|
||||
EncoderT encoder(buffer);
|
||||
ResultStreamT result_stream(encoder);
|
||||
std::vector<uint8_t> &output = output_stream.output;
|
||||
|
||||
std::vector<std::string> headers;
|
||||
for (int i = 0; i < 10; ++i)
|
||||
headers.push_back(std::string(2, (char)('a' + i)));
|
||||
|
||||
result_stream.Header(headers);
|
||||
buffer.FlushFirstChunk();
|
||||
PrintOutput(output);
|
||||
CheckOutput(output, header_output, 45);
|
||||
|
||||
std::vector<DecodedValue> result{DecodedValue(5),
|
||||
DecodedValue(std::string("hello"))};
|
||||
result_stream.Result(result);
|
||||
buffer.Flush();
|
||||
PrintOutput(output);
|
||||
CheckOutput(output, result_output, 14);
|
||||
|
||||
std::map<std::string, DecodedValue> summary;
|
||||
summary.insert(std::make_pair(std::string("changed"), DecodedValue(10)));
|
||||
result_stream.Summary(summary);
|
||||
buffer.Flush();
|
||||
PrintOutput(output);
|
||||
CheckOutput(output, summary_output, 16);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -18,32 +18,42 @@ class TestSessionData {};
|
||||
|
||||
class TestSession : public Session<TestInputStream, TestOutputStream> {
|
||||
public:
|
||||
using Session<TestInputStream, TestOutputStream>::ResultStreamT;
|
||||
using Session<TestInputStream, TestOutputStream>::TEncoder;
|
||||
|
||||
TestSession(TestSessionData &data, TestInputStream &input_stream,
|
||||
TestOutputStream &output_stream)
|
||||
: Session<TestInputStream, TestOutputStream>(input_stream,
|
||||
output_stream) {}
|
||||
|
||||
void PullAll(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms,
|
||||
ResultStreamT *result_stream) override {
|
||||
if (query == kQueryReturn42) {
|
||||
result_stream->Header({"result_name"});
|
||||
result_stream->Result(std::vector<DecodedValue>{42});
|
||||
result_stream->Summary({});
|
||||
} else if (query == kQueryEmpty) {
|
||||
result_stream->Header({"result_name"});
|
||||
result_stream->Summary({});
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms) override {
|
||||
if (query == kQueryReturn42 || query == kQueryEmpty) {
|
||||
query_ = query;
|
||||
return {"result_name"};
|
||||
} else {
|
||||
query_ = "";
|
||||
throw ClientError("client sent invalid query");
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, DecodedValue> PullAll(
|
||||
TEncoder *encoder) override {
|
||||
if (query_ == kQueryReturn42) {
|
||||
encoder->MessageRecord(std::vector<DecodedValue>{42});
|
||||
return {};
|
||||
} else if (query_ == kQueryEmpty) {
|
||||
return {};
|
||||
} else {
|
||||
throw ClientError("client sent invalid query");
|
||||
}
|
||||
}
|
||||
|
||||
void Abort() override {}
|
||||
};
|
||||
|
||||
using ResultStreamT = TestSession::ResultStreamT;
|
||||
private:
|
||||
std::string query_;
|
||||
};
|
||||
|
||||
// TODO: This could be done in fixture.
|
||||
// Shortcuts for writing variable initializations in tests
|
||||
|
@ -39,9 +39,12 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
|
||||
|
||||
auto RunWithDba(const std::string &query, GraphDbAccessor &dba) {
|
||||
std::map<std::string, query::TypedValue> params = {};
|
||||
ResultStreamFaker<query::TypedValue> result;
|
||||
interpreter_.value()(query, dba, params, false).PullAll(result);
|
||||
return result.GetResults();
|
||||
ResultStreamFaker<query::TypedValue> stream;
|
||||
auto results = interpreter_.value()(query, dba, params, false);
|
||||
stream.Header(results.header());
|
||||
results.PullAll(stream);
|
||||
stream.Summary(results.summary());
|
||||
return stream.GetResults();
|
||||
}
|
||||
|
||||
auto Run(const std::string &query) {
|
||||
|
@ -20,9 +20,12 @@ class InterpreterTest : public ::testing::Test {
|
||||
auto Interpret(const std::string &query,
|
||||
const std::map<std::string, query::TypedValue> ¶ms = {}) {
|
||||
database::GraphDbAccessor dba(db_);
|
||||
ResultStreamFaker<query::TypedValue> result;
|
||||
interpreter_(query, dba, params, false).PullAll(result);
|
||||
return result;
|
||||
ResultStreamFaker<query::TypedValue> stream;
|
||||
auto results = interpreter_(query, dba, params, false);
|
||||
stream.Header(results.header());
|
||||
results.PullAll(stream);
|
||||
stream.Summary(results.summary());
|
||||
return stream;
|
||||
}
|
||||
};
|
||||
|
||||
@ -198,11 +201,13 @@ TEST_F(InterpreterTest, Bfs) {
|
||||
|
||||
database::GraphDbAccessor dba(db_);
|
||||
ResultStreamFaker<query::TypedValue> stream;
|
||||
interpreter_(
|
||||
auto results = interpreter_(
|
||||
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
|
||||
"e.reachable)]->(m) RETURN r",
|
||||
dba, {}, false)
|
||||
.PullAll(stream);
|
||||
dba, {}, false);
|
||||
stream.Header(results.header());
|
||||
results.PullAll(stream);
|
||||
stream.Summary(results.summary());
|
||||
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "r");
|
||||
@ -264,9 +269,11 @@ TEST_F(InterpreterTest, ShortestPath) {
|
||||
|
||||
ResultStreamFaker<query::TypedValue> stream;
|
||||
database::GraphDbAccessor dba(db_);
|
||||
interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e", dba,
|
||||
{}, false)
|
||||
.PullAll(stream);
|
||||
auto results = interpreter_(
|
||||
"MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e", dba, {}, false);
|
||||
stream.Header(results.header());
|
||||
results.PullAll(stream);
|
||||
stream.Summary(results.summary());
|
||||
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "e");
|
||||
|
@ -39,9 +39,12 @@ class QueryExecution : public testing::Test {
|
||||
/** Executes the query and returns the results.
|
||||
* Does NOT commit the transaction */
|
||||
auto Execute(const std::string &query) {
|
||||
ResultStreamFaker<query::TypedValue> results;
|
||||
query::Interpreter{*db_}(query, *dba_, {}, false).PullAll(results);
|
||||
return results.GetResults();
|
||||
ResultStreamFaker<query::TypedValue> stream;
|
||||
auto results = query::Interpreter{*db_}(query, *dba_, {}, false);
|
||||
stream.Header(results.header());
|
||||
results.PullAll(stream);
|
||||
stream.Summary(results.summary());
|
||||
return stream.GetResults();
|
||||
}
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user