Merge branch 'bolt-protocol' into dev

* bolt-protocol:
  implemented bolt protocol
This commit is contained in:
Dominik Tomičević 2016-08-01 22:21:49 +01:00
commit da15775afa
72 changed files with 2558 additions and 601 deletions

26
src/bolt/v1/bolt.cpp Normal file
View File

@ -0,0 +1,26 @@
#include "bolt.hpp"
#include "session.hpp"
#include <iostream>
namespace bolt
{
Bolt::Bolt()
{
}
Session* Bolt::create_session(io::Socket&& socket)
{
// TODO fix session lifecycle handling
// dangling pointers are not cool :)
return new Session(std::forward<io::Socket>(socket), *this);
}
void Bolt::close(Session* session)
{
session->socket.close();
}
}

24
src/bolt/v1/bolt.hpp Normal file
View File

@ -0,0 +1,24 @@
#pragma once
#include "states.hpp"
#include "io/network/socket.hpp"
namespace bolt
{
class Session;
class Bolt
{
friend class Session;
public:
Bolt();
Session* create_session(io::Socket&& socket);
void close(Session* session);
States states;
};
}

View File

@ -0,0 +1,46 @@
#pragma once
#include "utils/types/byte.hpp"
#include "utils/underlying_cast.hpp"
namespace bolt
{
enum class MessageCode : byte
{
Init = 0x01,
AckFailure = 0x0E,
Reset = 0x0F,
Run = 0x10,
DiscardAll = 0x2F,
PullAll = 0x3F,
Record = 0x71,
Success = 0x70,
Ignored = 0x7E,
Failure = 0x7F
};
inline bool operator==(byte value, MessageCode code)
{
return value == underlying_cast(code);
}
inline bool operator==(MessageCode code, byte value)
{
return operator==(value, code);
}
inline bool operator!=(byte value, MessageCode code)
{
return !operator==(value, code);
}
inline bool operator!=(MessageCode code, byte value)
{
return operator!=(value, code);
}
}

View File

