diff --git a/src/bolt/v1/bolt.cpp b/src/bolt/v1/bolt.cpp new file mode 100644 index 000000000..77dbe6d18 --- /dev/null +++ b/src/bolt/v1/bolt.cpp @@ -0,0 +1,26 @@ +#include "bolt.hpp" + +#include "session.hpp" +#include + +namespace bolt +{ + +Bolt::Bolt() +{ +} + +Session* Bolt::create_session(io::Socket&& socket) +{ + // TODO fix session lifecycle handling + // dangling pointers are not cool :) + + return new Session(std::forward(socket), *this); +} + +void Bolt::close(Session* session) +{ + session->socket.close(); +} + +} diff --git a/src/bolt/v1/bolt.hpp b/src/bolt/v1/bolt.hpp new file mode 100644 index 000000000..3b42a200e --- /dev/null +++ b/src/bolt/v1/bolt.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include "states.hpp" +#include "io/network/socket.hpp" + +namespace bolt +{ + +class Session; + +class Bolt +{ + friend class Session; + +public: + Bolt(); + + Session* create_session(io::Socket&& socket); + void close(Session* session); + + States states; +}; + +} diff --git a/src/bolt/v1/messaging/codes.hpp b/src/bolt/v1/messaging/codes.hpp new file mode 100644 index 000000000..19083199c --- /dev/null +++ b/src/bolt/v1/messaging/codes.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "utils/types/byte.hpp" +#include "utils/underlying_cast.hpp" + +namespace bolt +{ + +enum class MessageCode : byte +{ + Init = 0x01, + AckFailure = 0x0E, + Reset = 0x0F, + + Run = 0x10, + DiscardAll = 0x2F, + PullAll = 0x3F, + + Record = 0x71, + Success = 0x70, + Ignored = 0x7E, + Failure = 0x7F +}; + +inline bool operator==(byte value, MessageCode code) +{ + return value == underlying_cast(code); +} + +inline bool operator==(MessageCode code, byte value) +{ + return operator==(value, code); +} + +inline bool operator!=(byte value, MessageCode code) +{ + return !operator==(value, code); +} + +inline bool operator!=(MessageCode code, byte value) +{ + return operator!=(value, code); +} + + +} diff --git a/src/bolt/v1/messaging/messages.hpp b/src/bolt/v1/messaging/messages.hpp new file mode 100644 index 000000000..854f53936 --- /dev/null +++ b/src/bolt/v1/messaging/messages.hpp @@ -0,0 +1,8 @@ +#pragma once + +namespace bolt +{ + + + +} diff --git a/src/bolt/v1/packing/codes.hpp b/src/bolt/v1/packing/codes.hpp new file mode 100644 index 000000000..f963efb8c --- /dev/null +++ b/src/bolt/v1/packing/codes.hpp @@ -0,0 +1,54 @@ +#pragma once + +#include + +namespace bolt +{ + +namespace pack +{ + +enum Code : uint8_t +{ + TinyString = 0x80, + TinyList = 0x90, + TinyMap = 0xA0, + TinyStruct = 0xB0, + + Null = 0xC0, + + Float64 = 0xC1, + + False = 0xC2, + True = 0xC3, + + Int8 = 0xC8, + Int16 = 0xC9, + Int32 = 0xCA, + Int64 = 0xCB, + + Bytes8 = 0xCC, + Bytes16 = 0xCD, + Bytes32 = 0xCE, + + String8 = 0xD0, + String16 = 0xD1, + String32 = 0xD2, + + List8 = 0xD4, + List16 = 0xD5, + List32 = 0xD6, + + Map8 = 0xD8, + Map16 = 0xD9, + Map32 = 0xDA, + MapStream = 0xDB, + + Struct8 = 0xDC, + Struct16 = 0xDD, + EndOfStream = 0xDF, +}; + +} + +} diff --git a/src/bolt/v1/packing/types.hpp b/src/bolt/v1/packing/types.hpp new file mode 100644 index 000000000..d840c3f42 --- /dev/null +++ b/src/bolt/v1/packing/types.hpp @@ -0,0 +1,21 @@ +#pragma once + +namespace bolt +{ + +enum class PackType +{ + Null, // denotes absence of a value + Boolean, // denotes a type with two possible values (t/f) + Integer, // 64-bit signed integral number + Float, // 64-bit floating point number + Bytes, // binary data + String, // unicode string + List, // collection of values + Map, // collection of zero or more key/value pairs + Struct, // zero or more packstream values + EndOfStream, // denotes stream value end + Reserved // reserved for future use +}; + +} diff --git a/src/bolt/v1/server/server.hpp b/src/bolt/v1/server/server.hpp new file mode 100644 index 000000000..ea61154aa --- /dev/null +++ b/src/bolt/v1/server/server.hpp @@ -0,0 +1,67 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "io/network/server.hpp" +#include "bolt/v1/bolt.hpp" + +namespace bolt +{ + +template +class Server : public io::Server> +{ +public: + Server(io::Socket&& socket) + : io::Server>(std::forward(socket)) {} + + void start(size_t n) + { + workers.reserve(n); + + for(size_t i = 0; i < n; ++i) + { + workers.push_back(std::make_shared(bolt)); + workers.back()->start(alive); + } + + while(alive) + { + this->wait_and_process_events(); + } + } + + void shutdown() + { + alive.store(false); + + for(auto& worker : workers) + worker->thread.join(); + } + + void on_connect() + { + assert(idx < workers.size()); + + if(UNLIKELY(!workers[idx]->accept(this->socket))) + return; + + idx = idx == workers.size() - 1 ? 0 : idx + 1; + } + + void on_wait_timeout() {} + +private: + Bolt bolt; + + std::vector workers; + std::atomic alive {true}; + + int idx {0}; +}; + +} diff --git a/src/bolt/v1/server/worker.hpp b/src/bolt/v1/server/worker.hpp new file mode 100644 index 000000000..eb475e135 --- /dev/null +++ b/src/bolt/v1/server/worker.hpp @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "io/network/stream_reader.hpp" + +#include "bolt/v1/bolt.hpp" +#include "bolt/v1/session.hpp" + +#include "logging/default.hpp" + +namespace bolt +{ + +template +class Server; + +class Worker : public io::StreamReader +{ + friend class bolt::Server; + +public: + using sptr = std::shared_ptr; + + Worker(Bolt& bolt) : bolt(bolt) + { + logger = logging::log->logger("Network"); + } + + Session& on_connect(io::Socket&& socket) + { + logger.trace("Accepting connection on socket {}", socket.id()); + + return *bolt.get().create_session(std::forward(socket)); + } + + void on_error(Session&) + { + logger.trace("[on_error] errno = {}", errno); + +#ifndef NDEBUG + auto err = io::NetworkError(""); + logger.debug("{}", err.what()); +#endif + + logger.error("Error occured in this session"); + + } + + void on_wait_timeout() {} + + Buffer on_alloc(Session&) + { + /* logger.trace("[on_alloc] Allocating {}B", sizeof buf); */ + + return Buffer { buf, sizeof buf }; + } + + void on_read(Session& session, Buffer& buf) + { + logger.trace("[on_read] Received {}B", buf.len); + +#ifndef NDEBUG + std::stringstream stream; + + for(size_t i = 0; i < buf.len; ++i) + stream << fmt::format("{:02X} ", static_cast(buf.ptr[i])); + + logger.trace("[on_read] {}", stream.str()); +#endif + + try + { + session.execute(reinterpret_cast(buf.ptr), buf.len); + } + catch(const std::exception& e) + { + logger.error("Error occured while executing statement."); + logger.error("{}", e.what()); + } + } + + void on_close(Session& session) + { + logger.trace("[on_close] Client closed the connection"); + session.close(); + } + + char buf[65536]; + +protected: + std::reference_wrapper bolt; + + Logger logger; + std::thread thread; + + void start(std::atomic& alive) + { + thread = std::thread([&, this]() { + while(alive) + wait_and_process_events(); + }); + } +}; + +} diff --git a/src/bolt/v1/session.cpp b/src/bolt/v1/session.cpp new file mode 100644 index 000000000..dd437cdf0 --- /dev/null +++ b/src/bolt/v1/session.cpp @@ -0,0 +1,54 @@ +#include "session.hpp" + +namespace bolt +{ + +Session::Session(io::Socket&& socket, Bolt& bolt) + : Stream(std::forward(socket)), bolt(bolt) +{ + logger = logging::log->logger("Session"); + + // start with a handshake state + state = bolt.states.handshake.get(); +} + +bool Session::alive() const +{ + return state != nullptr; +} + +void Session::execute(const byte* data, size_t len) +{ + // mark the end of the message + auto end = data + len; + + while(true) + { + auto size = end - data; + + if(LIKELY(connected)) + { + logger.debug("Decoding chunk of size {}", size); + auto finished = decoder.decode(data, size); + + if(!finished) + return; + } + else + { + logger.debug("Decoding handshake of size {}", size); + decoder.handshake(data, size); + } + + state = state->run(*this); + decoder.reset(); + } +} + +void Session::close() +{ + logger.debug("Closing session"); + bolt.close(this); +} + +} diff --git a/src/bolt/v1/session.hpp b/src/bolt/v1/session.hpp new file mode 100644 index 000000000..9bfa48886 --- /dev/null +++ b/src/bolt/v1/session.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "io/network/tcp/stream.hpp" +#include "io/network/socket.hpp" + +#include "bolt/v1/states/state.hpp" + +#include "bolt/v1/transport/bolt_decoder.hpp" +#include "bolt/v1/transport/bolt_encoder.hpp" + +#include "bolt.hpp" +#include "logging/default.hpp" + +namespace bolt +{ + +class Session : public io::tcp::Stream +{ +public: + using Decoder = BoltDecoder; + using Encoder = BoltEncoder; + + Session(io::Socket&& socket, Bolt& bolt); + + bool alive() const; + + void execute(const byte* data, size_t len); + void close(); + + Bolt& bolt; + + Decoder decoder; + Encoder encoder {socket}; + + bool connected {false}; + State* state; + +protected: + Logger logger; +}; + +} diff --git a/src/bolt/v1/states.cpp b/src/bolt/v1/states.cpp new file mode 100644 index 000000000..a7e0a9974 --- /dev/null +++ b/src/bolt/v1/states.cpp @@ -0,0 +1,17 @@ +#include "states.hpp" + +#include "states/handshake.hpp" +#include "states/init.hpp" +#include "states/executor.hpp" + +namespace bolt +{ + +States::States() +{ + handshake = std::make_unique(); + init = std::make_unique(); + executor = std::make_unique(); +} + +} diff --git a/src/bolt/v1/states.hpp b/src/bolt/v1/states.hpp new file mode 100644 index 000000000..c323214d4 --- /dev/null +++ b/src/bolt/v1/states.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include "states/state.hpp" +#include "logging/log.hpp" + +namespace bolt +{ + +class States +{ +public: + States(); + + State::uptr handshake; + State::uptr init; + State::uptr executor; +}; + +} diff --git a/src/bolt/v1/states/error.cpp b/src/bolt/v1/states/error.cpp new file mode 100644 index 000000000..c482a1d8f --- /dev/null +++ b/src/bolt/v1/states/error.cpp @@ -0,0 +1,32 @@ +#include "error.hpp" + +#include "bolt/v1/session.hpp" + +namespace bolt +{ + +State* Error::run(Session& session) +{ + auto message_type = session.decoder.read_byte(); + + if(message_type == MessageCode::AckFailure) + { + // todo reset current statement? is it even necessary? + + return session.bolt.states.executor.get(); + } + else if(message_type == MessageCode::Reset) + { + // todo rollback current transaction + // discard all records waiting to be sent + + return session.bolt.states.executor.get(); + } + + session.encoder.message_ignored(); + session.encoder.flush(); + + return this; +} + +} diff --git a/src/bolt/v1/states/error.hpp b/src/bolt/v1/states/error.hpp new file mode 100644 index 000000000..0ef5964bf --- /dev/null +++ b/src/bolt/v1/states/error.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "bolt/v1/states/state.hpp" + +namespace bolt +{ + +class Error : public State +{ +public: + State* run(Session& session) override; +}; + +} diff --git a/src/bolt/v1/states/executor.cpp b/src/bolt/v1/states/executor.cpp new file mode 100644 index 000000000..7306618ab --- /dev/null +++ b/src/bolt/v1/states/executor.cpp @@ -0,0 +1,95 @@ +#include "executor.hpp" + +#include "bolt/v1/messaging/codes.hpp" + +namespace bolt +{ + +Executor::Executor() : logger(logging::log->logger("Executor")) {} + +State* Executor::run(Session& session) +{ + // just read one byte that represents the struct type, we can skip the + // information contained in this byte + session.decoder.read_byte(); + + auto message_type = session.decoder.read_byte(); + + if(message_type == MessageCode::Run) + { + Query q; + + q.statement = session.decoder.read_string(); + + this->run(session, q); + } + else if(message_type == MessageCode::PullAll) + { + pull_all(session); + } + else if(message_type == MessageCode::DiscardAll) + { + discard_all(session); + } + else if(message_type == MessageCode::Reset) + { + // todo rollback current transaction + // discard all records waiting to be sent + + return this; + } + else + { + logger.error("Unrecognized message recieved"); + logger.debug("Invalid message type 0x{:02X}", message_type); + + return session.bolt.states.error.get(); + } + + return this; +} + +void Executor::run(Session& session, Query& query) +{ + logger.trace("[Run] '{}'", query.statement); + + session.encoder.message_success(); + session.encoder.write_map_header(1); + + session.encoder.write_string("fields"); + session.encoder.write_list_header(1); + session.encoder.write_string("name"); + + session.encoder.flush(); +} + +void Executor::pull_all(Session& session) +{ + logger.trace("[PullAll]"); + + session.encoder.message_record(); + session.encoder.write_list_header(1); + session.encoder.write_string("buda"); + + session.encoder.message_record(); + session.encoder.write_list_header(1); + session.encoder.write_string("domko"); + + session.encoder.message_record(); + session.encoder.write_list_header(1); + session.encoder.write_string("max"); + + session.encoder.message_success_empty(); + + session.encoder.flush(); +} + +void Executor::discard_all(Session& session) +{ + logger.trace("[DiscardAll]"); + + session.encoder.message_success(); + session.encoder.flush(); +} + +} diff --git a/src/bolt/v1/states/executor.hpp b/src/bolt/v1/states/executor.hpp new file mode 100644 index 000000000..5b1127a3a --- /dev/null +++ b/src/bolt/v1/states/executor.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include "bolt/v1/states/state.hpp" +#include "bolt/v1/session.hpp" + +namespace bolt +{ + +class Executor : public State +{ + struct Query + { + std::string statement; + }; + +public: + Executor(); + + State* run(Session& session) override final; + +protected: + Logger logger; + + /* Execute an incoming query + * + */ + void run(Session& session, Query& query); + + /* Send all remaining results to the client + * + */ + void pull_all(Session& session); + + /* Discard all remaining results + * + */ + void discard_all(Session& session); +}; + +} + diff --git a/src/bolt/v1/states/handshake.cpp b/src/bolt/v1/states/handshake.cpp new file mode 100644 index 000000000..b685381d6 --- /dev/null +++ b/src/bolt/v1/states/handshake.cpp @@ -0,0 +1,27 @@ +#include "handshake.hpp" + +#include "bolt/v1/session.hpp" + +namespace bolt +{ + +static constexpr uint32_t preamble = 0x6060B017; + +static constexpr byte protocol[4] = {0x00, 0x00, 0x00, 0x01}; + +State* Handshake::run(Session& session) +{ + if(UNLIKELY(session.decoder.read_uint32() != preamble)) + return nullptr; + + // TODO so far we only support version 1 of the protocol so it doesn't + // make sense to check which version the client prefers + // this will change in the future + + session.connected = true; + session.socket.write(protocol, sizeof protocol); + + return session.bolt.states.init.get(); +} + +} diff --git a/src/bolt/v1/states/handshake.hpp b/src/bolt/v1/states/handshake.hpp new file mode 100644 index 000000000..fe6d7052a --- /dev/null +++ b/src/bolt/v1/states/handshake.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "bolt/v1/states/state.hpp" + +namespace bolt +{ + +class Handshake : public State +{ +public: + State* run(Session& session) override; +}; + +} diff --git a/src/bolt/v1/states/init.cpp b/src/bolt/v1/states/init.cpp new file mode 100644 index 000000000..e1f30f690 --- /dev/null +++ b/src/bolt/v1/states/init.cpp @@ -0,0 +1,54 @@ +#include "init.hpp" + +#include "bolt/v1/session.hpp" +#include "bolt/v1/messaging/codes.hpp" + +#include "utils/likely.hpp" + +namespace bolt +{ + +Init::Init() : MessageParser(logging::log->logger("Init")) {} + +State* Init::parse(Session& session, Message& message) +{ + auto struct_type = session.decoder.read_byte(); + + if(UNLIKELY(struct_type != 0xB2)) + { + logger.debug("{}", struct_type); + + logger.debug("Expected struct marker 0xB2 instead of 0x{:02X}", + (unsigned)struct_type); + + return nullptr; + } + + auto message_type = session.decoder.read_byte(); + + if(UNLIKELY(message_type != MessageCode::Init)) + { + logger.debug("Expected Init (0x01) instead of (0x{:02X})", + (unsigned)message_type); + + return nullptr; + } + + message.client_name = session.decoder.read_string(); + + // TODO read authentication tokens + + return this; +} + +State* Init::execute(Session& session, Message& message) +{ + logger.debug("Client connected '{}'", message.client_name); + + session.encoder.message_success_empty(); + session.encoder.flush(); + + return session.bolt.states.executor.get(); +} + +} diff --git a/src/bolt/v1/states/init.hpp b/src/bolt/v1/states/init.hpp new file mode 100644 index 000000000..dce0c2b83 --- /dev/null +++ b/src/bolt/v1/states/init.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "bolt/v1/states/message_parser.hpp" + +namespace bolt +{ + +class Init : public MessageParser +{ +public: + struct Message + { + std::string client_name; + }; + + Init(); + + State* parse(Session& session, Message& message); + State* execute(Session& session, Message& message); +}; + +} diff --git a/src/bolt/v1/states/message_parser.hpp b/src/bolt/v1/states/message_parser.hpp new file mode 100644 index 000000000..ff8b6952c --- /dev/null +++ b/src/bolt/v1/states/message_parser.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include "state.hpp" +#include "utils/crtp.hpp" + +#include "bolt/v1/session.hpp" + +namespace bolt +{ + +template +class MessageParser : public State, public Crtp +{ +public: + MessageParser(Logger&& logger) + : logger(std::forward(logger)) {} + + State* run(Session& session) override final + { + typename Derived::Message message; + + logger.debug("Parsing message"); + auto next = this->derived().parse(session, message); + + // return next state if parsing was unsuccessful (i.e. error state) + if(next != &this->derived()) + return next; + + logger.debug("Executing state"); + return this->derived().execute(session, message); + } + +protected: + Logger logger; +}; + +} diff --git a/src/bolt/v1/states/state.hpp b/src/bolt/v1/states/state.hpp new file mode 100644 index 000000000..50a38494c --- /dev/null +++ b/src/bolt/v1/states/state.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include +#include +#include + +namespace bolt +{ + +class Session; + +class State +{ +public: + using uptr = std::unique_ptr; + + State() = default; + virtual ~State() = default; + + virtual State* run(Session& session) = 0; +}; + +} diff --git a/src/bolt/v1/transport/bolt_decoder.cpp b/src/bolt/v1/transport/bolt_decoder.cpp new file mode 100644 index 000000000..9e16c2264 --- /dev/null +++ b/src/bolt/v1/transport/bolt_decoder.cpp @@ -0,0 +1,158 @@ +#include "bolt_decoder.hpp" + +#include + +#include "utils/bswap.hpp" +#include "logging/default.hpp" + +#include "bolt/v1/packing/codes.hpp" + +namespace bolt +{ + +void BoltDecoder::handshake(const byte*& data, size_t len) +{ + buffer.write(data, len); + data += len; +} + +bool BoltDecoder::decode(const byte*& data, size_t len) +{ + return decoder(data, len); +} + +bool BoltDecoder::empty() const +{ + return pos == buffer.size(); +} + +void BoltDecoder::reset() +{ + buffer.clear(); + pos = 0; +} + +byte BoltDecoder::peek() const +{ + return buffer[pos]; +} + +byte BoltDecoder::read_byte() +{ + return buffer[pos++]; +} + +void BoltDecoder::read_bytes(void* dest, size_t n) +{ + std::memcpy(dest, buffer.data() + pos, n); + pos += n; +} + +template +T parse(const void* data) +{ + // reinterpret bytes as the target value + auto value = reinterpret_cast(data); + + // swap values to little endian + return bswap(*value); +} + +template +T parse(Buffer& buffer, size_t& pos) +{ + // get a pointer to the data we're converting + auto ptr = buffer.data() + pos; + + // skip sizeof bytes that we're going to read + pos += sizeof(T); + + // read and convert the value + return parse(ptr); +} + +int16_t BoltDecoder::read_int16() +{ + return parse(buffer, pos); +} + +uint16_t BoltDecoder::read_uint16() +{ + return parse(buffer, pos); +} + +int32_t BoltDecoder::read_int32() +{ + return parse(buffer, pos); +} + +uint32_t BoltDecoder::read_uint32() +{ + return parse(buffer, pos); +} + +int64_t BoltDecoder::read_int64() +{ + return parse(buffer, pos); +} + +uint64_t BoltDecoder::read_uint64() +{ + return parse(buffer, pos); +} + +double BoltDecoder::read_float64() +{ + auto v = parse(buffer, pos); + return *reinterpret_cast(&v); +} + +std::string BoltDecoder::read_string() +{ + auto marker = read_byte(); + + std::string res; + uint32_t size; + + // if the first 4 bits equal to 1000 (0x8), this is a tiny string + if((marker & 0xF0) == pack::TinyString) + { + // size is stored in the lower 4 bits of the marker byte + size = marker & 0x0F; + } + // if the marker is 0xD0, size is an 8-bit unsigned integer + if(marker == pack::String8) + { + size = read_byte(); + } + // if the marker is 0xD1, size is a 16-bit big-endian unsigned integer + else if(marker == pack::String16) + { + size = read_uint16(); + } + // if the marker is 0xD2, size is a 32-bit big-endian unsigned integer + else if(marker == pack::String32) + { + size = read_uint32(); + } + else + { + // TODO error? + return res; + } + + if(size == 0) + return res; + + res.append(reinterpret_cast(raw()), size); + pos += size; + + return res; +} + +const byte* BoltDecoder::raw() const +{ + return buffer.data() + pos; +} + +} diff --git a/src/bolt/v1/transport/bolt_decoder.hpp b/src/bolt/v1/transport/bolt_decoder.hpp new file mode 100644 index 000000000..c326f0548 --- /dev/null +++ b/src/bolt/v1/transport/bolt_decoder.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include "buffer.hpp" +#include "chunked_decoder.hpp" + +#include "utils/types/byte.hpp" + +namespace bolt +{ + +class BoltDecoder +{ +public: + void handshake(const byte*& data, size_t len); + bool decode(const byte*& data, size_t len); + + bool empty() const; + void reset(); + + byte peek() const; + byte read_byte(); + void read_bytes(void* dest, size_t n); + + int16_t read_int16(); + uint16_t read_uint16(); + + int32_t read_int32(); + uint32_t read_uint32(); + + int64_t read_int64(); + uint64_t read_uint64(); + + double read_float64(); + + std::string read_string(); + +private: + Buffer buffer; + ChunkedDecoder decoder {buffer}; + size_t pos {0}; + + const byte* raw() const; +}; + +} diff --git a/src/bolt/v1/transport/bolt_encoder.cpp b/src/bolt/v1/transport/bolt_encoder.cpp new file mode 100644 index 000000000..472a42447 --- /dev/null +++ b/src/bolt/v1/transport/bolt_encoder.cpp @@ -0,0 +1 @@ +#include "bolt_encoder.hpp" diff --git a/src/bolt/v1/transport/bolt_encoder.hpp b/src/bolt/v1/transport/bolt_encoder.hpp new file mode 100644 index 000000000..1fa8321e8 --- /dev/null +++ b/src/bolt/v1/transport/bolt_encoder.hpp @@ -0,0 +1,243 @@ +#pragma once + +#include "chunked_encoder.hpp" +#include "socket_stream.hpp" + +#include "bolt/v1/packing/codes.hpp" +#include "bolt/v1/messaging/codes.hpp" + +#include "utils/types/byte.hpp" + +#include "utils/bswap.hpp" + +namespace bolt +{ + +template +class BoltEncoder +{ + static constexpr int64_t plus_2_to_the_31 = 2147483648L; + static constexpr int64_t plus_2_to_the_15 = 32768L; + static constexpr int64_t plus_2_to_the_7 = 128L; + static constexpr int64_t minus_2_to_the_4 = -16L; + static constexpr int64_t minus_2_to_the_7 = -128L; + static constexpr int64_t minus_2_to_the_15 = -32768L; + static constexpr int64_t minus_2_to_the_31 = -2147483648L; + +public: + BoltEncoder(Socket& socket) : stream(socket) {} + + void flush() + { + encoder.flush(); + } + + void write(byte value) + { + encoder.write(value); + } + + void write(const byte* values, size_t n) + { + encoder.write(values, n); + } + + void write_null() + { + encoder.write(pack::Null); + } + + void write(bool value) + { + if(value) write_true(); else write_false(); + } + + void write_true() + { + encoder.write(pack::True); + } + + void write_false() + { + encoder.write(pack::False); + } + + template + void write_value(T value) + { + value = bswap(value); + encoder.write(reinterpret_cast(&value), sizeof(value)); + } + + void write_integer(int64_t value) + { + if(value >= minus_2_to_the_4 && value < plus_2_to_the_7) + { + write(static_cast(value)); + } + else if(value >= minus_2_to_the_7 && value < minus_2_to_the_4) + { + write(pack::Int8); + write(static_cast(value)); + } + else if(value >= minus_2_to_the_15 && value < plus_2_to_the_15) + { + write(pack::Int16); + write_value(static_cast(value)); + } + else if(value >= minus_2_to_the_31 && value < plus_2_to_the_31) + { + write(pack::Int32); + write_value(static_cast(value)); + } + else + { + write(pack::Int64); + write_value(value); + } + } + + void write(double value) + { + write(pack::Float64); + write_value(*reinterpret_cast(&value)); + } + + void write_map_header(size_t size) + { + if(size < 0x10) + { + write(static_cast(pack::TinyMap | size)); + } + else if(size <= 0xFF) + { + write(pack::Map8); + write(static_cast(size)); + } + else if(size <= 0xFFFF) + { + write(pack::Map16); + write_value(size); + } + else + { + write(pack::Map32); + write_value(size); + } + } + + void write_empty_map() + { + write(pack::TinyMap); + } + + void write_list_header(size_t size) + { + if(size < 0x10) + { + write(static_cast(pack::TinyList | size)); + } + else if(size <= 0xFF) + { + write(pack::List8); + write(static_cast(size)); + } + else if(size <= 0xFFFF) + { + write(pack::List16); + write_value(size); + } + else + { + write(pack::List32); + write_value(size); + } + } + + void write_empty_list() + { + write(pack::TinyList); + } + + void write_string_header(size_t size) + { + if(size < 0x10) + { + write(static_cast(pack::TinyString | size)); + } + else if(size <= 0xFF) + { + write(pack::String8); + write(static_cast(size)); + } + else if(size <= 0xFFFF) + { + write(pack::String16); + write_value(size); + } + else + { + write(pack::String32); + write_value(size); + } + } + + void write_string(const std::string& str) + { + write_string(str.c_str(), str.size()); + } + + void write_string(const char* str, size_t len) + { + write_string_header(len); + write(reinterpret_cast(str), len); + } + + void write_struct_header(size_t size) + { + if(size < 0x10) + { + write(static_cast(pack::TinyStruct | size)); + } + else if(size <= 0xFF) + { + write(pack::Struct8); + write(static_cast(size)); + } + else + { + write(pack::Struct16); + write_value(size); + } + } + + void message_success() + { + write_struct_header(1); + write(underlying_cast(MessageCode::Success)); + } + + void message_success_empty() + { + message_success(); + write_empty_map(); + } + + void message_record() + { + write_struct_header(1); + write(underlying_cast(MessageCode::Record)); + } + + void message_record_empty() + { + message_record(); + write_empty_list(); + } + +private: + SocketStream stream; + ChunkedEncoder encoder {stream}; +}; + +} diff --git a/src/bolt/v1/transport/buffer.cpp b/src/bolt/v1/transport/buffer.cpp new file mode 100644 index 000000000..05a0769da --- /dev/null +++ b/src/bolt/v1/transport/buffer.cpp @@ -0,0 +1,16 @@ +#include "buffer.hpp" + +namespace bolt +{ + +void Buffer::write(const byte* data, size_t len) +{ + buffer.insert(buffer.end(), data, data + len); +} + +void Buffer::clear() +{ + buffer.clear(); +} + +} diff --git a/src/bolt/v1/transport/buffer.hpp b/src/bolt/v1/transport/buffer.hpp new file mode 100644 index 000000000..ae44c7f19 --- /dev/null +++ b/src/bolt/v1/transport/buffer.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include + +namespace bolt +{ + +class Buffer +{ +public: + using byte = uint8_t; + + void write(const byte* data, size_t len); + + void clear(); + + size_t size() const + { + return buffer.size(); + } + + byte operator[](size_t idx) const + { + return buffer[idx]; + } + + const byte* data() const + { + return buffer.data(); + } + +private: + std::vector buffer; +}; + + +} diff --git a/src/bolt/v1/transport/chunked_decoder.hpp b/src/bolt/v1/transport/chunked_decoder.hpp new file mode 100644 index 000000000..6f1feb166 --- /dev/null +++ b/src/bolt/v1/transport/chunked_decoder.hpp @@ -0,0 +1,77 @@ +#pragma once + +#include +#include +#include + +#include "utils/exceptions/basic_exception.hpp" +#include "utils/likely.hpp" + +#include "logging/default.hpp" + +namespace bolt +{ + +template +class ChunkedDecoder +{ +public: + class DecoderError : public BasicException + { + public: + using BasicException::BasicException; + }; + + using byte = unsigned char; + + ChunkedDecoder(Stream& stream) : stream(stream) {} + + /* Decode chunked data + * + * Chunk format looks like: + * + * |Header| Data ||Header| Data || ... || End | + * | 2B | size bytes || 2B | size bytes || ... ||00 00| + */ + bool decode(const byte*& chunk, size_t n) + { + while(n > 0) + { + // get size from first two bytes in the chunk + auto size = get_size(chunk); + + if(UNLIKELY(size + 2 > n)) + throw DecoderError("Chunk size larger than available data."); + + // advance chunk to pass those two bytes + chunk += 2; + n -= 2; + + // if chunk size is 0, we're done! + if(size == 0) + return true; + + stream.get().write(chunk, size); + + chunk += size; + n -= size; + } + + return false; + } + + bool operator()(const byte*& chunk, size_t n) + { + return decode(chunk, n); + } + +private: + std::reference_wrapper stream; + + size_t get_size(const byte* chunk) + { + return size_t(chunk[0]) << 8 | chunk[1]; + } +}; + +} diff --git a/src/bolt/v1/transport/chunked_encoder.hpp b/src/bolt/v1/transport/chunked_encoder.hpp new file mode 100644 index 000000000..9ded82c9a --- /dev/null +++ b/src/bolt/v1/transport/chunked_encoder.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include +#include +#include + +#include "utils/likely.hpp" + +namespace bolt +{ + +template +class ChunkedEncoder +{ + static constexpr size_t N = 65535; + static constexpr size_t C = N + 2 /* end mark */; + +public: + using byte = unsigned char; + + ChunkedEncoder(Stream& stream) : stream(stream) {} + + static constexpr size_t chunk_size = N - 2; + + void write(byte value) + { + if(UNLIKELY(pos == N)) + end_chunk(); + + chunk[pos++] = value; + } + + void write(const byte* values, size_t n) + { + while(n > 0) + { + auto size = n < N - pos ? n : N - pos; + + std::memcpy(chunk.data() + pos, values, size); + + pos += size; + n -= size; + + if(pos == N) + end_chunk(); + } + } + + void flush() + { + write_chunk_header(); + + // write two zeros to signal message end + chunk[pos++] = 0x00; + chunk[pos++] = 0x00; + + flush_stream(); + } + +private: + std::reference_wrapper stream; + + std::array chunk; + size_t pos {2}; + + void end_chunk() + { + write_chunk_header(); + flush(); + } + + void write_chunk_header() + { + // write the size of the chunk + uint16_t size = pos - 2; + + // write the higher byte + chunk[0] = size >> 8; + + // write the lower byte + chunk[1] = size & 0xFF; + } + + void flush_stream() + { + // write chunk to the stream + stream.get().write(chunk.data(), pos); + pos = 2; + } +}; + +} diff --git a/src/bolt/v1/transport/socket_stream.hpp b/src/bolt/v1/transport/socket_stream.hpp new file mode 100644 index 000000000..de41815bc --- /dev/null +++ b/src/bolt/v1/transport/socket_stream.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include + +#include "io/network/socket.hpp" +#include "stream_error.hpp" + +namespace bolt +{ + +class SocketStream +{ +public: + using byte = uint8_t; + + SocketStream(io::Socket& socket) : socket(socket) {} + + void write(const byte* data, size_t n) + { + while(n > 0) + { + auto written = socket.get().write(data, n); + + if(UNLIKELY(written == -1)) + throw StreamError("Can't write to stream"); + + n -= written; + data += written; + } + } + +private: + std::reference_wrapper socket; +}; + +} diff --git a/src/bolt/v1/transport/stream_error.hpp b/src/bolt/v1/transport/stream_error.hpp new file mode 100644 index 000000000..fec4525dd --- /dev/null +++ b/src/bolt/v1/transport/stream_error.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "utils/exceptions/basic_exception.hpp" + +namespace bolt +{ + +class StreamError : BasicException +{ +public: + using BasicException::BasicException; +}; + +} diff --git a/src/cypher/debug/tree_print.hpp b/src/cypher/debug/tree_print.hpp index 6ab0813d8..1ea4658d6 100644 --- a/src/cypher/debug/tree_print.hpp +++ b/src/cypher/debug/tree_print.hpp @@ -11,6 +11,8 @@ public: class Printer { public: + friend class Entry; + Printer(std::ostream& stream, const std::string& header) : stream(stream) { @@ -49,10 +51,10 @@ public: } template - friend Entry& operator<<(Entry& entry, const T& item) + Entry& operator<<(const T& item) { - entry.printer.stream << item; - return entry; + printer.stream << item; + return *this; } private: diff --git a/src/cypher/lexertl b/src/cypher/lexertl new file mode 160000 index 000000000..7d4d36a35 --- /dev/null +++ b/src/cypher/lexertl @@ -0,0 +1 @@ +Subproject commit 7d4d36a357027df0e817453cc9cf948f71047ca9 diff --git a/src/dbms/dbms.hpp b/src/dbms/dbms.hpp new file mode 100644 index 000000000..a71c2ad24 --- /dev/null +++ b/src/dbms/dbms.hpp @@ -0,0 +1,8 @@ +#pragma once + +class Dbms +{ +public: + + +}; diff --git a/src/dbms/server/bolt.hpp b/src/dbms/server/bolt.hpp new file mode 100644 index 000000000..c11568cb6 --- /dev/null +++ b/src/dbms/server/bolt.hpp @@ -0,0 +1,9 @@ +#pragma once + +class BoltServer +{ +public: + BoltServer() = default; + + +}; diff --git a/src/examples/bolt.cpp b/src/examples/bolt.cpp new file mode 100644 index 000000000..2b2b7f317 --- /dev/null +++ b/src/examples/bolt.cpp @@ -0,0 +1,68 @@ +#include +#include + +#include "bolt/v1/server/server.hpp" +#include "bolt/v1/server/worker.hpp" + +#include "io/network/socket.hpp" + +#include "logging/default.hpp" +#include "logging/streams/stdout.hpp" + +static bolt::Server* serverptr; + +Logger logger; + +void sigint_handler(int s) +{ + auto signal = s == SIGINT ? "SIGINT" : "SIGABRT"; + + logger.info("Recieved signal {}", signal); + logger.info("Shutting down..."); + + std::exit(EXIT_SUCCESS); +} + +static constexpr const char* interface = "0.0.0.0"; +static constexpr const char* port = "7687"; + +int main(void) +{ + logging::init_sync(); + logging::log->pipe(std::make_unique()); + logger = logging::log->logger("Main"); + + signal(SIGINT, sigint_handler); + signal(SIGABRT, sigint_handler); + + io::Socket socket; + + try + { + socket = io::Socket::bind(interface, port); + } + catch(io::NetworkError e) + { + logger.error("Cannot bind to socket on {} at {}", interface, port); + logger.error("{}", e.what()); + + std::exit(EXIT_FAILURE); + } + + socket.set_non_blocking(); + socket.listen(1024); + + logger.info("Listening on {} at {}", interface, port); + + bolt::Server server(std::move(socket)); + serverptr = &server; + + constexpr size_t N = 1; + + logger.info("Starting {} workers", N); + server.start(N); + + logger.info("Shutting down..."); + + return EXIT_SUCCESS; +} diff --git a/src/examples/compile-bolt.sh b/src/examples/compile-bolt.sh new file mode 100644 index 000000000..349e20e46 --- /dev/null +++ b/src/examples/compile-bolt.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +clang++ -g -rdynamic ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto diff --git a/src/examples/endinan.cpp b/src/examples/endinan.cpp new file mode 100644 index 000000000..58eac20de --- /dev/null +++ b/src/examples/endinan.cpp @@ -0,0 +1,81 @@ +#include +#include + +#include + +char b[8] = {1, 2, 3, 4, 0, 0, 0, 1}; + +int64_t safe_int64(const char* b) +{ + return int64_t(b[0]) << 56 | int64_t(b[1]) << 48 + | int64_t(b[2]) << 40 | int64_t(b[3]) << 32 + | int64_t(b[4]) << 24 | int64_t(b[5]) << 16 + | int64_t(b[6]) << 8 | int64_t(b[7]); +} + +int64_t unsafe_int64(const char* b) +{ + auto i = reinterpret_cast(b); + return __bswap_64(*i); +} + +int32_t safe_int32(const char* b) +{ + return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]; +} + +int32_t unsafe_int32(const char* b) +{ + auto i = reinterpret_cast(b); + return __bswap_32(*i); +} + +[[clang::optnone]] +void test(uint64_t n) +{ + for(uint64_t i = 0; i < n; ++i) + unsafe_int64(b); +} + +uint8_t f[8] = {0x3F, 0xF1, 0x99, 0x99, 0x99, 0x99, 0x99, 0x9A}; + +double ff = 1.1; + +double get_double(const uint8_t* b) +{ + auto v = __bswap_64(*reinterpret_cast(b)); + return *reinterpret_cast(&v); +} + +void print_hex(const char* buf, size_t n) +{ + for (size_t i = 0; i < n; ++i) + printf("%02X ", (unsigned char)buf[i]); +} + +void print_hex(const uint8_t* buf, size_t n) +{ + print_hex((const char*)buf, n); +} + +int main(void) +{ + auto dd = get_double(f); + + print_hex(f, 8); + + std::cout << std::endl; + print_hex((const uint8_t*)(&ff), 8); + + std::cout << std::endl; + print_hex((const uint8_t*)(&dd), 8); + + std::cout << dd << std::endl; + + /* std::cout << safe_int64(b) << std::endl; */ + /* std::cout << unsafe_int64(b) << std::endl; */ + + /* test(1000000000ull); */ + + return 0; +} diff --git a/src/io/network/Makefile b/src/io/network/Makefile deleted file mode 100644 index 245054369..000000000 --- a/src/io/network/Makefile +++ /dev/null @@ -1,17 +0,0 @@ -CXX=clang++ -CFLAGS=-std=c++1y -O2 -Wall -Wno-unknown-pragmas -LDFLAGS=-lhttp_parser -pthread - -INC=-I../../ -SOURCES=test.cpp -EXECUTABLE=test - -all: $(EXECUTABLE) - -$(EXECUTABLE): $(SOURCES) - $(CXX) $(CFLAGS) $(SOURCES) -o $(EXECUTABLE) $(INC) $(LDFLAGS) - -.PHONY: -clean: - rm -f test - rm -f *.o diff --git a/src/io/network/epoll-example.c b/src/io/network/epoll-example.c deleted file mode 100644 index 7f88c7625..000000000 --- a/src/io/network/epoll-example.c +++ /dev/null @@ -1,281 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -char buf[512]; -#define MAXEVENTS 64 - -static int -make_socket_non_blocking (int sfd) -{ - int flags, s; - - flags = fcntl (sfd, F_GETFL, 0); - if (flags == -1) - { - perror ("fcntl"); - return -1; - } - - flags |= O_NONBLOCK; - s = fcntl (sfd, F_SETFL, flags); - if (s == -1) - { - perror ("fcntl"); - return -1; - } - - return 0; -} - -static int -create_and_bind (char *port) -{ - struct addrinfo hints; - struct addrinfo *result, *rp; - int s, sfd; - - memset (&hints, 0, sizeof (struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */ - hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */ - hints.ai_flags = AI_PASSIVE; /* All interfaces */ - - s = getaddrinfo (NULL, port, &hints, &result); - if (s != 0) - { - fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s)); - return -1; - } - - for (rp = result; rp != NULL; rp = rp->ai_next) - { - sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sfd == -1) - continue; - - s = bind (sfd, rp->ai_addr, rp->ai_addrlen); - if (s == 0) - { - /* We managed to bind successfully! */ - break; - } - - close (sfd); - } - - if (rp == NULL) - { - fprintf (stderr, "Could not bind\n"); - return -1; - } - - freeaddrinfo (result); - - return sfd; -} - -int -main (int argc, char *argv[]) -{ - const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n"; - - size_t len = strlen(response); - - int sfd, s; - int efd; - struct epoll_event event; - struct epoll_event *events; - - if (argc != 2) - { - fprintf (stderr, "Usage: %s [port]\n", argv[0]); - exit (EXIT_FAILURE); - } - - sfd = create_and_bind (argv[1]); - if (sfd == -1) - abort (); - - s = make_socket_non_blocking (sfd); - if (s == -1) - abort (); - - s = listen (sfd, SOMAXCONN); - if (s == -1) - { - perror ("listen"); - abort (); - } - - efd = epoll_create1 (0); - if (efd == -1) - { - perror ("epoll_create"); - abort (); - } - - event.data.fd = sfd; - event.events = EPOLLIN | EPOLLET; - s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event); - if (s == -1) - { - perror ("epoll_ctl"); - abort (); - } - - /* Buffer where events are returned */ - events = calloc (MAXEVENTS, sizeof event); - - /* The event loop */ - while (1) - { - int n, i; - - n = epoll_wait (efd, events, MAXEVENTS, -1); - for (i = 0; i < n; i++) - { - if ((events[i].events & EPOLLERR) || - (events[i].events & EPOLLHUP) || - (!(events[i].events & EPOLLIN))) - { - /* An error has occured on this fd, or the socket is not - ready for reading (why were we notified then?) */ - fprintf (stderr, "epoll error\n"); - close (events[i].data.fd); - continue; - } - - else if (sfd == events[i].data.fd) - { - /* We have a notification on the listening socket, which - means one or more incoming connections. */ - while (1) - { - struct sockaddr in_addr; - socklen_t in_len; - int infd; - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - - in_len = sizeof in_addr; - infd = accept (sfd, &in_addr, &in_len); - if (infd == -1) - { - if ((errno == EAGAIN) || - (errno == EWOULDBLOCK)) - { - /* We have processed all incoming - connections. */ - break; - } - else - { - perror ("accept"); - break; - } - } - - s = getnameinfo (&in_addr, in_len, - hbuf, sizeof hbuf, - sbuf, sizeof sbuf, - NI_NUMERICHOST | NI_NUMERICSERV); - if (s == 0) - { - printf("Accepted connection on descriptor %d " - "(host=%s, port=%s)\n", infd, hbuf, sbuf); - } - - /* Make the incoming socket non-blocking and add it to the - list of fds to monitor. */ - s = make_socket_non_blocking (infd); - if (s == -1) - abort (); - - event.data.fd = infd; - event.events = EPOLLIN | EPOLLET; - s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event); - if (s == -1) - { - perror ("epoll_ctl"); - abort (); - } - } - continue; - } - else - { - /* We have data on the fd waiting to be read. Read and - display it. We must read whatever data is available - completely, as we are running in edge-triggered mode - and won't get a notification again for the same - data. */ - int done = 0; - - while (1) - { - ssize_t count; - - count = read (events[i].data.fd, buf, sizeof buf); - if (count == -1) - { - /* If errno == EAGAIN, that means we have read all - data. So go back to the main loop. */ - if (errno != EAGAIN) - { - perror ("read"); - done = 1; - } - break; - } - else if (count == 0) - { - /* End of file. The remote has closed the - connection. */ - done = 1; - break; - } - - size_t sum = 0; - char* resp = (char*)response; - - while(sum < len) - { - int k = write(event.data.fd, resp, len - sum); - sum += k; - resp += k; - - } - - /* Write the buffer to standard output */ - if (s == -1) - { - perror ("write"); - abort (); - } - } - - if (done) - { - printf ("Closed connection on descriptor %d\n", - events[i].data.fd); - - /* Closing the descriptor will make epoll remove it - from the set of descriptors which are monitored. */ - close (events[i].data.fd); - } - } - } - } - - free (events); - - close (sfd); - - return EXIT_SUCCESS; -} diff --git a/src/io/network/epoll.hpp b/src/io/network/epoll.hpp index 11c3764de..6ce725280 100644 --- a/src/io/network/epoll.hpp +++ b/src/io/network/epoll.hpp @@ -9,6 +9,12 @@ namespace io { +class EpollError : BasicException +{ +public: + using BasicException::BasicException; +}; + class Epoll { public: @@ -17,14 +23,18 @@ public: Epoll(int flags) { epoll_fd = epoll_create1(flags); + + if(UNLIKELY(epoll_fd == -1)) + throw EpollError("Can't create epoll file descriptor"); } - void add(Socket& socket, Event* event) + template + void add(Stream& stream, Event* event) { - auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event); + auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stream, event); if(UNLIKELY(status)) - throw NetworkError("Can't add an event to epoll listener."); + throw EpollError("Can't add an event to epoll listener."); } int wait(Event* events, int max_events, int timeout) diff --git a/src/io/network/event_listener.hpp b/src/io/network/event_listener.hpp index b536b437a..1600b835a 100644 --- a/src/io/network/event_listener.hpp +++ b/src/io/network/event_listener.hpp @@ -1,14 +1,12 @@ #pragma once -#include "socket.hpp" #include "epoll.hpp" #include "utils/crtp.hpp" namespace io { -template +template class EventListener : public Crtp { public: @@ -16,32 +14,28 @@ public: EventListener(uint32_t flags = 0) : listener(flags) {} - void add(Stream& stream) - { - // add the stream to the event listener - listener.add(stream.socket, &stream.event); - } - void wait_and_process_events() { + // TODO hardcoded a wait timeout because of thread joining + // when you shutdown the server. This should be wait_timeout of the + // template parameter and should almost never change from that. + // thread joining should be resolved using a signal that interrupts + // the system call. + // waits for an event/multiple events and returns a maximum of // max_events and stores them in the events array. it waits for // wait_timeout milliseconds. if wait_timeout is achieved, returns 0 - auto n = listener.wait(events, max_events, wait_timeout); - - LOG_DEBUG("received " << n << " events"); + auto n = listener.wait(events, max_events, 200); // go through all events and process them in order for(int i = 0; i < n; ++i) { auto& event = events[i]; - auto& stream = *reinterpret_cast(event.data.ptr); - // a stream was closed + // hangup event if(UNLIKELY(event.events & EPOLLRDHUP)) { - LOG_DEBUG("EPOLLRDHUP event recieved on socket " << stream.id()); - this->derived().on_close(stream); + this->derived().on_close_event(event); continue; } @@ -49,18 +43,12 @@ public: if(UNLIKELY(!(event.events & EPOLLIN) || event.events & (EPOLLHUP | EPOLLERR))) { - LOG_DEBUG(">> EPOLL ERR"); - LOG_DEBUG("EPOLLIN" << (event.events & EPOLLIN)); - LOG_DEBUG("EPOLLHUP" << (event.events & EPOLLHUP)); - LOG_DEBUG("EPOLLERR" << (event.events & EPOLLERR)); - - this->derived().on_error(stream); + this->derived().on_error_event(event); continue; } - LOG_DEBUG("signalling that data exists on socket " << stream.id()); // we have some data waiting to be read - this->derived().on_data(stream); + this->derived().on_data_event(event); } // this will be optimized out :D diff --git a/src/io/network/network_error.hpp b/src/io/network/network_error.hpp index 7384dcd83..ed528a07b 100644 --- a/src/io/network/network_error.hpp +++ b/src/io/network/network_error.hpp @@ -2,9 +2,15 @@ #include -class NetworkError : public std::runtime_error +#include "utils/exceptions/basic_exception.hpp" + +namespace io +{ + +class NetworkError : public BasicException { public: - using std::runtime_error::runtime_error; + using BasicException::BasicException; }; +} diff --git a/src/io/network/pps.sh b/src/io/network/pps.sh deleted file mode 100755 index c46b46198..000000000 --- a/src/io/network/pps.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash - -INTERVAL="1" # update interval in seconds - -if [ -z "$1" ]; then - echo - echo usage: $0 [network-interface] - echo - echo e.g. $0 eth0 - echo - echo shows packets-per-second - exit -fi - -IF=$1 - -while true -do - R1=`cat /sys/class/net/$1/statistics/rx_packets` - T1=`cat /sys/class/net/$1/statistics/tx_packets` - sleep $INTERVAL - R2=`cat /sys/class/net/$1/statistics/rx_packets` - T2=`cat /sys/class/net/$1/statistics/tx_packets` - TXPPS=`expr $T2 - $T1` - RXPPS=`expr $R2 - $R1` - echo "TX $1: $TXPPS pkts/s RX $1: $RXPPS pkts/s" -done diff --git a/src/io/network/secure_socket.hpp b/src/io/network/secure_socket.hpp new file mode 100644 index 000000000..c79ff3c12 --- /dev/null +++ b/src/io/network/secure_socket.hpp @@ -0,0 +1,93 @@ +#pragma once + +#include "tls.hpp" +#include "io/network/socket.hpp" +#include "tls_error.hpp" +#include "utils/types/byte.hpp" + +#include + +namespace io +{ + +class SecureSocket +{ +public: + SecureSocket(Socket&& socket, const Tls::Context& tls) + : socket(std::forward(socket)) + { + ssl = SSL_new(tls); + SSL_set_fd(ssl, this->socket); + + SSL_set_accept_state(ssl); + + if(SSL_accept(ssl) <= 0) + ERR_print_errors_fp(stderr); + } + + SecureSocket(SecureSocket&& other) + { + *this = std::forward(other); + } + + SecureSocket& operator=(SecureSocket&& other) + { + socket = std::move(other.socket); + + ssl = other.ssl; + other.ssl = nullptr; + + return *this; + } + + ~SecureSocket() + { + if(ssl == nullptr) + return; + + std::cout << "DELETING SSL" << std::endl; + + SSL_free(ssl); + } + + int error(int status) + { + return SSL_get_error(ssl, status); + } + + int write(const std::string& str) + { + return write(str.c_str(), str.size()); + } + + int write(const byte* data, size_t len) + { + return SSL_write(ssl, data, len); + } + + int write(const char* data, size_t len) + { + return SSL_write(ssl, data, len); + } + + int read(char* buffer, size_t len) + { + return SSL_read(ssl, buffer, len); + } + + operator int() + { + return socket; + } + + operator Socket&() + { + return socket; + } + +private: + Socket socket; + SSL* ssl {nullptr}; +}; + +} diff --git a/src/io/network/secure_stream_reader.hpp b/src/io/network/secure_stream_reader.hpp new file mode 100644 index 000000000..889bdb60f --- /dev/null +++ b/src/io/network/secure_stream_reader.hpp @@ -0,0 +1,61 @@ +#pragma once + +#include + +#include "stream_reader.hpp" +#include "logging/default.hpp" + +namespace io +{ +using namespace memory::literals; + +template +class SecureStreamReader : public StreamReader +{ +public: + struct Buffer + { + char* ptr; + size_t len; + }; + + SecureStreamReader(uint32_t flags = 0) + : StreamReader(flags) {} + + void on_data(Stream& stream) + { + while(true) + { + // allocate the buffer to fill the data + auto buf = this->derived().on_alloc(stream); + + // read from the buffer at most buf.len bytes + auto len = stream.socket.read(buf.ptr, buf.len); + + if(LIKELY(len > 0)) + { + buf.len = len; + return this->derived().on_read(stream, buf); + } + + auto err = stream.socket.error(len); + + // the socket is not ready for reading yet + if(err == SSL_ERROR_WANT_READ || + err == SSL_ERROR_WANT_WRITE || + err == SSL_ERROR_WANT_X509_LOOKUP) + { + return; + } + + // the socket notified a close event + if(err == SSL_ERROR_ZERO_RETURN) + return stream.close(); + + // some other error occurred, check errno + return this->derived().on_error(stream); + } + } +}; + +} diff --git a/src/io/network/server.hpp b/src/io/network/server.hpp index 8467086d0..47d5bf792 100644 --- a/src/io/network/server.hpp +++ b/src/io/network/server.hpp @@ -5,33 +5,39 @@ namespace io { -template -class Server : public StreamReader +template +class Server : public EventListener { public: - bool accept(Socket& socket) + Server(Socket&& socket) : socket(std::forward(socket)) { - // accept a connection from a socket - auto s = socket.accept(nullptr, nullptr); - LOG_DEBUG("socket " << s.id() << " accepted"); + event.data.fd = this->socket; + event.events = EPOLLIN | EPOLLET; - if(!s.is_open()) - return false; - - // make the recieved socket non blocking - s.set_non_blocking(); - - auto& stream = this->derived().on_connect(std::move(s)); - - // we want to listen to an incoming event which is edge triggered and - // we also want to listen on the hangup event - stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - - // add the connection to the event listener - this->add(stream); - - return true; + this->listener.add(this->socket, &event); } + + void on_close_event(Epoll::Event& event) + { + ::close(event.data.fd); + } + + void on_error_event(Epoll::Event& event) + { + ::close(event.data.fd); + } + + void on_data_event(Epoll::Event& event) + { + if(UNLIKELY(socket != event.data.fd)) + return; + + this->derived().on_connect(); + } + +protected: + Epoll::Event event; + Socket socket; }; } diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp index d8ab88601..e93ec02ee 100644 --- a/src/io/network/socket.hpp +++ b/src/io/network/socket.hpp @@ -16,17 +16,24 @@ #include "addrinfo.hpp" #include "utils/likely.hpp" +#include "logging/default.hpp" + +#include + namespace io { class Socket { +protected: Socket(int family, int socket_type, int protocol) { socket = ::socket(family, socket_type, protocol); } public: + using byte = uint8_t; + Socket(int socket = -1) : socket(socket) {} Socket(const Socket&) = delete; @@ -41,7 +48,16 @@ public: if(socket == -1) return; - close(socket); + + std::cout << "DELETING SOCKET" << std::endl; + + ::close(socket); + } + + void close() + { + ::close(socket); + socket = -1; } Socket& operator=(Socket&& other) @@ -73,7 +89,7 @@ public: continue; if(::connect(s, it->ai_addr, it->ai_addrlen) == 0) - return std::move(s); + return s; } throw NetworkError("Unable to connect to socket"); @@ -100,7 +116,7 @@ public: continue; if(::bind(s, it->ai_addr, it->ai_addrlen) == 0) - return std::move(s); + return s; } throw NetworkError("Unable to bind to socket"); @@ -141,22 +157,38 @@ public: return socket; } - size_t write(const std::string& str) + int write(const std::string& str) { - return ::write(socket, str.c_str(), str.size()); + return write(str.c_str(), str.size()); } - size_t write(const char* data, size_t len) + int write(const char* data, size_t len) { + return write(reinterpret_cast(data), len); + } + + int write(const byte* data, size_t len) + { +#ifndef NDEBUG + std::stringstream stream; + + for(size_t i = 0; i < len; ++i) + stream << fmt::format("{:02X} ", static_cast(data[i])); + + auto str = stream.str(); + + logging::debug("[Write {}B] {}", len, str); +#endif + return ::write(socket, data, len); } - size_t read(char* buffer, size_t len) + int read(void* buffer, size_t len) { return ::read(socket, buffer, len); } -private: +protected: int socket; }; diff --git a/src/io/network/stream_dispatcher.hpp b/src/io/network/stream_dispatcher.hpp new file mode 100644 index 000000000..3db4c00b4 --- /dev/null +++ b/src/io/network/stream_dispatcher.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include " + +namespace io +{ + +class StreamDispatcher +{ + +}; + +} diff --git a/src/io/network/stream_listener.hpp b/src/io/network/stream_listener.hpp new file mode 100644 index 000000000..8048be7f9 --- /dev/null +++ b/src/io/network/stream_listener.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include "event_listener.hpp" + +namespace io +{ + +template +class StreamListener : public EventListener +{ +public: + using EventListener::EventListener; + + void add(Stream& stream) + { + // add the stream to the event listener + this->listener.add(stream.socket, &stream.event); + } + + void on_close_event(Epoll::Event& event) + { + this->derived().on_close(to_stream(event)); + } + + void on_error_event(Epoll::Event& event) + { + this->derived().on_error(to_stream(event)); + } + + void on_data_event(Epoll::Event& event) + { + this->derived().on_data(to_stream(event)); + } + +private: + Stream& to_stream(Epoll::Event& event) + { + return *reinterpret_cast(event.data.ptr); + } +}; + +} diff --git a/src/io/network/stream_reader.hpp b/src/io/network/stream_reader.hpp index 3fad770f8..48e8d70d1 100644 --- a/src/io/network/stream_reader.hpp +++ b/src/io/network/stream_reader.hpp @@ -1,6 +1,6 @@ #pragma once -#include "event_listener.hpp" +#include "stream_listener.hpp" #include "memory/literals.hpp" namespace io @@ -8,7 +8,7 @@ namespace io using namespace memory::literals; template -class StreamReader : public EventListener +class StreamReader : public StreamListener { public: struct Buffer @@ -17,12 +17,41 @@ public: size_t len; }; - StreamReader(uint32_t flags = 0) : EventListener(flags) {} + StreamReader(uint32_t flags = 0) : StreamListener(flags) {} + + bool accept(Socket& socket) + { + // accept a connection from a socket + auto s = socket.accept(nullptr, nullptr); + + if(!s.is_open()) + return false; + + // make the recieved socket non blocking + s.set_non_blocking(); + + auto& stream = this->derived().on_connect(std::move(s)); + + // we want to listen to an incoming event which is edge triggered and + // we also want to listen on the hangup event + stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; + + // add the connection to the event listener + this->add(stream); + + return true; + } void on_data(Stream& stream) { while(true) { + if(UNLIKELY(!stream.alive())) + { + stream.close(); + break; + } + // allocate the buffer to fill the data auto buf = this->derived().on_alloc(stream); @@ -35,23 +64,21 @@ public: // this means we have read all available data if(LIKELY(errno == EAGAIN)) { - LOG_DEBUG("EAGAIN read all data on socket " << stream.id()); break; } // some other error occurred, check errno this->derived().on_error(stream); + break; } // end of file, the client has closed the connection if(UNLIKELY(buf.len == 0)) { - LOG_DEBUG("EOF stream closed on socket " << stream.id()); stream.close(); break; } - LOG_DEBUG("data on socket " << stream.id()); this->derived().on_read(stream, buf); } } diff --git a/src/io/network/tcp/stream.hpp b/src/io/network/tcp/stream.hpp index c4b1af76c..1317884a6 100644 --- a/src/io/network/tcp/stream.hpp +++ b/src/io/network/tcp/stream.hpp @@ -8,6 +8,7 @@ namespace io namespace tcp { +template class Stream { public: @@ -24,11 +25,6 @@ public: event.data.ptr = this; } - void close() - { - delete reinterpret_cast(event.data.ptr); - } - int id() const { return socket.id(); } Socket socket; diff --git a/src/io/network/tcp_server.hpp b/src/io/network/tcp_server.hpp deleted file mode 100644 index c3965298a..000000000 --- a/src/io/network/tcp_server.hpp +++ /dev/null @@ -1,94 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "debug/log.hpp" -#include "tcp_listener.hpp" - -namespace io -{ - -template -class TcpServer : TcpListener, 64, -1> -{ -public: - TcpServer(const char* addr, const char* port) - : stream(std::move(Socket::bind(addr, port))) {} - - ~TcpServer() - { - stop(); - - for(auto& worker : workers) - worker.join(); - } - - template - void listen(size_t n, size_t backlog, F&& f) - { - for(size_t i = 0; i < n; ++i) - { - workers.emplace_back(); - auto& w = workers.back(); - - threads[i] = std::thread([this, &w]() { - while(alive) - { - LOG_DEBUG("Worker " << hash(std::this_thread::get_id()) - << " waiting... "); - w.wait_and_process_events(); - } - }); - } - - stream.socket.listen(backlog); - } - - void stop() - { - alive.store(false, std::memory_order_release); - } - -private: - std::list threads; - std::list workers; - std::atomic alive { true }; - - TcpStream stream; - size_t idx = 0; - - void on_close(TcpStream& stream) - { - LOG_DEBUG("server on_close!!!!"); - } - - void on_error(TcpStream& stream) - { - LOG_DEBUG("server on_error!!!!"); - } - - void on_data(TcpStream&) - { - while (true) - { - LOG_DEBUG("Trying to accept... "); - if(!workers[idx].accept(socket)) - { - LOG_DEBUG("Did not accept!"); - break; - } - - idx = (idx + 1) % workers.size(); - LOG_DEBUG("Accepted a new connection!"); - } - } - - void on_wait_timeout() - { - - } -}; - -} diff --git a/src/io/network/tls.cpp b/src/io/network/tls.cpp new file mode 100644 index 000000000..d35ffe8df --- /dev/null +++ b/src/io/network/tls.cpp @@ -0,0 +1,55 @@ +#include "tls.hpp" +#include "tls_error.hpp" + +namespace io +{ + +Tls::Context::Context() +{ + auto method = SSLv23_server_method(); + ctx = SSL_CTX_new(method); + + if(!ctx) + { + ERR_print_errors_fp(stderr); + throw io::TlsError("Unable to create TLS context"); + } + + SSL_CTX_set_ecdh_auto(ctx, 1); +} + +Tls::Context::~Context() +{ + SSL_CTX_free(ctx); +} + +Tls::Context& Tls::Context::cert(const std::string& path) +{ + if(SSL_CTX_use_certificate_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0) + return *this; + + ERR_print_errors_fp(stderr); + throw TlsError("Error Loading cert '{}'", path); +} + +Tls::Context& Tls::Context::key(const std::string& path) +{ + if(SSL_CTX_use_PrivateKey_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0) + return *this; + + ERR_print_errors_fp(stderr); + throw TlsError("Error Loading private key '{}'", path); +} + +void Tls::initialize() +{ + SSL_load_error_strings(); + OpenSSL_add_ssl_algorithms(); +} + +void Tls::cleanup() +{ + EVP_cleanup(); +} + +} diff --git a/src/io/network/tls.hpp b/src/io/network/tls.hpp new file mode 100644 index 000000000..bd0de8fdc --- /dev/null +++ b/src/io/network/tls.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include + +#include +#include + +namespace io +{ + +class Tls +{ +public: + class Context + { + public: + Context(); + ~Context(); + + Context& cert(const std::string& path); + Context& key(const std::string& path); + + operator SSL_CTX*() const { return ctx; } + + private: + SSL_CTX* ctx; + }; + + static void initialize(); + static void cleanup(); +}; + +} diff --git a/src/io/network/tls_error.hpp b/src/io/network/tls_error.hpp new file mode 100644 index 000000000..53440a728 --- /dev/null +++ b/src/io/network/tls_error.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "utils/exceptions/basic_exception.hpp" + +namespace io +{ + +class TlsError : public BasicException +{ +public: + using BasicException::BasicException; +}; + +} diff --git a/src/io/network/worker.hpp b/src/io/network/worker.hpp deleted file mode 100644 index b50610525..000000000 --- a/src/io/network/worker.hpp +++ /dev/null @@ -1,91 +0,0 @@ -#pragma once - -#include "listener.hpp" -#include "tcp_stream.hpp" - -namespace io -{ - const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n"; - - size_t len = strlen(response); - -class Worker : public Listener -{ - char buf[64_kB]; - -public: - using Listener::Listener; - - bool accept(Socket& socket) - { - auto s = socket.accept(nullptr, nullptr); - - if(!s.is_open()) - return false; - - this->add(s); - - return true; - } - - void on_error(TcpStream* stream) - { - delete stream; - } - - std::atomic requests {0}; - - void on_read(TcpStream* stream) - { - int done = 0; - - while (1) - { - ssize_t count; - - count = read(stream->socket, buf, sizeof buf); - if (count == -1) - { - /* If errno == EAGAIN, that means we have read all - data. So go back to the main loop. */ - if (errno != EAGAIN) - { - perror ("read"); - done = 1; - } - break; - } - else if (count == 0) - { - /* End of file. The remote has closed the - connection. */ - done = 1; - break; - } - - size_t sum = 0; - char* resp = (char*)response; - - while(sum < len) - { - int k = write(stream->socket, resp, len - sum); - sum += k; - resp += k; - } - - requests.fetch_add(1, std::memory_order_relaxed); - - } - - if (done) - { - LOG_DEBUG("Closing TCP stream at " << stream->socket.id()) - - /* Closing the descriptor will make epoll remove it - from the set of descriptors which are monitored. */ - delete stream; - } - } -}; - -} diff --git a/src/logging/default.cpp b/src/logging/default.cpp new file mode 100644 index 000000000..914d50417 --- /dev/null +++ b/src/logging/default.cpp @@ -0,0 +1,33 @@ +#include "default.hpp" + +#include "logging/logs/async_log.hpp" +#include "logging/logs/sync_log.hpp" + +#include "logging/streams/stdout.hpp" + +namespace logging +{ + +std::unique_ptr log; + +std::unique_ptr debug_log = std::make_unique(); + +Logger init_debug_logger() +{ + debug_log->pipe(std::make_unique()); + return debug_log->logger("DEBUG"); +} + +Logger debug_logger = init_debug_logger(); + +void init_async() +{ + log = std::make_unique(); +} + +void init_sync() +{ + log = std::make_unique(); +} + +} diff --git a/src/logging/default.hpp b/src/logging/default.hpp new file mode 100644 index 000000000..dd8357426 --- /dev/null +++ b/src/logging/default.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "log.hpp" +#include "logger.hpp" + +namespace logging +{ + +extern std::unique_ptr log; + +extern Logger debug_logger; + +template +void debug(Args&&... args) +{ + debug_logger.debug(std::forward(args)...); +} + +void init_async(); +void init_sync(); + +} diff --git a/src/logging/log.cpp b/src/logging/log.cpp index d3aefc899..586e58424 100644 --- a/src/logging/log.cpp +++ b/src/logging/log.cpp @@ -5,5 +5,5 @@ Logger Log::logger(const std::string& name) { - return Logger(*this, name); + return Logger(this, name); } diff --git a/src/logging/logger.hpp b/src/logging/logger.hpp index 17af92809..d85c2e23d 100644 --- a/src/logging/logger.hpp +++ b/src/logging/logger.hpp @@ -1,5 +1,8 @@ #pragma once +#include +#include + #include "log.hpp" #include "levels.hpp" @@ -44,50 +47,68 @@ class Logger }; public: - Logger(Log& log, const std::string& name) : log(log), name(name) {} + Logger() = default; + + Logger(Log* log, const std::string& name) : log(log), name(name) {} template void emit(Args&&... args) { + assert(log != nullptr); + auto message = std::make_unique>( Timestamp::now(), name, fmt::format(std::forward(args)...) ); - log.get().emit(std::move(message)); + log->emit(std::move(message)); } template void trace(Args&&... args) { +#ifndef NDEBUG +#ifndef LOG_NO_TRACE emit(std::forward(args)...); +#endif +#endif } template void debug(Args&&... args) { +#ifndef NDEBUG +#ifndef LOG_NO_DEBUG emit(std::forward(args)...); +#endif +#endif } template void info(Args&&... args) { +#ifndef LOG_NO_INFO emit(std::forward(args)...); +#endif } template void warn(Args&&... args) { +#ifndef LOG_NO_WARN emit(std::forward(args)...); +#endif } template void error(Args&&... args) { +#ifndef LOG_NO_ERROR emit(std::forward(args)...); +#endif } private: - std::reference_wrapper log; + Log* log; std::string name; }; diff --git a/src/logging/streams/stdout.cpp b/src/logging/streams/stdout.cpp index 7f73fb808..8ffb97234 100644 --- a/src/logging/streams/stdout.cpp +++ b/src/logging/streams/stdout.cpp @@ -1,10 +1,17 @@ #include "stdout.hpp" -#include +#include +#include void Stdout::emit(const Log::Record& record) { - fmt::print("{} {:<5} [{}] {}\n", record.when(), record.level_str(), - record.where(), record.text()); + auto s = fmt::format("{} {:<5} [{}] {}\n", static_cast( + record.when()), record.level_str(), record.where(), + record.text()); + + std::cout << s; + + /* fmt::printf("{} {:<5} [{}] {}\n", static_cast(record.when()), */ + /* record.level_str(), record.where(), record.text()); */ } diff --git a/src/mvcc/id.hpp b/src/mvcc/id.hpp new file mode 100644 index 000000000..465986ff8 --- /dev/null +++ b/src/mvcc/id.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include +#include +#include "utils/total_ordering.hpp" + +class Id : public TotalOrdering +{ +public: + Id() = default; + + Id(uint64_t id) : id(id) {} + + friend bool operator<(const Id& a, const Id& b) + { + return a.id < b.id; + } + + friend bool operator==(const Id& a, const Id& b) + { + return a.id == b.id; + } + + friend std::ostream& operator<<(std::ostream& stream, const Id& id) + { + return stream << id.id; + } + + operator uint64_t() const + { + return id; + } + +private: + uint64_t id {0}; +}; diff --git a/src/speedy/rapidjson b/src/speedy/rapidjson new file mode 160000 index 000000000..c02d52ad5 --- /dev/null +++ b/src/speedy/rapidjson @@ -0,0 +1 @@ +Subproject commit c02d52ad56595dc70b38daf46b5f315d3a7115fa diff --git a/src/utils/bswap.hpp b/src/utils/bswap.hpp new file mode 100644 index 000000000..0e7ee0fef --- /dev/null +++ b/src/utils/bswap.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#include + +template +inline T bswap(T value); + +template<> +inline int16_t bswap(int16_t value) +{ + return __bswap_16(value); +} + +template<> +inline uint16_t bswap(uint16_t value) +{ + return __bswap_16(value); +} + +template<> +inline int32_t bswap(int32_t value) +{ + return __bswap_32(value); +} + +template<> +inline uint32_t bswap(uint32_t value) +{ + return __bswap_32(value); +} + +template<> +inline int64_t bswap(int64_t value) +{ + return __bswap_64(value); +} + +template<> +inline uint64_t bswap(uint64_t value) +{ + return __bswap_64(value); +} diff --git a/src/utils/datetime/timestamp.hpp b/src/utils/datetime/timestamp.hpp index f824e97ab..f9287e0ad 100644 --- a/src/utils/datetime/timestamp.hpp +++ b/src/utils/datetime/timestamp.hpp @@ -5,7 +5,7 @@ #include #include -#include +#include #include "utils/datetime/datetime_error.hpp" #include "utils/total_ordering.hpp" @@ -66,7 +66,7 @@ public: long subsec() const { - return nsec; + return nsec / 10000; } const std::string to_iso8601() const @@ -103,5 +103,5 @@ private: long nsec; static constexpr auto fiso8601 = - "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:09d}Z"; + "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:05d}Z"; }; diff --git a/src/utils/exceptions/basic_exception.hpp b/src/utils/exceptions/basic_exception.hpp index 91c1be5b5..79ea3d5f9 100644 --- a/src/utils/exceptions/basic_exception.hpp +++ b/src/utils/exceptions/basic_exception.hpp @@ -1,7 +1,6 @@ #pragma once #include - #include #include "utils/auto_scope.hpp" @@ -10,21 +9,23 @@ class BasicException : public std::exception { public: - template - BasicException(Args&&... args) noexcept - : message(fmt::format(std::forward(args)...)) + BasicException(const std::string& message) noexcept : message(message) { #ifndef NDEBUG - message += '\n'; + this->message += '\n'; Stacktrace stacktrace; for(auto& line : stacktrace) - message += fmt::format(" at {} ({})\n", + this->message += fmt::format(" at {} ({})\n", line.function, line.location); #endif } + template + BasicException(const std::string& format, Args&&... args) noexcept + : BasicException(fmt::format(format, std::forward(args)...)) {} + const char* what() const noexcept override { return message.c_str(); @@ -34,4 +35,3 @@ private: std::string message; }; - diff --git a/src/utils/string/weak_string.hpp b/src/utils/string/weak_string.hpp index 17189eff1..f3ebdc8a3 100644 --- a/src/utils/string/weak_string.hpp +++ b/src/utils/string/weak_string.hpp @@ -79,7 +79,6 @@ public: return !(lhs == rhs); } - private: const char* str; size_t len; diff --git a/src/utils/types/byte.hpp b/src/utils/types/byte.hpp new file mode 100644 index 000000000..931bd4008 --- /dev/null +++ b/src/utils/types/byte.hpp @@ -0,0 +1,5 @@ +#pragma once + +#include + +using byte = uint8_t; diff --git a/tests/unit/chunked_decoder.cpp b/tests/unit/chunked_decoder.cpp new file mode 100644 index 000000000..45bcee3c2 --- /dev/null +++ b/tests/unit/chunked_decoder.cpp @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include +#include + +#include "bolt/v1/transport/chunked_decoder.hpp" + +using byte = unsigned char; + +void print_hex(byte x) +{ + printf("%02X ", static_cast(x)); +} + +class DummyStream +{ +public: + void write(const byte* values, size_t n) + { + data.insert(data.end(), values, values + n); + } + + std::vector data; +}; + +using Decoder = bolt::ChunkedDecoder; + +std::vector chunks[] = { + {0x00,0x08,'A',' ','q','u','i','c','k',' ',0x00,0x06,'b','r','o','w','n',' '}, + {0x00,0x0A,'f','o','x',' ','j','u','m','p','s',' '}, + {0x00,0x07,'o','v','e','r',' ','a',' '}, + {0x00,0x08,'l','a','z','y',' ','d','o','g',0x00,0x00} +}; + +static constexpr size_t N = std::extent::value; + +std::string decoded = "A quick brown fox jumps over a lazy dog"; + +int main(void) +{ + DummyStream stream; + Decoder decoder(stream); + + for(size_t i = 0; i < N; ++i) + { + auto& chunk = chunks[i]; + auto finished = decoder.decode(chunk.data(), chunk.size()); + + // break early if finished + if(finished) + break; + } + + assert(decoded.size() == stream.data.size()); + + for(size_t i = 0; i < decoded.size(); ++i) + assert(decoded[i] == stream.data[i]); + + return 0; +} diff --git a/tests/unit/chunked_encoder.cpp b/tests/unit/chunked_encoder.cpp new file mode 100644 index 000000000..acd9d3442 --- /dev/null +++ b/tests/unit/chunked_encoder.cpp @@ -0,0 +1,115 @@ +#include +#include +#include +#include + +#include "bolt/v1/transport/chunked_encoder.hpp" + +using byte = unsigned char; + +void print_hex(byte x) +{ + printf("%02X ", static_cast(x)); +} + +class DummyStream +{ +public: + void write(const byte* values, size_t n) + { + num_calls++; + data.insert(data.end(), values, values + n); + } + + byte pop() + { + auto c = data.front(); + data.pop_front(); + return c; + } + + size_t pop_size() + { + return ((size_t)pop() << 8) | pop(); + } + + void print() + { + for(size_t i = 0; i < data.size(); ++i) + print_hex(data[i]); + } + + std::deque data; + size_t num_calls {0}; +}; + +using Encoder = bolt::ChunkedEncoder; + +void write_ff(Encoder& encoder, size_t n) +{ + std::vector v; + + for(size_t i = 0; i < n; ++i) + v.push_back('\xFF'); + + encoder.write(v.data(), v.size()); +} + +void check_ff(DummyStream& stream, size_t n) +{ + for(size_t i = 0; i < n; ++i) + assert(stream.pop() == byte('\xFF')); + + (void)stream; +} + +int main(void) +{ + DummyStream stream; + bolt::ChunkedEncoder encoder(stream); + + write_ff(encoder, 10); + write_ff(encoder, 10); + encoder.finish(); + + write_ff(encoder, 10); + write_ff(encoder, 10); + encoder.finish(); + + // this should be two chunks, one of size 65533 and the other of size 1467 + write_ff(encoder, 67000); + encoder.finish(); + + for(int i = 0; i < 10000; ++i) + write_ff(encoder, 1500); + encoder.finish(); + + assert(stream.pop_size() == 20); + check_ff(stream, 20); + assert(stream.pop_size() == 0); + + assert(stream.pop_size() == 20); + check_ff(stream, 20); + assert(stream.pop_size() == 0); + + assert(stream.pop_size() == encoder.chunk_size); + check_ff(stream, encoder.chunk_size); + assert(stream.pop_size() == 1467); + check_ff(stream, 1467); + assert(stream.pop_size() == 0); + + size_t k = 10000 * 1500; + + while(k > 0) + { + auto size = k > encoder.chunk_size ? encoder.chunk_size : k; + assert(stream.pop_size() == size); + check_ff(stream, size); + + k -= size; + } + + assert(stream.pop_size() == 0); + + return 0; +}