memgraph/include/communication/bolt/v1/transport/chunked_encoder.hpp

93 lines
1.8 KiB
C++
Raw Normal View History

2016-08-02 05:14:09 +08:00
#pragma once
#include <array>
#include <cstring>
#include <functional>
#include "communication/bolt/v1/config.hpp"
#include "logging/default.hpp"
#include "utils/likely.hpp"
2016-08-02 05:14:09 +08:00
namespace bolt
{
template <class Stream>
class ChunkedEncoder
{
static constexpr size_t N = bolt::config::N;
static constexpr size_t C = bolt::config::C;
2016-08-02 05:14:09 +08:00
public:
using byte = unsigned char;
ChunkedEncoder(Stream &stream)
: logger(logging::log->logger("Chunked Encoder")), stream(stream)
{
}
2016-08-02 05:14:09 +08:00
static constexpr size_t chunk_size = N - 2;
void write(byte value)
{
if (UNLIKELY(pos == N)) write_chunk();
2016-08-02 05:14:09 +08:00
chunk[pos++] = value;
}
void write(const byte *values, size_t n)
2016-08-02 05:14:09 +08:00
{
logger.trace("write {} bytes", n);
while (n > 0) {
2016-08-02 05:14:09 +08:00
auto size = n < N - pos ? n : N - pos;
std::memcpy(chunk.data() + pos, values, size);
pos += size;
n -= size;
// TODO: see how bolt splits message over more TCP packets,
// test for more TCP packets
if (pos == N) write_chunk();
2016-08-02 05:14:09 +08:00
}
}
void write_chunk()
2016-08-02 05:14:09 +08:00
{
write_chunk_header();
// write two zeros to signal message end
chunk[pos++] = 0x00;
chunk[pos++] = 0x00;
flush();
2016-08-02 05:14:09 +08:00
}
private:
Logger logger;
2016-08-02 05:14:09 +08:00
std::reference_wrapper<Stream> stream;
std::array<byte, C> chunk;
size_t pos{2};
2016-08-02 05:14:09 +08:00
void write_chunk_header()
{
// write the size of the chunk
uint16_t size = pos - 2;
// write the higher byte
chunk[0] = size >> 8;
// write the lower byte
chunk[1] = size & 0xFF;
}
void flush()
2016-08-02 05:14:09 +08:00
{
// write chunk to the stream
stream.get().write(chunk.data(), pos);
pos = 2;
}
};
}