@ -0,0 +1,8 @@
#pragma once
namespace bolt
{
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <cstdint>
namespace bolt
{
namespace pack
{
enum Code : uint8_t
{
TinyString = 0x80,
TinyList = 0x90,
TinyMap = 0xA0,
TinyStruct = 0xB0,
Null = 0xC0,
Float64 = 0xC1,
False = 0xC2,
True = 0xC3,
Int8 = 0xC8,
Int16 = 0xC9,
Int32 = 0xCA,
Int64 = 0xCB,
Bytes8 = 0xCC,
Bytes16 = 0xCD,
Bytes32 = 0xCE,
String8 = 0xD0,
String16 = 0xD1,
String32 = 0xD2,
List8 = 0xD4,
List16 = 0xD5,
List32 = 0xD6,
Map8 = 0xD8,
Map16 = 0xD9,
Map32 = 0xDA,
MapStream = 0xDB,
Struct8 = 0xDC,
Struct16 = 0xDD,
EndOfStream = 0xDF,
};
}
}

View File

@ -0,0 +1,21 @@
#pragma once
namespace bolt
{
enum class PackType
{
Null, // denotes absence of a value
Boolean, // denotes a type with two possible values (t/f)
Integer, // 64-bit signed integral number
Float, // 64-bit floating point number
Bytes, // binary data
String, // unicode string
List, // collection of values
Map, // collection of zero or more key/value pairs
Struct, // zero or more packstream values
EndOfStream, // denotes stream value end
Reserved // reserved for future use
};
}

View File

@ -0,0 +1,67 @@
#pragma once
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <cassert>
#include "io/network/server.hpp"
#include "bolt/v1/bolt.hpp"
namespace bolt
{
template <class Worker>
class Server : public io::Server<Server<Worker>>
{
public:
Server(io::Socket&& socket)
: io::Server<Server<Worker>>(std::forward<io::Socket>(socket)) {}
void start(size_t n)
{
workers.reserve(n);
for(size_t i = 0; i < n; ++i)
{
workers.push_back(std::make_shared<Worker>(bolt));
workers.back()->start(alive);
}
while(alive)
{
this->wait_and_process_events();
}
}
void shutdown()
{
alive.store(false);
for(auto& worker : workers)
worker->thread.join();
}
void on_connect()
{
assert(idx < workers.size());
if(UNLIKELY(!workers[idx]->accept(this->socket)))
return;
idx = idx == workers.size() - 1 ? 0 : idx + 1;
}
void on_wait_timeout() {}
private:
Bolt bolt;
std::vector<typename Worker::sptr> workers;
std::atomic<bool> alive {true};
int idx {0};
};
}

View File

@ -0,0 +1,112 @@
#pragma once
#include <iomanip>
#include <cstdio>
#include <atomic>
#include <sstream>
#include <memory>
#include <thread>
#include "io/network/stream_reader.hpp"
#include "bolt/v1/bolt.hpp"
#include "bolt/v1/session.hpp"
#include "logging/default.hpp"
namespace bolt
{
template <class Worker>
class Server;
class Worker : public io::StreamReader<Worker, Session>
{
friend class bolt::Server<Worker>;
public:
using sptr = std::shared_ptr<Worker>;
Worker(Bolt& bolt) : bolt(bolt)
{
logger = logging::log->logger("Network");
}
Session& on_connect(io::Socket&& socket)
{
logger.trace("Accepting connection on socket {}", socket.id());
return *bolt.get().create_session(std::forward<io::Socket>(socket));
}
void on_error(Session&)
{
logger.trace("[on_error] errno = {}", errno);
#ifndef NDEBUG
auto err = io::NetworkError("");
logger.debug("{}", err.what());
#endif
logger.error("Error occured in this session");
}
void on_wait_timeout() {}
Buffer on_alloc(Session&)
{
/* logger.trace("[on_alloc] Allocating {}B", sizeof buf); */
return Buffer { buf, sizeof buf };
}
void on_read(Session& session, Buffer& buf)
{
logger.trace("[on_read] Received {}B", buf.len);
#ifndef NDEBUG
std::stringstream stream;
for(size_t i = 0; i < buf.len; ++i)
stream << fmt::format("{:02X} ", static_cast<byte>(buf.ptr[i]));
logger.trace("[on_read] {}", stream.str());
#endif
try
{
session.execute(reinterpret_cast<const byte*>(buf.ptr), buf.len);
}
catch(const std::exception& e)
{
logger.error("Error occured while executing statement.");
logger.error("{}", e.what());
}
}
void on_close(Session& session)
{
logger.trace("[on_close] Client closed the connection");
session.close();
}
char buf[65536];
protected:
std::reference_wrapper<Bolt> bolt;
Logger logger;
std::thread thread;
void start(std::atomic<bool>& alive)
{
thread = std::thread([&, this]() {
while(alive)
wait_and_process_events();
});
}
};
}

54
src/bolt/v1/session.cpp Normal file
View File

@ -0,0 +1,54 @@
#include "session.hpp"
namespace bolt
{
Session::Session(io::Socket&& socket, Bolt& bolt)
: Stream(std::forward<io::Socket>(socket)), bolt(bolt)
{
logger = logging::log->logger("Session");
// start with a handshake state
state = bolt.states.handshake.get();
}
bool Session::alive() const
{
return state != nullptr;
}
void Session::execute(const byte* data, size_t len)
{
// mark the end of the message
auto end = data + len;
while(true)
{
auto size = end - data;
if(LIKELY(connected))
{
logger.debug("Decoding chunk of size {}", size);
auto finished = decoder.decode(data, size);
if(!finished)
return;
}
else
{
logger.debug("Decoding handshake of size {}", size);
decoder.handshake(data, size);
}
state = state->run(*this);
decoder.reset();
}
}
void Session::close()
{
logger.debug("Closing session");
bolt.close(this);
}
}

42
src/bolt/v1/session.hpp Normal file
View File

@ -0,0 +1,42 @@
#pragma once
#include "io/network/tcp/stream.hpp"
#include "io/network/socket.hpp"
#include "bolt/v1/states/state.hpp"
#include "bolt/v1/transport/bolt_decoder.hpp"
#include "bolt/v1/transport/bolt_encoder.hpp"
#include "bolt.hpp"
#include "logging/default.hpp"
namespace bolt
{
class Session : public io::tcp::Stream<io::Socket>
{
public:
using Decoder = BoltDecoder;
using Encoder = BoltEncoder<io::Socket>;
Session(io::Socket&& socket, Bolt& bolt);
bool alive() const;
void execute(const byte* data, size_t len);
void close();
Bolt& bolt;
Decoder decoder;
Encoder encoder {socket};
bool connected {false};
State* state;
protected:
Logger logger;
};
}

17
src/bolt/v1/states.cpp Normal file
View File

@ -0,0 +1,17 @@
#include "states.hpp"
#include "states/handshake.hpp"
#include "states/init.hpp"
#include "states/executor.hpp"
namespace bolt
{
States::States()
{
handshake = std::make_unique<Handshake>();
init = std::make_unique<Init>();
executor = std::make_unique<Executor>();
}
}

19
src/bolt/v1/states.hpp Normal file
View File

@ -0,0 +1,19 @@
#pragma once
#include "states/state.hpp"
#include "logging/log.hpp"
namespace bolt
{
class States
{
public:
States();
State::uptr handshake;
State::uptr init;
State::uptr executor;
};
}

View File

@ -0,0 +1,32 @@
#include "error.hpp"
#include "bolt/v1/session.hpp"
namespace bolt
{
State* Error::run(Session& session)
{
auto message_type = session.decoder.read_byte();
if(message_type == MessageCode::AckFailure)
{
// todo reset current statement? is it even necessary?
return session.bolt.states.executor.get();
}
else if(message_type == MessageCode::Reset)
{
// todo rollback current transaction
// discard all records waiting to be sent
return session.bolt.states.executor.get();
}
session.encoder.message_ignored();
session.encoder.flush();
return this;
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include "bolt/v1/states/state.hpp"
namespace bolt
{
class Error : public State
{
public:
State* run(Session& session) override;
};
}

View File

@ -0,0 +1,95 @@
#include "executor.hpp"
#include "bolt/v1/messaging/codes.hpp"
namespace bolt
{
Executor::Executor() : logger(logging::log->logger("Executor")) {}
State* Executor::run(Session& session)
{
// just read one byte that represents the struct type, we can skip the
// information contained in this byte
session.decoder.read_byte();
auto message_type = session.decoder.read_byte();
if(message_type == MessageCode::Run)
{
Query q;
q.statement = session.decoder.read_string();
this->run(session, q);
}
else if(message_type == MessageCode::PullAll)
{
pull_all(session);
}
else if(message_type == MessageCode::DiscardAll)
{
discard_all(session);
}
else if(message_type == MessageCode::Reset)
{
// todo rollback current transaction
// discard all records waiting to be sent
return this;
}
else
{
logger.error("Unrecognized message recieved");
logger.debug("Invalid message type 0x{:02X}", message_type);
return session.bolt.states.error.get();
}
return this;
}
void Executor::run(Session& session, Query& query)
{
logger.trace("[Run] '{}'", query.statement);
session.encoder.message_success();
session.encoder.write_map_header(1);
session.encoder.write_string("fields");
session.encoder.write_list_header(1);
session.encoder.write_string("name");
session.encoder.flush();
}
void Executor::pull_all(Session& session)
{
logger.trace("[PullAll]");
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string("buda");
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string("domko");
session.encoder.message_record();
session.encoder.write_list_header(1);
session.encoder.write_string("max");
session.encoder.message_success_empty();
session.encoder.flush();
}
void Executor::discard_all(Session& session)
{
logger.trace("[DiscardAll]");
session.encoder.message_success();
session.encoder.flush();
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include "bolt/v1/states/state.hpp"
#include "bolt/v1/session.hpp"
namespace bolt
{
class Executor : public State
{
struct Query
{
std::string statement;
};
public:
Executor();
State* run(Session& session) override final;
protected:
Logger logger;
/* Execute an incoming query
*
*/
void run(Session& session, Query& query);
/* Send all remaining results to the client
*
*/
void pull_all(Session& session);
/* Discard all remaining results
*
*/
void discard_all(Session& session);
};
}

View File

@ -0,0 +1,27 @@
#include "handshake.hpp"
#include "bolt/v1/session.hpp"
namespace bolt
{
static constexpr uint32_t preamble = 0x6060B017;
static constexpr byte protocol[4] = {0x00, 0x00, 0x00, 0x01};
State* Handshake::run(Session& session)
{
if(UNLIKELY(session.decoder.read_uint32() != preamble))
return nullptr;
// TODO so far we only support version 1 of the protocol so it doesn't
// make sense to check which version the client prefers
// this will change in the future
session.connected = true;
session.socket.write(protocol, sizeof protocol);
return session.bolt.states.init.get();
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include "bolt/v1/states/state.hpp"
namespace bolt
{
class Handshake : public State
{
public:
State* run(Session& session) override;
};
}

View File

@ -0,0 +1,54 @@
#include "init.hpp"
#include "bolt/v1/session.hpp"
#include "bolt/v1/messaging/codes.hpp"
#include "utils/likely.hpp"
namespace bolt
{
Init::Init() : MessageParser<Init>(logging::log->logger("Init")) {}
State* Init::parse(Session& session, Message& message)
{
auto struct_type = session.decoder.read_byte();
if(UNLIKELY(struct_type != 0xB2))
{
logger.debug("{}", struct_type);
logger.debug("Expected struct marker 0xB2 instead of 0x{:02X}",
(unsigned)struct_type);
return nullptr;
}
auto message_type = session.decoder.read_byte();
if(UNLIKELY(message_type != MessageCode::Init))
{
logger.debug("Expected Init (0x01) instead of (0x{:02X})",
(unsigned)message_type);
return nullptr;
}
message.client_name = session.decoder.read_string();
// TODO read authentication tokens
return this;
}
State* Init::execute(Session& session, Message& message)
{
logger.debug("Client connected '{}'", message.client_name);
session.encoder.message_success_empty();
session.encoder.flush();
return session.bolt.states.executor.get();
}
}

View File

@ -0,0 +1,22 @@
#pragma once
#include "bolt/v1/states/message_parser.hpp"
namespace bolt
{
class Init : public MessageParser<Init>
{
public:
struct Message
{
std::string client_name;
};
Init();
State* parse(Session& session, Message& message);
State* execute(Session& session, Message& message);
};
}

View File

@ -0,0 +1,37 @@
#pragma once
#include "state.hpp"
#include "utils/crtp.hpp"
#include "bolt/v1/session.hpp"
namespace bolt
{
template <class Derived>
class MessageParser : public State, public Crtp<Derived>
{
public:
MessageParser(Logger&& logger)
: logger(std::forward<Logger>(logger)) {}
State* run(Session& session) override final
{
typename Derived::Message message;
logger.debug("Parsing message");
auto next = this->derived().parse(session, message);
// return next state if parsing was unsuccessful (i.e. error state)
if(next != &this->derived())
return next;
logger.debug("Executing state");
return this->derived().execute(session, message);
}
protected:
Logger logger;
};
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <cstdlib>
#include <cstdint>
#include <memory>
namespace bolt
{
class Session;
class State
{
public:
using uptr = std::unique_ptr<State>;
State() = default;
virtual ~State() = default;
virtual State* run(Session& session) = 0;
};
}

View File

@ -0,0 +1,158 @@
#include "bolt_decoder.hpp"
#include <iostream>
#include "utils/bswap.hpp"
#include "logging/default.hpp"
#include "bolt/v1/packing/codes.hpp"
namespace bolt
{
void BoltDecoder::handshake(const byte*& data, size_t len)
{
buffer.write(data, len);
data += len;
}
bool BoltDecoder::decode(const byte*& data, size_t len)
{
return decoder(data, len);
}
bool BoltDecoder::empty() const
{
return pos == buffer.size();
}
void BoltDecoder::reset()
{
buffer.clear();
pos = 0;
}
byte BoltDecoder::peek() const
{
return buffer[pos];
}
byte BoltDecoder::read_byte()
{
return buffer[pos++];
}
void BoltDecoder::read_bytes(void* dest, size_t n)
{
std::memcpy(dest, buffer.data() + pos, n);
pos += n;
}
template <class T>
T parse(const void* data)
{
// reinterpret bytes as the target value
auto value = reinterpret_cast<const T*>(data);
// swap values to little endian
return bswap(*value);
}
template <class T>
T parse(Buffer& buffer, size_t& pos)
{
// get a pointer to the data we're converting
auto ptr = buffer.data() + pos;
// skip sizeof bytes that we're going to read
pos += sizeof(T);
// read and convert the value
return parse<T>(ptr);
}
int16_t BoltDecoder::read_int16()
{
return parse<int16_t>(buffer, pos);
}
uint16_t BoltDecoder::read_uint16()
{
return parse<uint16_t>(buffer, pos);
}
int32_t BoltDecoder::read_int32()
{
return parse<int32_t>(buffer, pos);
}
uint32_t BoltDecoder::read_uint32()
{
return parse<uint32_t>(buffer, pos);
}
int64_t BoltDecoder::read_int64()
{
return parse<int64_t>(buffer, pos);
}
uint64_t BoltDecoder::read_uint64()
{
return parse<uint64_t>(buffer, pos);
}
double BoltDecoder::read_float64()
{
auto v = parse<int64_t>(buffer, pos);
return *reinterpret_cast<const double *>(&v);
}
std::string BoltDecoder::read_string()
{
auto marker = read_byte();
std::string res;
uint32_t size;
// if the first 4 bits equal to 1000 (0x8), this is a tiny string
if((marker & 0xF0) == pack::TinyString)
{
// size is stored in the lower 4 bits of the marker byte
size = marker & 0x0F;
}
// if the marker is 0xD0, size is an 8-bit unsigned integer
if(marker == pack::String8)
{
size = read_byte();
}
// if the marker is 0xD1, size is a 16-bit big-endian unsigned integer
else if(marker == pack::String16)
{
size = read_uint16();
}
// if the marker is 0xD2, size is a 32-bit big-endian unsigned integer
else if(marker == pack::String32)
{
size = read_uint32();
}
else
{
// TODO error?
return res;
}
if(size == 0)
return res;
res.append(reinterpret_cast<const char*>(raw()), size);
pos += size;
return res;
}
const byte* BoltDecoder::raw() const
{
return buffer.data() + pos;
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include "buffer.hpp"
#include "chunked_decoder.hpp"
#include "utils/types/byte.hpp"
namespace bolt
{
class BoltDecoder
{
public:
void handshake(const byte*& data, size_t len);
bool decode(const byte*& data, size_t len);
bool empty() const;
void reset();
byte peek() const;
byte read_byte();
void read_bytes(void* dest, size_t n);
int16_t read_int16();
uint16_t read_uint16();
int32_t read_int32();
uint32_t read_uint32();
int64_t read_int64();
uint64_t read_uint64();
double read_float64();
std::string read_string();
private:
Buffer buffer;
ChunkedDecoder<Buffer> decoder {buffer};
size_t pos {0};
const byte* raw() const;
};
}

View File

@ -0,0 +1 @@
#include "bolt_encoder.hpp"

View File

@ -0,0 +1,243 @@
#pragma once
#include "chunked_encoder.hpp"
#include "socket_stream.hpp"
#include "bolt/v1/packing/codes.hpp"
#include "bolt/v1/messaging/codes.hpp"
#include "utils/types/byte.hpp"
#include "utils/bswap.hpp"
namespace bolt
{
template <class Socket>
class BoltEncoder
{
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
static constexpr int64_t plus_2_to_the_15 = 32768L;
static constexpr int64_t plus_2_to_the_7 = 128L;
static constexpr int64_t minus_2_to_the_4 = -16L;
static constexpr int64_t minus_2_to_the_7 = -128L;
static constexpr int64_t minus_2_to_the_15 = -32768L;
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
public:
BoltEncoder(Socket& socket) : stream(socket) {}
void flush()
{
encoder.flush();
}
void write(byte value)
{
encoder.write(value);
}
void write(const byte* values, size_t n)
{
encoder.write(values, n);
}
void write_null()
{
encoder.write(pack::Null);
}
void write(bool value)
{
if(value) write_true(); else write_false();
}
void write_true()
{
encoder.write(pack::True);
}
void write_false()
{
encoder.write(pack::False);
}
template <class T>
void write_value(T value)
{
value = bswap(value);
encoder.write(reinterpret_cast<const byte*>(&value), sizeof(value));
}
void write_integer(int64_t value)
{
if(value >= minus_2_to_the_4 && value < plus_2_to_the_7)
{
write(static_cast<byte>(value));
}
else if(value >= minus_2_to_the_7 && value < minus_2_to_the_4)
{
write(pack::Int8);
write(static_cast<byte>(value));
}
else if(value >= minus_2_to_the_15 && value < plus_2_to_the_15)
{
write(pack::Int16);
write_value(static_cast<int16_t>(value));
}
else if(value >= minus_2_to_the_31 && value < plus_2_to_the_31)
{
write(pack::Int32);
write_value(static_cast<int32_t>(value));
}
else
{
write(pack::Int64);
write_value(value);
}
}
void write(double value)
{
write(pack::Float64);
write_value(*reinterpret_cast<const int64_t*>(&value));
}
void write_map_header(size_t size)
{
if(size < 0x10)
{
write(static_cast<byte>(pack::TinyMap | size));
}
else if(size <= 0xFF)
{
write(pack::Map8);
write(static_cast<byte>(size));
}
else if(size <= 0xFFFF)
{
write(pack::Map16);
write_value<uint16_t>(size);
}
else
{
write(pack::Map32);
write_value<uint32_t>(size);
}
}
void write_empty_map()
{
write(pack::TinyMap);
}
void write_list_header(size_t size)
{
if(size < 0x10)
{
write(static_cast<byte>(pack::TinyList | size));
}
else if(size <= 0xFF)
{
write(pack::List8);
write(static_cast<byte>(size));
}
else if(size <= 0xFFFF)
{
write(pack::List16);
write_value<uint16_t>(size);
}
else
{
write(pack::List32);
write_value<uint32_t>(size);
}
}
void write_empty_list()
{
write(pack::TinyList);
}
void write_string_header(size_t size)
{
if(size < 0x10)
{
write(static_cast<byte>(pack::TinyString | size));
}
else if(size <= 0xFF)
{
write(pack::String8);
write(static_cast<byte>(size));
}
else if(size <= 0xFFFF)
{
write(pack::String16);
write_value<uint16_t>(size);
}
else
{
write(pack::String32);
write_value<uint32_t>(size);
}
}
void write_string(const std::string& str)
{
write_string(str.c_str(), str.size());
}
void write_string(const char* str, size_t len)
{
write_string_header(len);
write(reinterpret_cast<const byte*>(str), len);
}
void write_struct_header(size_t size)
{
if(size < 0x10)
{
write(static_cast<byte>(pack::TinyStruct | size));
}
else if(size <= 0xFF)
{
write(pack::Struct8);
write(static_cast<byte>(size));
}
else
{
write(pack::Struct16);
write_value<uint16_t>(size);
}
}
void message_success()
{
write_struct_header(1);
write(underlying_cast(MessageCode::Success));
}
void message_success_empty()
{
message_success();
write_empty_map();
}
void message_record()
{
write_struct_header(1);
write(underlying_cast(MessageCode::Record));
}
void message_record_empty()
{
message_record();
write_empty_list();
}
private:
SocketStream stream;
ChunkedEncoder<SocketStream> encoder {stream};
};
}

View File

@ -0,0 +1,16 @@
#include "buffer.hpp"
namespace bolt
{
void Buffer::write(const byte* data, size_t len)
{
buffer.insert(buffer.end(), data, data + len);
}
void Buffer::clear()
{
buffer.clear();
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <cstdint>
#include <cstdlib>
#include <vector>
namespace bolt
{
class Buffer
{
public:
using byte = uint8_t;
void write(const byte* data, size_t len);
void clear();
size_t size() const
{
return buffer.size();
}
byte operator[](size_t idx) const
{
return buffer[idx];
}
const byte* data() const
{
return buffer.data();
}
private:
std::vector<byte> buffer;
};
}

View File

@ -0,0 +1,77 @@
#pragma once
#include <cstring>
#include <functional>
#include <cassert>
#include "utils/exceptions/basic_exception.hpp"
#include "utils/likely.hpp"
#include "logging/default.hpp"
namespace bolt
{
template <class Stream>
class ChunkedDecoder
{
public:
class DecoderError : public BasicException
{
public:
using BasicException::BasicException;
};
using byte = unsigned char;
ChunkedDecoder(Stream& stream) : stream(stream) {}
/* Decode chunked data
*
* Chunk format looks like:
*
* |Header| Data ||Header| Data || ... || End |
* | 2B | size bytes || 2B | size bytes || ... ||00 00|
*/
bool decode(const byte*& chunk, size_t n)
{
while(n > 0)
{
// get size from first two bytes in the chunk
auto size = get_size(chunk);
if(UNLIKELY(size + 2 > n))
throw DecoderError("Chunk size larger than available data.");
// advance chunk to pass those two bytes
chunk += 2;
n -= 2;
// if chunk size is 0, we're done!
if(size == 0)
return true;
stream.get().write(chunk, size);
chunk += size;
n -= size;
}
return false;
}
bool operator()(const byte*& chunk, size_t n)
{
return decode(chunk, n);
}
private:
std::reference_wrapper<Stream> stream;
size_t get_size(const byte* chunk)
{
return size_t(chunk[0]) << 8 | chunk[1];
}
};
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <array>
#include <cstring>
#include <functional>
#include "utils/likely.hpp"
namespace bolt
{
template <class Stream>
class ChunkedEncoder
{
static constexpr size_t N = 65535;
static constexpr size_t C = N + 2 /* end mark */;
public:
using byte = unsigned char;
ChunkedEncoder(Stream& stream) : stream(stream) {}
static constexpr size_t chunk_size = N - 2;
void write(byte value)
{
if(UNLIKELY(pos == N))
end_chunk();
chunk[pos++] = value;
}
void write(const byte* values, size_t n)
{
while(n > 0)
{
auto size = n < N - pos ? n : N - pos;
std::memcpy(chunk.data() + pos, values, size);
pos += size;
n -= size;
if(pos == N)
end_chunk();
}
}
void flush()
{
write_chunk_header();
// write two zeros to signal message end
chunk[pos++] = 0x00;
chunk[pos++] = 0x00;
flush_stream();
}
private:
std::reference_wrapper<Stream> stream;
std::array<byte, C> chunk;
size_t pos {2};
void end_chunk()
{
write_chunk_header();
flush();
}
void write_chunk_header()
{
// write the size of the chunk
uint16_t size = pos - 2;
// write the higher byte
chunk[0] = size >> 8;
// write the lower byte
chunk[1] = size & 0xFF;
}
void flush_stream()
{
// write chunk to the stream
stream.get().write(chunk.data(), pos);
pos = 2;
}
};
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <cstdint>
#include <vector>
#include <cstdio>
#include "io/network/socket.hpp"
#include "stream_error.hpp"
namespace bolt
{
class SocketStream
{
public:
using byte = uint8_t;
SocketStream(io::Socket& socket) : socket(socket) {}
void write(const byte* data, size_t n)
{
while(n > 0)
{
auto written = socket.get().write(data, n);
if(UNLIKELY(written == -1))
throw StreamError("Can't write to stream");
n -= written;
data += written;
}
}
private:
std::reference_wrapper<io::Socket> socket;
};
}

View File

@ -0,0 +1,14 @@
#pragma once
#include "utils/exceptions/basic_exception.hpp"
namespace bolt
{
class StreamError : BasicException
{
public:
using BasicException::BasicException;
};
}

View File

@ -11,6 +11,8 @@ public:
class Printer
{
public:
friend class Entry;
Printer(std::ostream& stream, const std::string& header)
: stream(stream)
{
@ -49,10 +51,10 @@ public:
}
template <class T>
friend Entry& operator<<(Entry& entry, const T& item)
Entry& operator<<(const T& item)
{
entry.printer.stream << item;
return entry;
printer.stream << item;
return *this;
}
private:

1
src/cypher/lexertl Submodule

@ -0,0 +1 @@
Subproject commit 7d4d36a357027df0e817453cc9cf948f71047ca9

8
src/dbms/dbms.hpp Normal file
View File

@ -0,0 +1,8 @@
#pragma once
class Dbms
{
public:
};

9
src/dbms/server/bolt.hpp Normal file
View File

@ -0,0 +1,9 @@
#pragma once
class BoltServer
{
public:
BoltServer() = default;
};

68
src/examples/bolt.cpp Normal file
View File

@ -0,0 +1,68 @@
#include <iostream>
#include <signal.h>
#include "bolt/v1/server/server.hpp"
#include "bolt/v1/server/worker.hpp"
#include "io/network/socket.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
static bolt::Server<bolt::Worker>* serverptr;
Logger logger;
void sigint_handler(int s)
{
auto signal = s == SIGINT ? "SIGINT" : "SIGABRT";
logger.info("Recieved signal {}", signal);
logger.info("Shutting down...");
std::exit(EXIT_SUCCESS);
}
static constexpr const char* interface = "0.0.0.0";
static constexpr const char* port = "7687";
int main(void)
{
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
logger = logging::log->logger("Main");
signal(SIGINT, sigint_handler);
signal(SIGABRT, sigint_handler);
io::Socket socket;
try
{
socket = io::Socket::bind(interface, port);
}
catch(io::NetworkError e)
{
logger.error("Cannot bind to socket on {} at {}", interface, port);
logger.error("{}", e.what());
std::exit(EXIT_FAILURE);
}
socket.set_non_blocking();
socket.listen(1024);
logger.info("Listening on {} at {}", interface, port);
bolt::Server<bolt::Worker> server(std::move(socket));
serverptr = &server;
constexpr size_t N = 1;
logger.info("Starting {} workers", N);
server.start(N);
logger.info("Shutting down...");
return EXIT_SUCCESS;
}

View File

@ -0,0 +1,3 @@
#!/bin/bash
clang++ -g -rdynamic ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto

81
src/examples/endinan.cpp Normal file
View File

@ -0,0 +1,81 @@
#include <iostream>
#include <cstdint>
#include <byteswap.h>
char b[8] = {1, 2, 3, 4, 0, 0, 0, 1};
int64_t safe_int64(const char* b)
{
return int64_t(b[0]) << 56 | int64_t(b[1]) << 48
| int64_t(b[2]) << 40 | int64_t(b[3]) << 32
| int64_t(b[4]) << 24 | int64_t(b[5]) << 16
| int64_t(b[6]) << 8 | int64_t(b[7]);
}
int64_t unsafe_int64(const char* b)
{
auto i = reinterpret_cast<const int64_t*>(b);
return __bswap_64(*i);
}
int32_t safe_int32(const char* b)
{
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
}
int32_t unsafe_int32(const char* b)
{
auto i = reinterpret_cast<const int32_t*>(b);
return __bswap_32(*i);
}
[[clang::optnone]]
void test(uint64_t n)
{
for(uint64_t i = 0; i < n; ++i)
unsafe_int64(b);
}
uint8_t f[8] = {0x3F, 0xF1, 0x99, 0x99, 0x99, 0x99, 0x99, 0x9A};
double ff = 1.1;
double get_double(const uint8_t* b)
{
auto v = __bswap_64(*reinterpret_cast<const uint64_t*>(b));
return *reinterpret_cast<const double*>(&v);
}
void print_hex(const char* buf, size_t n)
{
for (size_t i = 0; i < n; ++i)
printf("%02X ", (unsigned char)buf[i]);
}
void print_hex(const uint8_t* buf, size_t n)
{
print_hex((const char*)buf, n);
}
int main(void)
{
auto dd = get_double(f);
print_hex(f, 8);
std::cout << std::endl;
print_hex((const uint8_t*)(&ff), 8);
std::cout << std::endl;
print_hex((const uint8_t*)(&dd), 8);
std::cout << dd << std::endl;
/* std::cout << safe_int64(b) << std::endl; */
/* std::cout << unsafe_int64(b) << std::endl; */
/* test(1000000000ull); */
return 0;
}

View File

@ -1,17 +0,0 @@
CXX=clang++
CFLAGS=-std=c++1y -O2 -Wall -Wno-unknown-pragmas
LDFLAGS=-lhttp_parser -pthread
INC=-I../../
SOURCES=test.cpp
EXECUTABLE=test
all: $(EXECUTABLE)
$(EXECUTABLE): $(SOURCES)
$(CXX) $(CFLAGS) $(SOURCES) -o $(EXECUTABLE) $(INC) $(LDFLAGS)
.PHONY:
clean:
rm -f test
rm -f *.o

View File

@ -1,281 +0,0 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
char buf[512];
#define MAXEVENTS 64
static int
make_socket_non_blocking (int sfd)
{
int flags, s;
flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}
return 0;
}
static int
create_and_bind (char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int s, sfd;
memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
hints.ai_flags = AI_PASSIVE; /* All interfaces */
s = getaddrinfo (NULL, port, &hints, &result);
if (s != 0)
{
fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
return -1;
}
for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1)
continue;
s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0)
{
/* We managed to bind successfully! */
break;
}
close (sfd);
}
if (rp == NULL)
{
fprintf (stderr, "Could not bind\n");
return -1;
}
freeaddrinfo (result);
return sfd;
}
int
main (int argc, char *argv[])
{
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
size_t len = strlen(response);
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;
if (argc != 2)
{
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
exit (EXIT_FAILURE);
}
sfd = create_and_bind (argv[1]);
if (sfd == -1)
abort ();
s = make_socket_non_blocking (sfd);
if (s == -1)
abort ();
s = listen (sfd, SOMAXCONN);
if (s == -1)
{
perror ("listen");
abort ();
}
efd = epoll_create1 (0);
if (efd == -1)
{
perror ("epoll_create");
abort ();
}
event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
/* Buffer where events are returned */
events = calloc (MAXEVENTS, sizeof event);
/* The event loop */
while (1)
{
int n, i;
n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}
else if (sfd == events[i].data.fd)
{
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
infd = accept (sfd, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
{
/* We have processed all incoming
connections. */
break;
}
else
{
perror ("accept");
break;
}
}
s = getnameinfo (&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}
/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking (infd);
if (s == -1)
abort ();
event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
}
continue;
}
else
{
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;
while (1)
{
ssize_t count;
count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN)
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}
size_t sum = 0;
char* resp = (char*)response;
while(sum < len)
{
int k = write(event.data.fd, resp, len - sum);
sum += k;
resp += k;
}
/* Write the buffer to standard output */
if (s == -1)
{
perror ("write");
abort ();
}
}
if (done)
{
printf ("Closed connection on descriptor %d\n",
events[i].data.fd);
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close (events[i].data.fd);
}
}
}
}
free (events);
close (sfd);
return EXIT_SUCCESS;
}

View File

@ -9,6 +9,12 @@
namespace io
{
class EpollError : BasicException
{
public:
using BasicException::BasicException;
};
class Epoll
{
public:
@ -17,14 +23,18 @@ public:
Epoll(int flags)
{
epoll_fd = epoll_create1(flags);
if(UNLIKELY(epoll_fd == -1))
throw EpollError("Can't create epoll file descriptor");
}
void add(Socket& socket, Event* event)
template <class Stream>
void add(Stream& stream, Event* event)
{
auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event);
auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stream, event);
if(UNLIKELY(status))
throw NetworkError("Can't add an event to epoll listener.");
throw EpollError("Can't add an event to epoll listener.");
}
int wait(Event* events, int max_events, int timeout)

View File

@ -1,14 +1,12 @@
#pragma once
#include "socket.hpp"
#include "epoll.hpp"
#include "utils/crtp.hpp"
namespace io
{
template <class Derived, class Stream,
size_t max_events = 64, int wait_timeout = -1>
template <class Derived, size_t max_events = 64, int wait_timeout = -1>
class EventListener : public Crtp<Derived>
{
public:
@ -16,32 +14,28 @@ public:
EventListener(uint32_t flags = 0) : listener(flags) {}
void add(Stream& stream)
{
// add the stream to the event listener
listener.add(stream.socket, &stream.event);
}
void wait_and_process_events()
{
// TODO hardcoded a wait timeout because of thread joining
// when you shutdown the server. This should be wait_timeout of the
// template parameter and should almost never change from that.
// thread joining should be resolved using a signal that interrupts
// the system call.
// waits for an event/multiple events and returns a maximum of
// max_events and stores them in the events array. it waits for
// wait_timeout milliseconds. if wait_timeout is achieved, returns 0
auto n = listener.wait(events, max_events, wait_timeout);
LOG_DEBUG("received " << n << " events");
auto n = listener.wait(events, max_events, 200);
// go through all events and process them in order
for(int i = 0; i < n; ++i)
{
auto& event = events[i];
auto& stream = *reinterpret_cast<Stream*>(event.data.ptr);
// a stream was closed
// hangup event
if(UNLIKELY(event.events & EPOLLRDHUP))
{
LOG_DEBUG("EPOLLRDHUP event recieved on socket " << stream.id());
this->derived().on_close(stream);
this->derived().on_close_event(event);
continue;
}
@ -49,18 +43,12 @@ public:
if(UNLIKELY(!(event.events & EPOLLIN) ||
event.events & (EPOLLHUP | EPOLLERR)))
{
LOG_DEBUG(">> EPOLL ERR");
LOG_DEBUG("EPOLLIN" << (event.events & EPOLLIN));
LOG_DEBUG("EPOLLHUP" << (event.events & EPOLLHUP));
LOG_DEBUG("EPOLLERR" << (event.events & EPOLLERR));
this->derived().on_error(stream);
this->derived().on_error_event(event);
continue;
}
LOG_DEBUG("signalling that data exists on socket " << stream.id());
// we have some data waiting to be read
this->derived().on_data(stream);
this->derived().on_data_event(event);
}
// this will be optimized out :D

View File

@ -2,9 +2,15 @@
#include <stdexcept>
class NetworkError : public std::runtime_error
#include "utils/exceptions/basic_exception.hpp"
namespace io
{
class NetworkError : public BasicException
{
public:
using std::runtime_error::runtime_error;
using BasicException::BasicException;
};
}

View File

@ -1,27 +0,0 @@
#!/bin/bash
INTERVAL="1" # update interval in seconds
if [ -z "$1" ]; then
echo
echo usage: $0 [network-interface]
echo
echo e.g. $0 eth0
echo
echo shows packets-per-second
exit
fi
IF=$1
while true
do
R1=`cat /sys/class/net/$1/statistics/rx_packets`
T1=`cat /sys/class/net/$1/statistics/tx_packets`
sleep $INTERVAL
R2=`cat /sys/class/net/$1/statistics/rx_packets`
T2=`cat /sys/class/net/$1/statistics/tx_packets`
TXPPS=`expr $T2 - $T1`
RXPPS=`expr $R2 - $R1`
echo "TX $1: $TXPPS pkts/s RX $1: $RXPPS pkts/s"
done

View File

@ -0,0 +1,93 @@
#pragma once
#include "tls.hpp"
#include "io/network/socket.hpp"
#include "tls_error.hpp"
#include "utils/types/byte.hpp"
#include <iostream>
namespace io
{
class SecureSocket
{
public:
SecureSocket(Socket&& socket, const Tls::Context& tls)
: socket(std::forward<Socket>(socket))
{
ssl = SSL_new(tls);
SSL_set_fd(ssl, this->socket);
SSL_set_accept_state(ssl);
if(SSL_accept(ssl) <= 0)
ERR_print_errors_fp(stderr);
}
SecureSocket(SecureSocket&& other)
{
*this = std::forward<SecureSocket>(other);
}
SecureSocket& operator=(SecureSocket&& other)
{
socket = std::move(other.socket);
ssl = other.ssl;
other.ssl = nullptr;
return *this;
}
~SecureSocket()
{
if(ssl == nullptr)
return;
std::cout << "DELETING SSL" << std::endl;
SSL_free(ssl);
}
int error(int status)
{
return SSL_get_error(ssl, status);
}
int write(const std::string& str)
{
return write(str.c_str(), str.size());
}
int write(const byte* data, size_t len)
{
return SSL_write(ssl, data, len);
}
int write(const char* data, size_t len)
{
return SSL_write(ssl, data, len);
}
int read(char* buffer, size_t len)
{
return SSL_read(ssl, buffer, len);
}
operator int()
{
return socket;
}
operator Socket&()
{
return socket;
}
private:
Socket socket;
SSL* ssl {nullptr};
};
}

View File

@ -0,0 +1,61 @@
#pragma once
#include <openssl/ssl.h>
#include "stream_reader.hpp"
#include "logging/default.hpp"
namespace io
{
using namespace memory::literals;
template <class Derived, class Stream>
class SecureStreamReader : public StreamReader<Derived, Stream>
{
public:
struct Buffer
{
char* ptr;
size_t len;
};
SecureStreamReader(uint32_t flags = 0)
: StreamReader<Derived, Stream>(flags) {}
void on_data(Stream& stream)
{
while(true)
{
// allocate the buffer to fill the data
auto buf = this->derived().on_alloc(stream);
// read from the buffer at most buf.len bytes
auto len = stream.socket.read(buf.ptr, buf.len);
if(LIKELY(len > 0))
{
buf.len = len;
return this->derived().on_read(stream, buf);
}
auto err = stream.socket.error(len);
// the socket is not ready for reading yet
if(err == SSL_ERROR_WANT_READ ||
err == SSL_ERROR_WANT_WRITE ||
err == SSL_ERROR_WANT_X509_LOOKUP)
{
return;
}
// the socket notified a close event
if(err == SSL_ERROR_ZERO_RETURN)
return stream.close();
// some other error occurred, check errno
return this->derived().on_error(stream);
}
}
};
}

View File

@ -5,33 +5,39 @@
namespace io
{
template <class Derived, class Stream>
class Server : public StreamReader<Derived, Stream>
template <class Derived>
class Server : public EventListener<Derived>
{
public:
bool accept(Socket& socket)
Server(Socket&& socket) : socket(std::forward<Socket>(socket))
{
// accept a connection from a socket
auto s = socket.accept(nullptr, nullptr);
LOG_DEBUG("socket " << s.id() << " accepted");
event.data.fd = this->socket;
event.events = EPOLLIN | EPOLLET;
if(!s.is_open())
return false;
// make the recieved socket non blocking
s.set_non_blocking();
auto& stream = this->derived().on_connect(std::move(s));
// we want to listen to an incoming event which is edge triggered and
// we also want to listen on the hangup event
stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
// add the connection to the event listener
this->add(stream);
return true;
this->listener.add(this->socket, &event);
}
void on_close_event(Epoll::Event& event)
{
::close(event.data.fd);
}
void on_error_event(Epoll::Event& event)
{
::close(event.data.fd);
}
void on_data_event(Epoll::Event& event)
{
if(UNLIKELY(socket != event.data.fd))
return;
this->derived().on_connect();
}
protected:
Epoll::Event event;
Socket socket;
};
}

View File

@ -16,17 +16,24 @@
#include "addrinfo.hpp"
#include "utils/likely.hpp"
#include "logging/default.hpp"
#include <iostream>
namespace io
{
class Socket
{
protected:
Socket(int family, int socket_type, int protocol)
{
socket = ::socket(family, socket_type, protocol);
}
public:
using byte = uint8_t;
Socket(int socket = -1) : socket(socket) {}
Socket(const Socket&) = delete;
@ -41,7 +48,16 @@ public:
if(socket == -1)
return;
close(socket);
std::cout << "DELETING SOCKET" << std::endl;
::close(socket);
}
void close()
{
::close(socket);
socket = -1;
}
Socket& operator=(Socket&& other)
@ -73,7 +89,7 @@ public:
continue;
if(::connect(s, it->ai_addr, it->ai_addrlen) == 0)
return std::move(s);
return s;
}
throw NetworkError("Unable to connect to socket");
@ -100,7 +116,7 @@ public:
continue;
if(::bind(s, it->ai_addr, it->ai_addrlen) == 0)
return std::move(s);
return s;
}
throw NetworkError("Unable to bind to socket");
@ -141,22 +157,38 @@ public:
return socket;
}
size_t write(const std::string& str)
int write(const std::string& str)
{
return ::write(socket, str.c_str(), str.size());
return write(str.c_str(), str.size());
}
size_t write(const char* data, size_t len)
int write(const char* data, size_t len)
{
return write(reinterpret_cast<const byte*>(data), len);
}
int write(const byte* data, size_t len)
{
#ifndef NDEBUG
std::stringstream stream;
for(size_t i = 0; i < len; ++i)
stream << fmt::format("{:02X} ", static_cast<byte>(data[i]));
auto str = stream.str();
logging::debug("[Write {}B] {}", len, str);
#endif
return ::write(socket, data, len);
}
size_t read(char* buffer, size_t len)
int read(void* buffer, size_t len)
{
return ::read(socket, buffer, len);
}
private:
protected:
int socket;
};

View File

@ -0,0 +1,13 @@
#pragma once
#include "
namespace io
{
class StreamDispatcher
{
};
}

View File

@ -0,0 +1,43 @@
#pragma once
#include "event_listener.hpp"
namespace io
{
template <class Derived, class Stream,
size_t max_events = 64, int wait_timeout = -1>
class StreamListener : public EventListener<Derived, max_events, wait_timeout>
{
public:
using EventListener<Derived, max_events, wait_timeout>::EventListener;
void add(Stream& stream)
{
// add the stream to the event listener
this->listener.add(stream.socket, &stream.event);
}
void on_close_event(Epoll::Event& event)
{
this->derived().on_close(to_stream(event));
}
void on_error_event(Epoll::Event& event)
{
this->derived().on_error(to_stream(event));
}
void on_data_event(Epoll::Event& event)
{
this->derived().on_data(to_stream(event));
}
private:
Stream& to_stream(Epoll::Event& event)
{
return *reinterpret_cast<Stream*>(event.data.ptr);
}
};
}

View File

@ -1,6 +1,6 @@
#pragma once
#include "event_listener.hpp"
#include "stream_listener.hpp"
#include "memory/literals.hpp"
namespace io
@ -8,7 +8,7 @@ namespace io
using namespace memory::literals;
template <class Derived, class Stream>
class StreamReader : public EventListener<Derived, Stream>
class StreamReader : public StreamListener<Derived, Stream>
{
public:
struct Buffer
@ -17,12 +17,41 @@ public:
size_t len;
};
StreamReader(uint32_t flags = 0) : EventListener<Derived, Stream>(flags) {}
StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags) {}
bool accept(Socket& socket)
{
// accept a connection from a socket
auto s = socket.accept(nullptr, nullptr);
if(!s.is_open())
return false;
// make the recieved socket non blocking
s.set_non_blocking();
auto& stream = this->derived().on_connect(std::move(s));
// we want to listen to an incoming event which is edge triggered and
// we also want to listen on the hangup event
stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
// add the connection to the event listener
this->add(stream);
return true;
}
void on_data(Stream& stream)
{
while(true)
{
if(UNLIKELY(!stream.alive()))
{
stream.close();
break;
}
// allocate the buffer to fill the data
auto buf = this->derived().on_alloc(stream);
@ -35,23 +64,21 @@ public:
// this means we have read all available data
if(LIKELY(errno == EAGAIN))
{
LOG_DEBUG("EAGAIN read all data on socket " << stream.id());
break;
}
// some other error occurred, check errno
this->derived().on_error(stream);
break;
}
// end of file, the client has closed the connection
if(UNLIKELY(buf.len == 0))
{
LOG_DEBUG("EOF stream closed on socket " << stream.id());
stream.close();
break;
}
LOG_DEBUG("data on socket " << stream.id());
this->derived().on_read(stream, buf);
}
}

View File

@ -8,6 +8,7 @@ namespace io
namespace tcp
{
template <class Socket>
class Stream
{
public:
@ -24,11 +25,6 @@ public:
event.data.ptr = this;
}
void close()
{
delete reinterpret_cast<Stream*>(event.data.ptr);
}
int id() const { return socket.id(); }
Socket socket;

View File

@ -1,94 +0,0 @@
#pragma once
#include <list>
#include <thread>
#include <atomic>
#include "debug/log.hpp"
#include "tcp_listener.hpp"
namespace io
{
template <class T>
class TcpServer : TcpListener<TcpServer<T>, 64, -1>
{
public:
TcpServer(const char* addr, const char* port)
: stream(std::move(Socket::bind(addr, port))) {}
~TcpServer()
{
stop();
for(auto& worker : workers)
worker.join();
}
template <class F>
void listen(size_t n, size_t backlog, F&& f)
{
for(size_t i = 0; i < n; ++i)
{
workers.emplace_back();
auto& w = workers.back();
threads[i] = std::thread([this, &w]() {
while(alive)
{
LOG_DEBUG("Worker " << hash(std::this_thread::get_id())
<< " waiting... ");
w.wait_and_process_events();
}
});
}
stream.socket.listen(backlog);
}
void stop()
{
alive.store(false, std::memory_order_release);
}
private:
std::list<std::thread> threads;
std::list<T> workers;
std::atomic<bool> alive { true };
TcpStream stream;
size_t idx = 0;
void on_close(TcpStream& stream)
{
LOG_DEBUG("server on_close!!!!");
}
void on_error(TcpStream& stream)
{
LOG_DEBUG("server on_error!!!!");
}
void on_data(TcpStream&)
{
while (true)
{
LOG_DEBUG("Trying to accept... ");
if(!workers[idx].accept(socket))
{
LOG_DEBUG("Did not accept!");
break;
}
idx = (idx + 1) % workers.size();
LOG_DEBUG("Accepted a new connection!");
}
}
void on_wait_timeout()
{
}
};
}

55
src/io/network/tls.cpp Normal file
View File

@ -0,0 +1,55 @@
#include "tls.hpp"
#include "tls_error.hpp"
namespace io
{
Tls::Context::Context()
{
auto method = SSLv23_server_method();
ctx = SSL_CTX_new(method);
if(!ctx)
{
ERR_print_errors_fp(stderr);
throw io::TlsError("Unable to create TLS context");
}
SSL_CTX_set_ecdh_auto(ctx, 1);
}
Tls::Context::~Context()
{
SSL_CTX_free(ctx);
}
Tls::Context& Tls::Context::cert(const std::string& path)
{
if(SSL_CTX_use_certificate_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0)
return *this;
ERR_print_errors_fp(stderr);
throw TlsError("Error Loading cert '{}'", path);
}
Tls::Context& Tls::Context::key(const std::string& path)
{
if(SSL_CTX_use_PrivateKey_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0)
return *this;
ERR_print_errors_fp(stderr);
throw TlsError("Error Loading private key '{}'", path);
}
void Tls::initialize()
{
SSL_load_error_strings();
OpenSSL_add_ssl_algorithms();
}
void Tls::cleanup()
{
EVP_cleanup();
}
}

33
src/io/network/tls.hpp Normal file
View File

@ -0,0 +1,33 @@
#pragma once
#include <string>
#include <openssl/ssl.h>
#include <openssl/err.h>
namespace io
{
class Tls
{
public:
class Context
{
public:
Context();
~Context();
Context& cert(const std::string& path);
Context& key(const std::string& path);
operator SSL_CTX*() const { return ctx; }
private:
SSL_CTX* ctx;
};
static void initialize();
static void cleanup();
};
}

View File

@ -0,0 +1,14 @@
#pragma once
#include "utils/exceptions/basic_exception.hpp"
namespace io
{
class TlsError : public BasicException
{
public:
using BasicException::BasicException;
};
}

View File

@ -1,91 +0,0 @@
#pragma once
#include "listener.hpp"
#include "tcp_stream.hpp"
namespace io
{
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
size_t len = strlen(response);
class Worker : public Listener<Worker>
{
char buf[64_kB];
public:
using Listener::Listener;
bool accept(Socket& socket)
{
auto s = socket.accept(nullptr, nullptr);
if(!s.is_open())
return false;
this->add(s);
return true;
}
void on_error(TcpStream* stream)
{
delete stream;
}
std::atomic<int> requests {0};
void on_read(TcpStream* stream)
{
int done = 0;
while (1)
{
ssize_t count;
count = read(stream->socket, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN)
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}
size_t sum = 0;
char* resp = (char*)response;
while(sum < len)
{
int k = write(stream->socket, resp, len - sum);
sum += k;
resp += k;
}
requests.fetch_add(1, std::memory_order_relaxed);
}
if (done)
{
LOG_DEBUG("Closing TCP stream at " << stream->socket.id())
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
delete stream;
}
}
};
}

33
src/logging/default.cpp Normal file
View File

@ -0,0 +1,33 @@
#include "default.hpp"
#include "logging/logs/async_log.hpp"
#include "logging/logs/sync_log.hpp"
#include "logging/streams/stdout.hpp"
namespace logging
{
std::unique_ptr<Log> log;
std::unique_ptr<Log> debug_log = std::make_unique<SyncLog>();
Logger init_debug_logger()
{
debug_log->pipe(std::make_unique<Stdout>());
return debug_log->logger("DEBUG");
}
Logger debug_logger = init_debug_logger();
void init_async()
{
log = std::make_unique<AsyncLog>();
}
void init_sync()
{
log = std::make_unique<SyncLog>();
}
}

22
src/logging/default.hpp Normal file
View File

@ -0,0 +1,22 @@
#pragma once
#include "log.hpp"
#include "logger.hpp"
namespace logging
{
extern std::unique_ptr<Log> log;
extern Logger debug_logger;
template <class... Args>
void debug(Args&&... args)
{
debug_logger.debug(std::forward<Args>(args)...);
}
void init_async();
void init_sync();
}

View File

@ -5,5 +5,5 @@
Logger Log::logger(const std::string& name)
{
return Logger(*this, name);
return Logger(this, name);
}

View File

@ -1,5 +1,8 @@
#pragma once
#include <cassert>
#include <fmt/format.h>
#include "log.hpp"
#include "levels.hpp"
@ -44,50 +47,68 @@ class Logger
};
public:
Logger(Log& log, const std::string& name) : log(log), name(name) {}
Logger() = default;
Logger(Log* log, const std::string& name) : log(log), name(name) {}
template <class Level, class... Args>
void emit(Args&&... args)
{
assert(log != nullptr);
auto message = std::make_unique<Message<Level>>(
Timestamp::now(), name, fmt::format(std::forward<Args>(args)...)
);
log.get().emit(std::move(message));
log->emit(std::move(message));
}
template <class... Args>
void trace(Args&&... args)
{
#ifndef NDEBUG
#ifndef LOG_NO_TRACE
emit<Trace>(std::forward<Args>(args)...);
#endif
#endif
}
template <class... Args>
void debug(Args&&... args)
{
#ifndef NDEBUG
#ifndef LOG_NO_DEBUG
emit<Debug>(std::forward<Args>(args)...);
#endif
#endif
}
template <class... Args>
void info(Args&&... args)
{
#ifndef LOG_NO_INFO
emit<Info>(std::forward<Args>(args)...);
#endif
}
template <class... Args>
void warn(Args&&... args)
{
#ifndef LOG_NO_WARN
emit<Warn>(std::forward<Args>(args)...);
#endif
}
template <class... Args>
void error(Args&&... args)
{
#ifndef LOG_NO_ERROR
emit<Error>(std::forward<Args>(args)...);
#endif
}
private:
std::reference_wrapper<Log> log;
Log* log;
std::string name;
};

View File

@ -1,10 +1,17 @@
#include "stdout.hpp"
#include <cppformat/format.h>
#include <iostream>
#include <fmt/format.h>
void Stdout::emit(const Log::Record& record)
{
fmt::print("{} {:<5} [{}] {}\n", record.when(), record.level_str(),
record.where(), record.text());
auto s = fmt::format("{} {:<5} [{}] {}\n", static_cast<std::string>(
record.when()), record.level_str(), record.where(),
record.text());
std::cout << s;
/* fmt::printf("{} {:<5} [{}] {}\n", static_cast<std::string>(record.when()), */
/* record.level_str(), record.where(), record.text()); */
}

36
src/mvcc/id.hpp Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include <ostream>
#include <stdint.h>
#include "utils/total_ordering.hpp"
class Id : public TotalOrdering<Id>
{
public:
Id() = default;
Id(uint64_t id) : id(id) {}
friend bool operator<(const Id& a, const Id& b)
{
return a.id < b.id;
}
friend bool operator==(const Id& a, const Id& b)
{
return a.id == b.id;
}
friend std::ostream& operator<<(std::ostream& stream, const Id& id)
{
return stream << id.id;
}
operator uint64_t() const
{
return id;
}
private:
uint64_t id {0};
};

1
src/speedy/rapidjson Submodule

@ -0,0 +1 @@
Subproject commit c02d52ad56595dc70b38daf46b5f315d3a7115fa

45
src/utils/bswap.hpp Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include <cstdint>
#include <cstdlib>
#include <byteswap.h>
template <class T>
inline T bswap(T value);
template<>
inline int16_t bswap<int16_t>(int16_t value)
{
return __bswap_16(value);
}
template<>
inline uint16_t bswap<uint16_t>(uint16_t value)
{
return __bswap_16(value);
}
template<>
inline int32_t bswap<int32_t>(int32_t value)
{
return __bswap_32(value);
}
template<>
inline uint32_t bswap<uint32_t>(uint32_t value)
{
return __bswap_32(value);
}
template<>
inline int64_t bswap<int64_t>(int64_t value)
{
return __bswap_64(value);
}
template<>
inline uint64_t bswap<uint64_t>(uint64_t value)
{
return __bswap_64(value);
}

View File

@ -5,7 +5,7 @@
#include <iomanip>
#include <ostream>
#include <cppformat/format.h>
#include <fmt/format.h>
#include "utils/datetime/datetime_error.hpp"
#include "utils/total_ordering.hpp"
@ -66,7 +66,7 @@ public:
long subsec() const
{
return nsec;
return nsec / 10000;
}
const std::string to_iso8601() const
@ -103,5 +103,5 @@ private:
long nsec;
static constexpr auto fiso8601 =
"{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:09d}Z";
"{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:05d}Z";
};

View File

@ -1,7 +1,6 @@
#pragma once
#include <stdexcept>
#include <fmt/format.h>
#include "utils/auto_scope.hpp"
@ -10,21 +9,23 @@
class BasicException : public std::exception
{
public:
template <class... Args>
BasicException(Args&&... args) noexcept
: message(fmt::format(std::forward<Args>(args)...))
BasicException(const std::string& message) noexcept : message(message)
{
#ifndef NDEBUG
message += '\n';
this->message += '\n';
Stacktrace stacktrace;
for(auto& line : stacktrace)
message += fmt::format(" at {} ({})\n",
this->message += fmt::format(" at {} ({})\n",
line.function, line.location);
#endif
}
template <class... Args>
BasicException(const std::string& format, Args&&... args) noexcept
: BasicException(fmt::format(format, std::forward<Args>(args)...)) {}
const char* what() const noexcept override
{
return message.c_str();
@ -34,4 +35,3 @@ private:
std::string message;
};

View File

@ -79,7 +79,6 @@ public:
return !(lhs == rhs);
}
private:
const char* str;
size_t len;

5
src/utils/types/byte.hpp Normal file
View File

@ -0,0 +1,5 @@
#pragma once
#include <cstdint>
using byte = uint8_t;

View File

@ -0,0 +1,62 @@
#include <iostream>
#include <deque>
#include <cassert>
#include <cstring>
#include <array>
#include <vector>
#include "bolt/v1/transport/chunked_decoder.hpp"
using byte = unsigned char;
void print_hex(byte x)
{
printf("%02X ", static_cast<byte>(x));
}
class DummyStream
{
public:
void write(const byte* values, size_t n)
{
data.insert(data.end(), values, values + n);
}
std::vector<byte> data;
};
using Decoder = bolt::ChunkedDecoder<DummyStream>;
std::vector<byte> chunks[] = {
{0x00,0x08,'A',' ','q','u','i','c','k',' ',0x00,0x06,'b','r','o','w','n',' '},
{0x00,0x0A,'f','o','x',' ','j','u','m','p','s',' '},
{0x00,0x07,'o','v','e','r',' ','a',' '},
{0x00,0x08,'l','a','z','y',' ','d','o','g',0x00,0x00}
};
static constexpr size_t N = std::extent<decltype(chunks)>::value;
std::string decoded = "A quick brown fox jumps over a lazy dog";
int main(void)
{
DummyStream stream;
Decoder decoder(stream);
for(size_t i = 0; i < N; ++i)
{
auto& chunk = chunks[i];
auto finished = decoder.decode(chunk.data(), chunk.size());
// break early if finished
if(finished)
break;
}
assert(decoded.size() == stream.data.size());
for(size_t i = 0; i < decoded.size(); ++i)
assert(decoded[i] == stream.data[i]);
return 0;
}

View File

@ -0,0 +1,115 @@
#include <iostream>
#include <deque>
#include <cassert>
#include <vector>
#include "bolt/v1/transport/chunked_encoder.hpp"
using byte = unsigned char;
void print_hex(byte x)
{
printf("%02X ", static_cast<byte>(x));
}
class DummyStream
{
public:
void write(const byte* values, size_t n)
{
num_calls++;
data.insert(data.end(), values, values + n);
}
byte pop()
{
auto c = data.front();
data.pop_front();
return c;
}
size_t pop_size()
{
return ((size_t)pop() << 8) | pop();
}
void print()
{
for(size_t i = 0; i < data.size(); ++i)
print_hex(data[i]);
}
std::deque<byte> data;
size_t num_calls {0};
};
using Encoder = bolt::ChunkedEncoder<DummyStream>;
void write_ff(Encoder& encoder, size_t n)
{
std::vector<byte> v;
for(size_t i = 0; i < n; ++i)
v.push_back('\xFF');
encoder.write(v.data(), v.size());
}
void check_ff(DummyStream& stream, size_t n)
{
for(size_t i = 0; i < n; ++i)
assert(stream.pop() == byte('\xFF'));
(void)stream;
}
int main(void)
{
DummyStream stream;
bolt::ChunkedEncoder<DummyStream> encoder(stream);
write_ff(encoder, 10);
write_ff(encoder, 10);
encoder.finish();
write_ff(encoder, 10);
write_ff(encoder, 10);
encoder.finish();
// this should be two chunks, one of size 65533 and the other of size 1467
write_ff(encoder, 67000);
encoder.finish();
for(int i = 0; i < 10000; ++i)
write_ff(encoder, 1500);
encoder.finish();
assert(stream.pop_size() == 20);
check_ff(stream, 20);
assert(stream.pop_size() == 0);
assert(stream.pop_size() == 20);
check_ff(stream, 20);
assert(stream.pop_size() == 0);
assert(stream.pop_size() == encoder.chunk_size);
check_ff(stream, encoder.chunk_size);
assert(stream.pop_size() == 1467);
check_ff(stream, 1467);
assert(stream.pop_size() == 0);
size_t k = 10000 * 1500;
while(k > 0)
{
auto size = k > encoder.chunk_size ? encoder.chunk_size : k;
assert(stream.pop_size() == size);
check_ff(stream, size);
k -= size;
}
assert(stream.pop_size() == 0);
return 0;
}