Merge branch 'dev' of https://phabricator.tomicevic.com/diffusion/MG/memgraph into dev
This commit is contained in:
commit
3b8746573d
26
src/bolt/v1/bolt.cpp
Normal file
26
src/bolt/v1/bolt.cpp
Normal file
@ -0,0 +1,26 @@
|
||||
#include "bolt.hpp"
|
||||
|
||||
#include "session.hpp"
|
||||
#include <iostream>
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
Bolt::Bolt()
|
||||
{
|
||||
}
|
||||
|
||||
Session* Bolt::create_session(io::Socket&& socket)
|
||||
{
|
||||
// TODO fix session lifecycle handling
|
||||
// dangling pointers are not cool :)
|
||||
|
||||
return new Session(std::forward<io::Socket>(socket), *this);
|
||||
}
|
||||
|
||||
void Bolt::close(Session* session)
|
||||
{
|
||||
session->socket.close();
|
||||
}
|
||||
|
||||
}
|
24
src/bolt/v1/bolt.hpp
Normal file
24
src/bolt/v1/bolt.hpp
Normal file
@ -0,0 +1,24 @@
|
||||
#pragma once
|
||||
|
||||
#include "states.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Session;
|
||||
|
||||
class Bolt
|
||||
{
|
||||
friend class Session;
|
||||
|
||||
public:
|
||||
Bolt();
|
||||
|
||||
Session* create_session(io::Socket&& socket);
|
||||
void close(Session* session);
|
||||
|
||||
States states;
|
||||
};
|
||||
|
||||
}
|
46
src/bolt/v1/messaging/codes.hpp
Normal file
46
src/bolt/v1/messaging/codes.hpp
Normal file
@ -0,0 +1,46 @@
|
||||
#pragma once
|
||||
|
||||
#include "utils/types/byte.hpp"
|
||||
#include "utils/underlying_cast.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
enum class MessageCode : byte
|
||||
{
|
||||
Init = 0x01,
|
||||
AckFailure = 0x0E,
|
||||
Reset = 0x0F,
|
||||
|
||||
Run = 0x10,
|
||||
DiscardAll = 0x2F,
|
||||
PullAll = 0x3F,
|
||||
|
||||
Record = 0x71,
|
||||
Success = 0x70,
|
||||
Ignored = 0x7E,
|
||||
Failure = 0x7F
|
||||
};
|
||||
|
||||
inline bool operator==(byte value, MessageCode code)
|
||||
{
|
||||
return value == underlying_cast(code);
|
||||
}
|
||||
|
||||
inline bool operator==(MessageCode code, byte value)
|
||||
{
|
||||
return operator==(value, code);
|
||||
}
|
||||
|
||||
inline bool operator!=(byte value, MessageCode code)
|
||||
{
|
||||
return !operator==(value, code);
|
||||
}
|
||||
|
||||
inline bool operator!=(MessageCode code, byte value)
|
||||
{
|
||||
return operator!=(value, code);
|
||||
}
|
||||
|
||||
|
||||
}
|
8
src/bolt/v1/messaging/messages.hpp
Normal file
8
src/bolt/v1/messaging/messages.hpp
Normal file
@ -0,0 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
|
||||
|
||||
}
|
54
src/bolt/v1/packing/codes.hpp
Normal file
54
src/bolt/v1/packing/codes.hpp
Normal file
@ -0,0 +1,54 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
namespace pack
|
||||
{
|
||||
|
||||
enum Code : uint8_t
|
||||
{
|
||||
TinyString = 0x80,
|
||||
TinyList = 0x90,
|
||||
TinyMap = 0xA0,
|
||||
TinyStruct = 0xB0,
|
||||
|
||||
Null = 0xC0,
|
||||
|
||||
Float64 = 0xC1,
|
||||
|
||||
False = 0xC2,
|
||||
True = 0xC3,
|
||||
|
||||
Int8 = 0xC8,
|
||||
Int16 = 0xC9,
|
||||
Int32 = 0xCA,
|
||||
Int64 = 0xCB,
|
||||
|
||||
Bytes8 = 0xCC,
|
||||
Bytes16 = 0xCD,
|
||||
Bytes32 = 0xCE,
|
||||
|
||||
String8 = 0xD0,
|
||||
String16 = 0xD1,
|
||||
String32 = 0xD2,
|
||||
|
||||
List8 = 0xD4,
|
||||
List16 = 0xD5,
|
||||
List32 = 0xD6,
|
||||
|
||||
Map8 = 0xD8,
|
||||
Map16 = 0xD9,
|
||||
Map32 = 0xDA,
|
||||
MapStream = 0xDB,
|
||||
|
||||
Struct8 = 0xDC,
|
||||
Struct16 = 0xDD,
|
||||
EndOfStream = 0xDF,
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
}
|
21
src/bolt/v1/packing/types.hpp
Normal file
21
src/bolt/v1/packing/types.hpp
Normal file
@ -0,0 +1,21 @@
|
||||
#pragma once
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
enum class PackType
|
||||
{
|
||||
Null, // denotes absence of a value
|
||||
Boolean, // denotes a type with two possible values (t/f)
|
||||
Integer, // 64-bit signed integral number
|
||||
Float, // 64-bit floating point number
|
||||
Bytes, // binary data
|
||||
String, // unicode string
|
||||
List, // collection of values
|
||||
Map, // collection of zero or more key/value pairs
|
||||
Struct, // zero or more packstream values
|
||||
EndOfStream, // denotes stream value end
|
||||
Reserved // reserved for future use
|
||||
};
|
||||
|
||||
}
|
67
src/bolt/v1/server/server.hpp
Normal file
67
src/bolt/v1/server/server.hpp
Normal file
@ -0,0 +1,67 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
|
||||
#include "io/network/server.hpp"
|
||||
#include "bolt/v1/bolt.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
template <class Worker>
|
||||
class Server : public io::Server<Server<Worker>>
|
||||
{
|
||||
public:
|
||||
Server(io::Socket&& socket)
|
||||
: io::Server<Server<Worker>>(std::forward<io::Socket>(socket)) {}
|
||||
|
||||
void start(size_t n)
|
||||
{
|
||||
workers.reserve(n);
|
||||
|
||||
for(size_t i = 0; i < n; ++i)
|
||||
{
|
||||
workers.push_back(std::make_shared<Worker>(bolt));
|
||||
workers.back()->start(alive);
|
||||
}
|
||||
|
||||
while(alive)
|
||||
{
|
||||
this->wait_and_process_events();
|
||||
}
|
||||
}
|
||||
|
||||
void shutdown()
|
||||
{
|
||||
alive.store(false);
|
||||
|
||||
for(auto& worker : workers)
|
||||
worker->thread.join();
|
||||
}
|
||||
|
||||
void on_connect()
|
||||
{
|
||||
assert(idx < workers.size());
|
||||
|
||||
if(UNLIKELY(!workers[idx]->accept(this->socket)))
|
||||
return;
|
||||
|
||||
idx = idx == workers.size() - 1 ? 0 : idx + 1;
|
||||
}
|
||||
|
||||
void on_wait_timeout() {}
|
||||
|
||||
private:
|
||||
Bolt bolt;
|
||||
|
||||
std::vector<typename Worker::sptr> workers;
|
||||
std::atomic<bool> alive {true};
|
||||
|
||||
int idx {0};
|
||||
};
|
||||
|
||||
}
|
112
src/bolt/v1/server/worker.hpp
Normal file
112
src/bolt/v1/server/worker.hpp
Normal file
@ -0,0 +1,112 @@
|
||||
#pragma once
|
||||
|
||||
#include <iomanip>
|
||||
#include <cstdio>
|
||||
#include <atomic>
|
||||
#include <sstream>
|
||||
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
#include "io/network/stream_reader.hpp"
|
||||
|
||||
#include "bolt/v1/bolt.hpp"
|
||||
#include "bolt/v1/session.hpp"
|
||||
|
||||
#include "logging/default.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
template <class Worker>
|
||||
class Server;
|
||||
|
||||
class Worker : public io::StreamReader<Worker, Session>
|
||||
{
|
||||
friend class bolt::Server<Worker>;
|
||||
|
||||
public:
|
||||
using sptr = std::shared_ptr<Worker>;
|
||||
|
||||
Worker(Bolt& bolt) : bolt(bolt)
|
||||
{
|
||||
logger = logging::log->logger("Network");
|
||||
}
|
||||
|
||||
Session& on_connect(io::Socket&& socket)
|
||||
{
|
||||
logger.trace("Accepting connection on socket {}", socket.id());
|
||||
|
||||
return *bolt.get().create_session(std::forward<io::Socket>(socket));
|
||||
}
|
||||
|
||||
void on_error(Session&)
|
||||
{
|
||||
logger.trace("[on_error] errno = {}", errno);
|
||||
|
||||
#ifndef NDEBUG
|
||||
auto err = io::NetworkError("");
|
||||
logger.debug("{}", err.what());
|
||||
#endif
|
||||
|
||||
logger.error("Error occured in this session");
|
||||
|
||||
}
|
||||
|
||||
void on_wait_timeout() {}
|
||||
|
||||
Buffer on_alloc(Session&)
|
||||
{
|
||||
/* logger.trace("[on_alloc] Allocating {}B", sizeof buf); */
|
||||
|
||||
return Buffer { buf, sizeof buf };
|
||||
}
|
||||
|
||||
void on_read(Session& session, Buffer& buf)
|
||||
{
|
||||
logger.trace("[on_read] Received {}B", buf.len);
|
||||
|
||||
#ifndef NDEBUG
|
||||
std::stringstream stream;
|
||||
|
||||
for(size_t i = 0; i < buf.len; ++i)
|
||||
stream << fmt::format("{:02X} ", static_cast<byte>(buf.ptr[i]));
|
||||
|
||||
logger.trace("[on_read] {}", stream.str());
|
||||
#endif
|
||||
|
||||
try
|
||||
{
|
||||
session.execute(reinterpret_cast<const byte*>(buf.ptr), buf.len);
|
||||
}
|
||||
catch(const std::exception& e)
|
||||
{
|
||||
logger.error("Error occured while executing statement.");
|
||||
logger.error("{}", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void on_close(Session& session)
|
||||
{
|
||||
logger.trace("[on_close] Client closed the connection");
|
||||
session.close();
|
||||
}
|
||||
|
||||
char buf[65536];
|
||||
|
||||
protected:
|
||||
std::reference_wrapper<Bolt> bolt;
|
||||
|
||||
Logger logger;
|
||||
std::thread thread;
|
||||
|
||||
void start(std::atomic<bool>& alive)
|
||||
{
|
||||
thread = std::thread([&, this]() {
|
||||
while(alive)
|
||||
wait_and_process_events();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
}
|
54
src/bolt/v1/session.cpp
Normal file
54
src/bolt/v1/session.cpp
Normal file
@ -0,0 +1,54 @@
|
||||
#include "session.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
Session::Session(io::Socket&& socket, Bolt& bolt)
|
||||
: Stream(std::forward<io::Socket>(socket)), bolt(bolt)
|
||||
{
|
||||
logger = logging::log->logger("Session");
|
||||
|
||||
// start with a handshake state
|
||||
state = bolt.states.handshake.get();
|
||||
}
|
||||
|
||||
bool Session::alive() const
|
||||
{
|
||||
return state != nullptr;
|
||||
}
|
||||
|
||||
void Session::execute(const byte* data, size_t len)
|
||||
{
|
||||
// mark the end of the message
|
||||
auto end = data + len;
|
||||
|
||||
while(true)
|
||||
{
|
||||
auto size = end - data;
|
||||
|
||||
if(LIKELY(connected))
|
||||
{
|
||||
logger.debug("Decoding chunk of size {}", size);
|
||||
auto finished = decoder.decode(data, size);
|
||||
|
||||
if(!finished)
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.debug("Decoding handshake of size {}", size);
|
||||
decoder.handshake(data, size);
|
||||
}
|
||||
|
||||
state = state->run(*this);
|
||||
decoder.reset();
|
||||
}
|
||||
}
|
||||
|
||||
void Session::close()
|
||||
{
|
||||
logger.debug("Closing session");
|
||||
bolt.close(this);
|
||||
}
|
||||
|
||||
}
|
42
src/bolt/v1/session.hpp
Normal file
42
src/bolt/v1/session.hpp
Normal file
@ -0,0 +1,42 @@
|
||||
#pragma once
|
||||
|
||||
#include "io/network/tcp/stream.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
|
||||
#include "bolt/v1/states/state.hpp"
|
||||
|
||||
#include "bolt/v1/transport/bolt_decoder.hpp"
|
||||
#include "bolt/v1/transport/bolt_encoder.hpp"
|
||||
|
||||
#include "bolt.hpp"
|
||||
#include "logging/default.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Session : public io::tcp::Stream<io::Socket>
|
||||
{
|
||||
public:
|
||||
using Decoder = BoltDecoder;
|
||||
using Encoder = BoltEncoder<io::Socket>;
|
||||
|
||||
Session(io::Socket&& socket, Bolt& bolt);
|
||||
|
||||
bool alive() const;
|
||||
|
||||
void execute(const byte* data, size_t len);
|
||||
void close();
|
||||
|
||||
Bolt& bolt;
|
||||
|
||||
Decoder decoder;
|
||||
Encoder encoder {socket};
|
||||
|
||||
bool connected {false};
|
||||
State* state;
|
||||
|
||||
protected:
|
||||
Logger logger;
|
||||
};
|
||||
|
||||
}
|
17
src/bolt/v1/states.cpp
Normal file
17
src/bolt/v1/states.cpp
Normal file
@ -0,0 +1,17 @@
|
||||
#include "states.hpp"
|
||||
|
||||
#include "states/handshake.hpp"
|
||||
#include "states/init.hpp"
|
||||
#include "states/executor.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
States::States()
|
||||
{
|
||||
handshake = std::make_unique<Handshake>();
|
||||
init = std::make_unique<Init>();
|
||||
executor = std::make_unique<Executor>();
|
||||
}
|
||||
|
||||
}
|
19
src/bolt/v1/states.hpp
Normal file
19
src/bolt/v1/states.hpp
Normal file
@ -0,0 +1,19 @@
|
||||
#pragma once
|
||||
|
||||
#include "states/state.hpp"
|
||||
#include "logging/log.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class States
|
||||
{
|
||||
public:
|
||||
States();
|
||||
|
||||
State::uptr handshake;
|
||||
State::uptr init;
|
||||
State::uptr executor;
|
||||
};
|
||||
|
||||
}
|
32
src/bolt/v1/states/error.cpp
Normal file
32
src/bolt/v1/states/error.cpp
Normal file
@ -0,0 +1,32 @@
|
||||
#include "error.hpp"
|
||||
|
||||
#include "bolt/v1/session.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
State* Error::run(Session& session)
|
||||
{
|
||||
auto message_type = session.decoder.read_byte();
|
||||
|
||||
if(message_type == MessageCode::AckFailure)
|
||||
{
|
||||
// todo reset current statement? is it even necessary?
|
||||
|
||||
return session.bolt.states.executor.get();
|
||||
}
|
||||
else if(message_type == MessageCode::Reset)
|
||||
{
|
||||
// todo rollback current transaction
|
||||
// discard all records waiting to be sent
|
||||
|
||||
return session.bolt.states.executor.get();
|
||||
}
|
||||
|
||||
session.encoder.message_ignored();
|
||||
session.encoder.flush();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
14
src/bolt/v1/states/error.hpp
Normal file
14
src/bolt/v1/states/error.hpp
Normal file
@ -0,0 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include "bolt/v1/states/state.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Error : public State
|
||||
{
|
||||
public:
|
||||
State* run(Session& session) override;
|
||||
};
|
||||
|
||||
}
|
95
src/bolt/v1/states/executor.cpp
Normal file
95
src/bolt/v1/states/executor.cpp
Normal file
@ -0,0 +1,95 @@
|
||||
#include "executor.hpp"
|
||||
|
||||
#include "bolt/v1/messaging/codes.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
Executor::Executor() : logger(logging::log->logger("Executor")) {}
|
||||
|
||||
State* Executor::run(Session& session)
|
||||
{
|
||||
// just read one byte that represents the struct type, we can skip the
|
||||
// information contained in this byte
|
||||
session.decoder.read_byte();
|
||||
|
||||
auto message_type = session.decoder.read_byte();
|
||||
|
||||
if(message_type == MessageCode::Run)
|
||||
{
|
||||
Query q;
|
||||
|
||||
q.statement = session.decoder.read_string();
|
||||
|
||||
this->run(session, q);
|
||||
}
|
||||
else if(message_type == MessageCode::PullAll)
|
||||
{
|
||||
pull_all(session);
|
||||
}
|
||||
else if(message_type == MessageCode::DiscardAll)
|
||||
{
|
||||
discard_all(session);
|
||||
}
|
||||
else if(message_type == MessageCode::Reset)
|
||||
{
|
||||
// todo rollback current transaction
|
||||
// discard all records waiting to be sent
|
||||
|
||||
return this;
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.error("Unrecognized message recieved");
|
||||
logger.debug("Invalid message type 0x{:02X}", message_type);
|
||||
|
||||
return session.bolt.states.error.get();
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
void Executor::run(Session& session, Query& query)
|
||||
{
|
||||
logger.trace("[Run] '{}'", query.statement);
|
||||
|
||||
session.encoder.message_success();
|
||||
session.encoder.write_map_header(1);
|
||||
|
||||
session.encoder.write_string("fields");
|
||||
session.encoder.write_list_header(1);
|
||||
session.encoder.write_string("name");
|
||||
|
||||
session.encoder.flush();
|
||||
}
|
||||
|
||||
void Executor::pull_all(Session& session)
|
||||
{
|
||||
logger.trace("[PullAll]");
|
||||
|
||||
session.encoder.message_record();
|
||||
session.encoder.write_list_header(1);
|
||||
session.encoder.write_string("buda");
|
||||
|
||||
session.encoder.message_record();
|
||||
session.encoder.write_list_header(1);
|
||||
session.encoder.write_string("domko");
|
||||
|
||||
session.encoder.message_record();
|
||||
session.encoder.write_list_header(1);
|
||||
session.encoder.write_string("max");
|
||||
|
||||
session.encoder.message_success_empty();
|
||||
|
||||
session.encoder.flush();
|
||||
}
|
||||
|
||||
void Executor::discard_all(Session& session)
|
||||
{
|
||||
logger.trace("[DiscardAll]");
|
||||
|
||||
session.encoder.message_success();
|
||||
session.encoder.flush();
|
||||
}
|
||||
|
||||
}
|
41
src/bolt/v1/states/executor.hpp
Normal file
41
src/bolt/v1/states/executor.hpp
Normal file
@ -0,0 +1,41 @@
|
||||
#pragma once
|
||||
|
||||
#include "bolt/v1/states/state.hpp"
|
||||
#include "bolt/v1/session.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Executor : public State
|
||||
{
|
||||
struct Query
|
||||
{
|
||||
std::string statement;
|
||||
};
|
||||
|
||||
public:
|
||||
Executor();
|
||||
|
||||
State* run(Session& session) override final;
|
||||
|
||||
protected:
|
||||
Logger logger;
|
||||
|
||||
/* Execute an incoming query
|
||||
*
|
||||
*/
|
||||
void run(Session& session, Query& query);
|
||||
|
||||
/* Send all remaining results to the client
|
||||
*
|
||||
*/
|
||||
void pull_all(Session& session);
|
||||
|
||||
/* Discard all remaining results
|
||||
*
|
||||
*/
|
||||
void discard_all(Session& session);
|
||||
};
|
||||
|
||||
}
|
||||
|
27
src/bolt/v1/states/handshake.cpp
Normal file
27
src/bolt/v1/states/handshake.cpp
Normal file
@ -0,0 +1,27 @@
|
||||
#include "handshake.hpp"
|
||||
|
||||
#include "bolt/v1/session.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
static constexpr uint32_t preamble = 0x6060B017;
|
||||
|
||||
static constexpr byte protocol[4] = {0x00, 0x00, 0x00, 0x01};
|
||||
|
||||
State* Handshake::run(Session& session)
|
||||
{
|
||||
if(UNLIKELY(session.decoder.read_uint32() != preamble))
|
||||
return nullptr;
|
||||
|
||||
// TODO so far we only support version 1 of the protocol so it doesn't
|
||||
// make sense to check which version the client prefers
|
||||
// this will change in the future
|
||||
|
||||
session.connected = true;
|
||||
session.socket.write(protocol, sizeof protocol);
|
||||
|
||||
return session.bolt.states.init.get();
|
||||
}
|
||||
|
||||
}
|
14
src/bolt/v1/states/handshake.hpp
Normal file
14
src/bolt/v1/states/handshake.hpp
Normal file
@ -0,0 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include "bolt/v1/states/state.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Handshake : public State
|
||||
{
|
||||
public:
|
||||
State* run(Session& session) override;
|
||||
};
|
||||
|
||||
}
|
54
src/bolt/v1/states/init.cpp
Normal file
54
src/bolt/v1/states/init.cpp
Normal file
@ -0,0 +1,54 @@
|
||||
#include "init.hpp"
|
||||
|
||||
#include "bolt/v1/session.hpp"
|
||||
#include "bolt/v1/messaging/codes.hpp"
|
||||
|
||||
#include "utils/likely.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
Init::Init() : MessageParser<Init>(logging::log->logger("Init")) {}
|
||||
|
||||
State* Init::parse(Session& session, Message& message)
|
||||
{
|
||||
auto struct_type = session.decoder.read_byte();
|
||||
|
||||
if(UNLIKELY(struct_type != 0xB2))
|
||||
{
|
||||
logger.debug("{}", struct_type);
|
||||
|
||||
logger.debug("Expected struct marker 0xB2 instead of 0x{:02X}",
|
||||
(unsigned)struct_type);
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto message_type = session.decoder.read_byte();
|
||||
|
||||
if(UNLIKELY(message_type != MessageCode::Init))
|
||||
{
|
||||
logger.debug("Expected Init (0x01) instead of (0x{:02X})",
|
||||
(unsigned)message_type);
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
message.client_name = session.decoder.read_string();
|
||||
|
||||
// TODO read authentication tokens
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
State* Init::execute(Session& session, Message& message)
|
||||
{
|
||||
logger.debug("Client connected '{}'", message.client_name);
|
||||
|
||||
session.encoder.message_success_empty();
|
||||
session.encoder.flush();
|
||||
|
||||
return session.bolt.states.executor.get();
|
||||
}
|
||||
|
||||
}
|
22
src/bolt/v1/states/init.hpp
Normal file
22
src/bolt/v1/states/init.hpp
Normal file
@ -0,0 +1,22 @@
|
||||
#pragma once
|
||||
|
||||
#include "bolt/v1/states/message_parser.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Init : public MessageParser<Init>
|
||||
{
|
||||
public:
|
||||
struct Message
|
||||
{
|
||||
std::string client_name;
|
||||
};
|
||||
|
||||
Init();
|
||||
|
||||
State* parse(Session& session, Message& message);
|
||||
State* execute(Session& session, Message& message);
|
||||
};
|
||||
|
||||
}
|
37
src/bolt/v1/states/message_parser.hpp
Normal file
37
src/bolt/v1/states/message_parser.hpp
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
#include "state.hpp"
|
||||
#include "utils/crtp.hpp"
|
||||
|
||||
#include "bolt/v1/session.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
template <class Derived>
|
||||
class MessageParser : public State, public Crtp<Derived>
|
||||
{
|
||||
public:
|
||||
MessageParser(Logger&& logger)
|
||||
: logger(std::forward<Logger>(logger)) {}
|
||||
|
||||
State* run(Session& session) override final
|
||||
{
|
||||
typename Derived::Message message;
|
||||
|
||||
logger.debug("Parsing message");
|
||||
auto next = this->derived().parse(session, message);
|
||||
|
||||
// return next state if parsing was unsuccessful (i.e. error state)
|
||||
if(next != &this->derived())
|
||||
return next;
|
||||
|
||||
logger.debug("Executing state");
|
||||
return this->derived().execute(session, message);
|
||||
}
|
||||
|
||||
protected:
|
||||
Logger logger;
|
||||
};
|
||||
|
||||
}
|
23
src/bolt/v1/states/state.hpp
Normal file
23
src/bolt/v1/states/state.hpp
Normal file
@ -0,0 +1,23 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdlib>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Session;
|
||||
|
||||
class State
|
||||
{
|
||||
public:
|
||||
using uptr = std::unique_ptr<State>;
|
||||
|
||||
State() = default;
|
||||
virtual ~State() = default;
|
||||
|
||||
virtual State* run(Session& session) = 0;
|
||||
};
|
||||
|
||||
}
|
158
src/bolt/v1/transport/bolt_decoder.cpp
Normal file
158
src/bolt/v1/transport/bolt_decoder.cpp
Normal file
@ -0,0 +1,158 @@
|
||||
#include "bolt_decoder.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "utils/bswap.hpp"
|
||||
#include "logging/default.hpp"
|
||||
|
||||
#include "bolt/v1/packing/codes.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
void BoltDecoder::handshake(const byte*& data, size_t len)
|
||||
{
|
||||
buffer.write(data, len);
|
||||
data += len;
|
||||
}
|
||||
|
||||
bool BoltDecoder::decode(const byte*& data, size_t len)
|
||||
{
|
||||
return decoder(data, len);
|
||||
}
|
||||
|
||||
bool BoltDecoder::empty() const
|
||||
{
|
||||
return pos == buffer.size();
|
||||
}
|
||||
|
||||
void BoltDecoder::reset()
|
||||
{
|
||||
buffer.clear();
|
||||
pos = 0;
|
||||
}
|
||||
|
||||
byte BoltDecoder::peek() const
|
||||
{
|
||||
return buffer[pos];
|
||||
}
|
||||
|
||||
byte BoltDecoder::read_byte()
|
||||
{
|
||||
return buffer[pos++];
|
||||
}
|
||||
|
||||
void BoltDecoder::read_bytes(void* dest, size_t n)
|
||||
{
|
||||
std::memcpy(dest, buffer.data() + pos, n);
|
||||
pos += n;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
T parse(const void* data)
|
||||
{
|
||||
// reinterpret bytes as the target value
|
||||
auto value = reinterpret_cast<const T*>(data);
|
||||
|
||||
// swap values to little endian
|
||||
return bswap(*value);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
T parse(Buffer& buffer, size_t& pos)
|
||||
{
|
||||
// get a pointer to the data we're converting
|
||||
auto ptr = buffer.data() + pos;
|
||||
|
||||
// skip sizeof bytes that we're going to read
|
||||
pos += sizeof(T);
|
||||
|
||||
// read and convert the value
|
||||
return parse<T>(ptr);
|
||||
}
|
||||
|
||||
int16_t BoltDecoder::read_int16()
|
||||
{
|
||||
return parse<int16_t>(buffer, pos);
|
||||
}
|
||||
|
||||
uint16_t BoltDecoder::read_uint16()
|
||||
{
|
||||
return parse<uint16_t>(buffer, pos);
|
||||
}
|
||||
|
||||
int32_t BoltDecoder::read_int32()
|
||||
{
|
||||
return parse<int32_t>(buffer, pos);
|
||||
}
|
||||
|
||||
uint32_t BoltDecoder::read_uint32()
|
||||
{
|
||||
return parse<uint32_t>(buffer, pos);
|
||||
}
|
||||
|
||||
int64_t BoltDecoder::read_int64()
|
||||
{
|
||||
return parse<int64_t>(buffer, pos);
|
||||
}
|
||||
|
||||
uint64_t BoltDecoder::read_uint64()
|
||||
{
|
||||
return parse<uint64_t>(buffer, pos);
|
||||
}
|
||||
|
||||
double BoltDecoder::read_float64()
|
||||
{
|
||||
auto v = parse<int64_t>(buffer, pos);
|
||||
return *reinterpret_cast<const double *>(&v);
|
||||
}
|
||||
|
||||
std::string BoltDecoder::read_string()
|
||||
{
|
||||
auto marker = read_byte();
|
||||
|
||||
std::string res;
|
||||
uint32_t size;
|
||||
|
||||
// if the first 4 bits equal to 1000 (0x8), this is a tiny string
|
||||
if((marker & 0xF0) == pack::TinyString)
|
||||
{
|
||||
// size is stored in the lower 4 bits of the marker byte
|
||||
size = marker & 0x0F;
|
||||
}
|
||||
// if the marker is 0xD0, size is an 8-bit unsigned integer
|
||||
if(marker == pack::String8)
|
||||
{
|
||||
size = read_byte();
|
||||
}
|
||||
// if the marker is 0xD1, size is a 16-bit big-endian unsigned integer
|
||||
else if(marker == pack::String16)
|
||||
{
|
||||
size = read_uint16();
|
||||
}
|
||||
// if the marker is 0xD2, size is a 32-bit big-endian unsigned integer
|
||||
else if(marker == pack::String32)
|
||||
{
|
||||
size = read_uint32();
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO error?
|
||||
return res;
|
||||
}
|
||||
|
||||
if(size == 0)
|
||||
return res;
|
||||
|
||||
res.append(reinterpret_cast<const char*>(raw()), size);
|
||||
pos += size;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
const byte* BoltDecoder::raw() const
|
||||
{
|
||||
return buffer.data() + pos;
|
||||
}
|
||||
|
||||
}
|
45
src/bolt/v1/transport/bolt_decoder.hpp
Normal file
45
src/bolt/v1/transport/bolt_decoder.hpp
Normal file
@ -0,0 +1,45 @@
|
||||
#pragma once
|
||||
|
||||
#include "buffer.hpp"
|
||||
#include "chunked_decoder.hpp"
|
||||
|
||||
#include "utils/types/byte.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class BoltDecoder
|
||||
{
|
||||
public:
|
||||
void handshake(const byte*& data, size_t len);
|
||||
bool decode(const byte*& data, size_t len);
|
||||
|
||||
bool empty() const;
|
||||
void reset();
|
||||
|
||||
byte peek() const;
|
||||
byte read_byte();
|
||||
void read_bytes(void* dest, size_t n);
|
||||
|
||||
int16_t read_int16();
|
||||
uint16_t read_uint16();
|
||||
|
||||
int32_t read_int32();
|
||||
uint32_t read_uint32();
|
||||
|
||||
int64_t read_int64();
|
||||
uint64_t read_uint64();
|
||||
|
||||
double read_float64();
|
||||
|
||||
std::string read_string();
|
||||
|
||||
private:
|
||||
Buffer buffer;
|
||||
ChunkedDecoder<Buffer> decoder {buffer};
|
||||
size_t pos {0};
|
||||
|
||||
const byte* raw() const;
|
||||
};
|
||||
|
||||
}
|
1
src/bolt/v1/transport/bolt_encoder.cpp
Normal file
1
src/bolt/v1/transport/bolt_encoder.cpp
Normal file
@ -0,0 +1 @@
|
||||
#include "bolt_encoder.hpp"
|
243
src/bolt/v1/transport/bolt_encoder.hpp
Normal file
243
src/bolt/v1/transport/bolt_encoder.hpp
Normal file
@ -0,0 +1,243 @@
|
||||
#pragma once
|
||||
|
||||
#include "chunked_encoder.hpp"
|
||||
#include "socket_stream.hpp"
|
||||
|
||||
#include "bolt/v1/packing/codes.hpp"
|
||||
#include "bolt/v1/messaging/codes.hpp"
|
||||
|
||||
#include "utils/types/byte.hpp"
|
||||
|
||||
#include "utils/bswap.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
template <class Socket>
|
||||
class BoltEncoder
|
||||
{
|
||||
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
|
||||
static constexpr int64_t plus_2_to_the_15 = 32768L;
|
||||
static constexpr int64_t plus_2_to_the_7 = 128L;
|
||||
static constexpr int64_t minus_2_to_the_4 = -16L;
|
||||
static constexpr int64_t minus_2_to_the_7 = -128L;
|
||||
static constexpr int64_t minus_2_to_the_15 = -32768L;
|
||||
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
|
||||
|
||||
public:
|
||||
BoltEncoder(Socket& socket) : stream(socket) {}
|
||||
|
||||
void flush()
|
||||
{
|
||||
encoder.flush();
|
||||
}
|
||||
|
||||
void write(byte value)
|
||||
{
|
||||
encoder.write(value);
|
||||
}
|
||||
|
||||
void write(const byte* values, size_t n)
|
||||
{
|
||||
encoder.write(values, n);
|
||||
}
|
||||
|
||||
void write_null()
|
||||
{
|
||||
encoder.write(pack::Null);
|
||||
}
|
||||
|
||||
void write(bool value)
|
||||
{
|
||||
if(value) write_true(); else write_false();
|
||||
}
|
||||
|
||||
void write_true()
|
||||
{
|
||||
encoder.write(pack::True);
|
||||
}
|
||||
|
||||
void write_false()
|
||||
{
|
||||
encoder.write(pack::False);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
void write_value(T value)
|
||||
{
|
||||
value = bswap(value);
|
||||
encoder.write(reinterpret_cast<const byte*>(&value), sizeof(value));
|
||||
}
|
||||
|
||||
void write_integer(int64_t value)
|
||||
{
|
||||
if(value >= minus_2_to_the_4 && value < plus_2_to_the_7)
|
||||
{
|
||||
write(static_cast<byte>(value));
|
||||
}
|
||||
else if(value >= minus_2_to_the_7 && value < minus_2_to_the_4)
|
||||
{
|
||||
write(pack::Int8);
|
||||
write(static_cast<byte>(value));
|
||||
}
|
||||
else if(value >= minus_2_to_the_15 && value < plus_2_to_the_15)
|
||||
{
|
||||
write(pack::Int16);
|
||||
write_value(static_cast<int16_t>(value));
|
||||
}
|
||||
else if(value >= minus_2_to_the_31 && value < plus_2_to_the_31)
|
||||
{
|
||||
write(pack::Int32);
|
||||
write_value(static_cast<int32_t>(value));
|
||||
}
|
||||
else
|
||||
{
|
||||
write(pack::Int64);
|
||||
write_value(value);
|
||||
}
|
||||
}
|
||||
|
||||
void write(double value)
|
||||
{
|
||||
write(pack::Float64);
|
||||
write_value(*reinterpret_cast<const int64_t*>(&value));
|
||||
}
|
||||
|
||||
void write_map_header(size_t size)
|
||||
{
|
||||
if(size < 0x10)
|
||||
{
|
||||
write(static_cast<byte>(pack::TinyMap | size));
|
||||
}
|
||||
else if(size <= 0xFF)
|
||||
{
|
||||
write(pack::Map8);
|
||||
write(static_cast<byte>(size));
|
||||
}
|
||||
else if(size <= 0xFFFF)
|
||||
{
|
||||
write(pack::Map16);
|
||||
write_value<uint16_t>(size);
|
||||
}
|
||||
else
|
||||
{
|
||||
write(pack::Map32);
|
||||
write_value<uint32_t>(size);
|
||||
}
|
||||
}
|
||||
|
||||
void write_empty_map()
|
||||
{
|
||||
write(pack::TinyMap);
|
||||
}
|
||||
|
||||
void write_list_header(size_t size)
|
||||
{
|
||||
if(size < 0x10)
|
||||
{
|
||||
write(static_cast<byte>(pack::TinyList | size));
|
||||
}
|
||||
else if(size <= 0xFF)
|
||||
{
|
||||
write(pack::List8);
|
||||
write(static_cast<byte>(size));
|
||||
}
|
||||
else if(size <= 0xFFFF)
|
||||
{
|
||||
write(pack::List16);
|
||||
write_value<uint16_t>(size);
|
||||
}
|
||||
else
|
||||
{
|
||||
write(pack::List32);
|
||||
write_value<uint32_t>(size);
|
||||
}
|
||||
}
|
||||
|
||||
void write_empty_list()
|
||||
{
|
||||
write(pack::TinyList);
|
||||
}
|
||||
|
||||
void write_string_header(size_t size)
|
||||
{
|
||||
if(size < 0x10)
|
||||
{
|
||||
write(static_cast<byte>(pack::TinyString | size));
|
||||
}
|
||||
else if(size <= 0xFF)
|
||||
{
|
||||
write(pack::String8);
|
||||
write(static_cast<byte>(size));
|
||||
}
|
||||
else if(size <= 0xFFFF)
|
||||
{
|
||||
write(pack::String16);
|
||||
write_value<uint16_t>(size);
|
||||
}
|
||||
else
|
||||
{
|
||||
write(pack::String32);
|
||||
write_value<uint32_t>(size);
|
||||
}
|
||||
}
|
||||
|
||||
void write_string(const std::string& str)
|
||||
{
|
||||
write_string(str.c_str(), str.size());
|
||||
}
|
||||
|
||||
void write_string(const char* str, size_t len)
|
||||
{
|
||||
write_string_header(len);
|
||||
write(reinterpret_cast<const byte*>(str), len);
|
||||
}
|
||||
|
||||
void write_struct_header(size_t size)
|
||||
{
|
||||
if(size < 0x10)
|
||||
{
|
||||
write(static_cast<byte>(pack::TinyStruct | size));
|
||||
}
|
||||
else if(size <= 0xFF)
|
||||
{
|
||||
write(pack::Struct8);
|
||||
write(static_cast<byte>(size));
|
||||
}
|
||||
else
|
||||
{
|
||||
write(pack::Struct16);
|
||||
write_value<uint16_t>(size);
|
||||
}
|
||||
}
|
||||
|
||||
void message_success()
|
||||
{
|
||||
write_struct_header(1);
|
||||
write(underlying_cast(MessageCode::Success));
|
||||
}
|
||||
|
||||
void message_success_empty()
|
||||
{
|
||||
message_success();
|
||||
write_empty_map();
|
||||
}
|
||||
|
||||
void message_record()
|
||||
{
|
||||
write_struct_header(1);
|
||||
write(underlying_cast(MessageCode::Record));
|
||||
}
|
||||
|
||||
void message_record_empty()
|
||||
{
|
||||
message_record();
|
||||
write_empty_list();
|
||||
}
|
||||
|
||||
private:
|
||||
SocketStream stream;
|
||||
ChunkedEncoder<SocketStream> encoder {stream};
|
||||
};
|
||||
|
||||
}
|
16
src/bolt/v1/transport/buffer.cpp
Normal file
16
src/bolt/v1/transport/buffer.cpp
Normal file
@ -0,0 +1,16 @@
|
||||
#include "buffer.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
void Buffer::write(const byte* data, size_t len)
|
||||
{
|
||||
buffer.insert(buffer.end(), data, data + len);
|
||||
}
|
||||
|
||||
void Buffer::clear()
|
||||
{
|
||||
buffer.clear();
|
||||
}
|
||||
|
||||
}
|
39
src/bolt/v1/transport/buffer.hpp
Normal file
39
src/bolt/v1/transport/buffer.hpp
Normal file
@ -0,0 +1,39 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <vector>
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class Buffer
|
||||
{
|
||||
public:
|
||||
using byte = uint8_t;
|
||||
|
||||
void write(const byte* data, size_t len);
|
||||
|
||||
void clear();
|
||||
|
||||
size_t size() const
|
||||
{
|
||||
return buffer.size();
|
||||
}
|
||||
|
||||
byte operator[](size_t idx) const
|
||||
{
|
||||
return buffer[idx];
|
||||
}
|
||||
|
||||
const byte* data() const
|
||||
{
|
||||
return buffer.data();
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<byte> buffer;
|
||||
};
|
||||
|
||||
|
||||
}
|
77
src/bolt/v1/transport/chunked_decoder.hpp
Normal file
77
src/bolt/v1/transport/chunked_decoder.hpp
Normal file
@ -0,0 +1,77 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <cassert>
|
||||
|
||||
#include "utils/exceptions/basic_exception.hpp"
|
||||
#include "utils/likely.hpp"
|
||||
|
||||
#include "logging/default.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
template <class Stream>
|
||||
class ChunkedDecoder
|
||||
{
|
||||
public:
|
||||
class DecoderError : public BasicException
|
||||
{
|
||||
public:
|
||||
using BasicException::BasicException;
|
||||
};
|
||||
|
||||
using byte = unsigned char;
|
||||
|
||||
ChunkedDecoder(Stream& stream) : stream(stream) {}
|
||||
|
||||
/* Decode chunked data
|
||||
*
|
||||
* Chunk format looks like:
|
||||
*
|
||||
* |Header| Data ||Header| Data || ... || End |
|
||||
* | 2B | size bytes || 2B | size bytes || ... ||00 00|
|
||||
*/
|
||||
bool decode(const byte*& chunk, size_t n)
|
||||
{
|
||||
while(n > 0)
|
||||
{
|
||||
// get size from first two bytes in the chunk
|
||||
auto size = get_size(chunk);
|
||||
|
||||
if(UNLIKELY(size + 2 > n))
|
||||
throw DecoderError("Chunk size larger than available data.");
|
||||
|
||||
// advance chunk to pass those two bytes
|
||||
chunk += 2;
|
||||
n -= 2;
|
||||
|
||||
// if chunk size is 0, we're done!
|
||||
if(size == 0)
|
||||
return true;
|
||||
|
||||
stream.get().write(chunk, size);
|
||||
|
||||
chunk += size;
|
||||
n -= size;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool operator()(const byte*& chunk, size_t n)
|
||||
{
|
||||
return decode(chunk, n);
|
||||
}
|
||||
|
||||
private:
|
||||
std::reference_wrapper<Stream> stream;
|
||||
|
||||
size_t get_size(const byte* chunk)
|
||||
{
|
||||
return size_t(chunk[0]) << 8 | chunk[1];
|
||||
}
|
||||
};
|
||||
|
||||
}
|
92
src/bolt/v1/transport/chunked_encoder.hpp
Normal file
92
src/bolt/v1/transport/chunked_encoder.hpp
Normal file
@ -0,0 +1,92 @@
|
||||
#pragma once
|
||||
|
||||
#include <array>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
|
||||
#include "utils/likely.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
template <class Stream>
|
||||
class ChunkedEncoder
|
||||
{
|
||||
static constexpr size_t N = 65535;
|
||||
static constexpr size_t C = N + 2 /* end mark */;
|
||||
|
||||
public:
|
||||
using byte = unsigned char;
|
||||
|
||||
ChunkedEncoder(Stream& stream) : stream(stream) {}
|
||||
|
||||
static constexpr size_t chunk_size = N - 2;
|
||||
|
||||
void write(byte value)
|
||||
{
|
||||
if(UNLIKELY(pos == N))
|
||||
end_chunk();
|
||||
|
||||
chunk[pos++] = value;
|
||||
}
|
||||
|
||||
void write(const byte* values, size_t n)
|
||||
{
|
||||
while(n > 0)
|
||||
{
|
||||
auto size = n < N - pos ? n : N - pos;
|
||||
|
||||
std::memcpy(chunk.data() + pos, values, size);
|
||||
|
||||
pos += size;
|
||||
n -= size;
|
||||
|
||||
if(pos == N)
|
||||
end_chunk();
|
||||
}
|
||||
}
|
||||
|
||||
void flush()
|
||||
{
|
||||
write_chunk_header();
|
||||
|
||||
// write two zeros to signal message end
|
||||
chunk[pos++] = 0x00;
|
||||
chunk[pos++] = 0x00;
|
||||
|
||||
flush_stream();
|
||||
}
|
||||
|
||||
private:
|
||||
std::reference_wrapper<Stream> stream;
|
||||
|
||||
std::array<byte, C> chunk;
|
||||
size_t pos {2};
|
||||
|
||||
void end_chunk()
|
||||
{
|
||||
write_chunk_header();
|
||||
flush();
|
||||
}
|
||||
|
||||
void write_chunk_header()
|
||||
{
|
||||
// write the size of the chunk
|
||||
uint16_t size = pos - 2;
|
||||
|
||||
// write the higher byte
|
||||
chunk[0] = size >> 8;
|
||||
|
||||
// write the lower byte
|
||||
chunk[1] = size & 0xFF;
|
||||
}
|
||||
|
||||
void flush_stream()
|
||||
{
|
||||
// write chunk to the stream
|
||||
stream.get().write(chunk.data(), pos);
|
||||
pos = 2;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
38
src/bolt/v1/transport/socket_stream.hpp
Normal file
38
src/bolt/v1/transport/socket_stream.hpp
Normal file
@ -0,0 +1,38 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
#include <cstdio>
|
||||
|
||||
#include "io/network/socket.hpp"
|
||||
#include "stream_error.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class SocketStream
|
||||
{
|
||||
public:
|
||||
using byte = uint8_t;
|
||||
|
||||
SocketStream(io::Socket& socket) : socket(socket) {}
|
||||
|
||||
void write(const byte* data, size_t n)
|
||||
{
|
||||
while(n > 0)
|
||||
{
|
||||
auto written = socket.get().write(data, n);
|
||||
|
||||
if(UNLIKELY(written == -1))
|
||||
throw StreamError("Can't write to stream");
|
||||
|
||||
n -= written;
|
||||
data += written;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::reference_wrapper<io::Socket> socket;
|
||||
};
|
||||
|
||||
}
|
14
src/bolt/v1/transport/stream_error.hpp
Normal file
14
src/bolt/v1/transport/stream_error.hpp
Normal file
@ -0,0 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include "utils/exceptions/basic_exception.hpp"
|
||||
|
||||
namespace bolt
|
||||
{
|
||||
|
||||
class StreamError : BasicException
|
||||
{
|
||||
public:
|
||||
using BasicException::BasicException;
|
||||
};
|
||||
|
||||
}
|
@ -11,6 +11,8 @@ public:
|
||||
class Printer
|
||||
{
|
||||
public:
|
||||
friend class Entry;
|
||||
|
||||
Printer(std::ostream& stream, const std::string& header)
|
||||
: stream(stream)
|
||||
{
|
||||
@ -49,10 +51,10 @@ public:
|
||||
}
|
||||
|
||||
template <class T>
|
||||
friend Entry& operator<<(Entry& entry, const T& item)
|
||||
Entry& operator<<(const T& item)
|
||||
{
|
||||
entry.printer.stream << item;
|
||||
return entry;
|
||||
printer.stream << item;
|
||||
return *this;
|
||||
}
|
||||
|
||||
private:
|
||||
|
1
src/cypher/lexertl
Submodule
1
src/cypher/lexertl
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 7d4d36a357027df0e817453cc9cf948f71047ca9
|
8
src/dbms/dbms.hpp
Normal file
8
src/dbms/dbms.hpp
Normal file
@ -0,0 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
class Dbms
|
||||
{
|
||||
public:
|
||||
|
||||
|
||||
};
|
9
src/dbms/server/bolt.hpp
Normal file
9
src/dbms/server/bolt.hpp
Normal file
@ -0,0 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
class BoltServer
|
||||
{
|
||||
public:
|
||||
BoltServer() = default;
|
||||
|
||||
|
||||
};
|
68
src/examples/bolt.cpp
Normal file
68
src/examples/bolt.cpp
Normal file
@ -0,0 +1,68 @@
|
||||
#include <iostream>
|
||||
#include <signal.h>
|
||||
|
||||
#include "bolt/v1/server/server.hpp"
|
||||
#include "bolt/v1/server/worker.hpp"
|
||||
|
||||
#include "io/network/socket.hpp"
|
||||
|
||||
#include "logging/default.hpp"
|
||||
#include "logging/streams/stdout.hpp"
|
||||
|
||||
static bolt::Server<bolt::Worker>* serverptr;
|
||||
|
||||
Logger logger;
|
||||
|
||||
void sigint_handler(int s)
|
||||
{
|
||||
auto signal = s == SIGINT ? "SIGINT" : "SIGABRT";
|
||||
|
||||
logger.info("Recieved signal {}", signal);
|
||||
logger.info("Shutting down...");
|
||||
|
||||
std::exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
static constexpr const char* interface = "0.0.0.0";
|
||||
static constexpr const char* port = "7687";
|
||||
|
||||
int main(void)
|
||||
{
|
||||
logging::init_sync();
|
||||
logging::log->pipe(std::make_unique<Stdout>());
|
||||
logger = logging::log->logger("Main");
|
||||
|
||||
signal(SIGINT, sigint_handler);
|
||||
signal(SIGABRT, sigint_handler);
|
||||
|
||||
io::Socket socket;
|
||||
|
||||
try
|
||||
{
|
||||
socket = io::Socket::bind(interface, port);
|
||||
}
|
||||
catch(io::NetworkError e)
|
||||
{
|
||||
logger.error("Cannot bind to socket on {} at {}", interface, port);
|
||||
logger.error("{}", e.what());
|
||||
|
||||
std::exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
socket.set_non_blocking();
|
||||
socket.listen(1024);
|
||||
|
||||
logger.info("Listening on {} at {}", interface, port);
|
||||
|
||||
bolt::Server<bolt::Worker> server(std::move(socket));
|
||||
serverptr = &server;
|
||||
|
||||
constexpr size_t N = 1;
|
||||
|
||||
logger.info("Starting {} workers", N);
|
||||
server.start(N);
|
||||
|
||||
logger.info("Shutting down...");
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
3
src/examples/compile-bolt.sh
Normal file
3
src/examples/compile-bolt.sh
Normal file
@ -0,0 +1,3 @@
|
||||
#!/bin/bash
|
||||
|
||||
clang++ -g -rdynamic ../bolt/v1/states/executor.cpp ../logging/streams/stdout.cpp ../logging/levels.cpp ../logging/logs/sync_log.cpp ../logging/logs/async_log.cpp ../logging/default.cpp ../logging/log.cpp ../bolt/v1/bolt.cpp ../bolt/v1/states/init.cpp ../bolt/v1/states.cpp ../bolt/v1/states/handshake.cpp ../bolt/v1/transport/bolt_decoder.cpp ../bolt/v1/transport/buffer.cpp ../bolt/v1/session.cpp bolt.cpp ../io/network/tls.cpp -o bolt -std=c++14 -I ../ -I ../../libs/fmt/ -pthread -lcppformat -lssl -lcrypto
|
81
src/examples/endinan.cpp
Normal file
81
src/examples/endinan.cpp
Normal file
@ -0,0 +1,81 @@
|
||||
#include <iostream>
|
||||
#include <cstdint>
|
||||
|
||||
#include <byteswap.h>
|
||||
|
||||
char b[8] = {1, 2, 3, 4, 0, 0, 0, 1};
|
||||
|
||||
int64_t safe_int64(const char* b)
|
||||
{
|
||||
return int64_t(b[0]) << 56 | int64_t(b[1]) << 48
|
||||
| int64_t(b[2]) << 40 | int64_t(b[3]) << 32
|
||||
| int64_t(b[4]) << 24 | int64_t(b[5]) << 16
|
||||
| int64_t(b[6]) << 8 | int64_t(b[7]);
|
||||
}
|
||||
|
||||
int64_t unsafe_int64(const char* b)
|
||||
{
|
||||
auto i = reinterpret_cast<const int64_t*>(b);
|
||||
return __bswap_64(*i);
|
||||
}
|
||||
|
||||
int32_t safe_int32(const char* b)
|
||||
{
|
||||
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
|
||||
}
|
||||
|
||||
int32_t unsafe_int32(const char* b)
|
||||
{
|
||||
auto i = reinterpret_cast<const int32_t*>(b);
|
||||
return __bswap_32(*i);
|
||||
}
|
||||
|
||||
[[clang::optnone]]
|
||||
void test(uint64_t n)
|
||||
{
|
||||
for(uint64_t i = 0; i < n; ++i)
|
||||
unsafe_int64(b);
|
||||
}
|
||||
|
||||
uint8_t f[8] = {0x3F, 0xF1, 0x99, 0x99, 0x99, 0x99, 0x99, 0x9A};
|
||||
|
||||
double ff = 1.1;
|
||||
|
||||
double get_double(const uint8_t* b)
|
||||
{
|
||||
auto v = __bswap_64(*reinterpret_cast<const uint64_t*>(b));
|
||||
return *reinterpret_cast<const double*>(&v);
|
||||
}
|
||||
|
||||
void print_hex(const char* buf, size_t n)
|
||||
{
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
printf("%02X ", (unsigned char)buf[i]);
|
||||
}
|
||||
|
||||
void print_hex(const uint8_t* buf, size_t n)
|
||||
{
|
||||
print_hex((const char*)buf, n);
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
auto dd = get_double(f);
|
||||
|
||||
print_hex(f, 8);
|
||||
|
||||
std::cout << std::endl;
|
||||
print_hex((const uint8_t*)(&ff), 8);
|
||||
|
||||
std::cout << std::endl;
|
||||
print_hex((const uint8_t*)(&dd), 8);
|
||||
|
||||
std::cout << dd << std::endl;
|
||||
|
||||
/* std::cout << safe_int64(b) << std::endl; */
|
||||
/* std::cout << unsafe_int64(b) << std::endl; */
|
||||
|
||||
/* test(1000000000ull); */
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,17 +0,0 @@
|
||||
CXX=clang++
|
||||
CFLAGS=-std=c++1y -O2 -Wall -Wno-unknown-pragmas
|
||||
LDFLAGS=-lhttp_parser -pthread
|
||||
|
||||
INC=-I../../
|
||||
SOURCES=test.cpp
|
||||
EXECUTABLE=test
|
||||
|
||||
all: $(EXECUTABLE)
|
||||
|
||||
$(EXECUTABLE): $(SOURCES)
|
||||
$(CXX) $(CFLAGS) $(SOURCES) -o $(EXECUTABLE) $(INC) $(LDFLAGS)
|
||||
|
||||
.PHONY:
|
||||
clean:
|
||||
rm -f test
|
||||
rm -f *.o
|
@ -1,281 +0,0 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netdb.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <errno.h>
|
||||
|
||||
char buf[512];
|
||||
#define MAXEVENTS 64
|
||||
|
||||
static int
|
||||
make_socket_non_blocking (int sfd)
|
||||
{
|
||||
int flags, s;
|
||||
|
||||
flags = fcntl (sfd, F_GETFL, 0);
|
||||
if (flags == -1)
|
||||
{
|
||||
perror ("fcntl");
|
||||
return -1;
|
||||
}
|
||||
|
||||
flags |= O_NONBLOCK;
|
||||
s = fcntl (sfd, F_SETFL, flags);
|
||||
if (s == -1)
|
||||
{
|
||||
perror ("fcntl");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
create_and_bind (char *port)
|
||||
{
|
||||
struct addrinfo hints;
|
||||
struct addrinfo *result, *rp;
|
||||
int s, sfd;
|
||||
|
||||
memset (&hints, 0, sizeof (struct addrinfo));
|
||||
hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */
|
||||
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
|
||||
hints.ai_flags = AI_PASSIVE; /* All interfaces */
|
||||
|
||||
s = getaddrinfo (NULL, port, &hints, &result);
|
||||
if (s != 0)
|
||||
{
|
||||
fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (rp = result; rp != NULL; rp = rp->ai_next)
|
||||
{
|
||||
sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
|
||||
if (sfd == -1)
|
||||
continue;
|
||||
|
||||
s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
|
||||
if (s == 0)
|
||||
{
|
||||
/* We managed to bind successfully! */
|
||||
break;
|
||||
}
|
||||
|
||||
close (sfd);
|
||||
}
|
||||
|
||||
if (rp == NULL)
|
||||
{
|
||||
fprintf (stderr, "Could not bind\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
freeaddrinfo (result);
|
||||
|
||||
return sfd;
|
||||
}
|
||||
|
||||
int
|
||||
main (int argc, char *argv[])
|
||||
{
|
||||
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
|
||||
|
||||
size_t len = strlen(response);
|
||||
|
||||
int sfd, s;
|
||||
int efd;
|
||||
struct epoll_event event;
|
||||
struct epoll_event *events;
|
||||
|
||||
if (argc != 2)
|
||||
{
|
||||
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
|
||||
exit (EXIT_FAILURE);
|
||||
}
|
||||
|
||||
sfd = create_and_bind (argv[1]);
|
||||
if (sfd == -1)
|
||||
abort ();
|
||||
|
||||
s = make_socket_non_blocking (sfd);
|
||||
if (s == -1)
|
||||
abort ();
|
||||
|
||||
s = listen (sfd, SOMAXCONN);
|
||||
if (s == -1)
|
||||
{
|
||||
perror ("listen");
|
||||
abort ();
|
||||
}
|
||||
|
||||
efd = epoll_create1 (0);
|
||||
if (efd == -1)
|
||||
{
|
||||
perror ("epoll_create");
|
||||
abort ();
|
||||
}
|
||||
|
||||
event.data.fd = sfd;
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
|
||||
if (s == -1)
|
||||
{
|
||||
perror ("epoll_ctl");
|
||||
abort ();
|
||||
}
|
||||
|
||||
/* Buffer where events are returned */
|
||||
events = calloc (MAXEVENTS, sizeof event);
|
||||
|
||||
/* The event loop */
|
||||
while (1)
|
||||
{
|
||||
int n, i;
|
||||
|
||||
n = epoll_wait (efd, events, MAXEVENTS, -1);
|
||||
for (i = 0; i < n; i++)
|
||||
{
|
||||
if ((events[i].events & EPOLLERR) ||
|
||||
(events[i].events & EPOLLHUP) ||
|
||||
(!(events[i].events & EPOLLIN)))
|
||||
{
|
||||
/* An error has occured on this fd, or the socket is not
|
||||
ready for reading (why were we notified then?) */
|
||||
fprintf (stderr, "epoll error\n");
|
||||
close (events[i].data.fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
else if (sfd == events[i].data.fd)
|
||||
{
|
||||
/* We have a notification on the listening socket, which
|
||||
means one or more incoming connections. */
|
||||
while (1)
|
||||
{
|
||||
struct sockaddr in_addr;
|
||||
socklen_t in_len;
|
||||
int infd;
|
||||
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
|
||||
|
||||
in_len = sizeof in_addr;
|
||||
infd = accept (sfd, &in_addr, &in_len);
|
||||
if (infd == -1)
|
||||
{
|
||||
if ((errno == EAGAIN) ||
|
||||
(errno == EWOULDBLOCK))
|
||||
{
|
||||
/* We have processed all incoming
|
||||
connections. */
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
perror ("accept");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
s = getnameinfo (&in_addr, in_len,
|
||||
hbuf, sizeof hbuf,
|
||||
sbuf, sizeof sbuf,
|
||||
NI_NUMERICHOST | NI_NUMERICSERV);
|
||||
if (s == 0)
|
||||
{
|
||||
printf("Accepted connection on descriptor %d "
|
||||
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
|
||||
}
|
||||
|
||||
/* Make the incoming socket non-blocking and add it to the
|
||||
list of fds to monitor. */
|
||||
s = make_socket_non_blocking (infd);
|
||||
if (s == -1)
|
||||
abort ();
|
||||
|
||||
event.data.fd = infd;
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
|
||||
if (s == -1)
|
||||
{
|
||||
perror ("epoll_ctl");
|
||||
abort ();
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* We have data on the fd waiting to be read. Read and
|
||||
display it. We must read whatever data is available
|
||||
completely, as we are running in edge-triggered mode
|
||||
and won't get a notification again for the same
|
||||
data. */
|
||||
int done = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
ssize_t count;
|
||||
|
||||
count = read (events[i].data.fd, buf, sizeof buf);
|
||||
if (count == -1)
|
||||
{
|
||||
/* If errno == EAGAIN, that means we have read all
|
||||
data. So go back to the main loop. */
|
||||
if (errno != EAGAIN)
|
||||
{
|
||||
perror ("read");
|
||||
done = 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
else if (count == 0)
|
||||
{
|
||||
/* End of file. The remote has closed the
|
||||
connection. */
|
||||
done = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
size_t sum = 0;
|
||||
char* resp = (char*)response;
|
||||
|
||||
while(sum < len)
|
||||
{
|
||||
int k = write(event.data.fd, resp, len - sum);
|
||||
sum += k;
|
||||
resp += k;
|
||||
|
||||
}
|
||||
|
||||
/* Write the buffer to standard output */
|
||||
if (s == -1)
|
||||
{
|
||||
perror ("write");
|
||||
abort ();
|
||||
}
|
||||
}
|
||||
|
||||
if (done)
|
||||
{
|
||||
printf ("Closed connection on descriptor %d\n",
|
||||
events[i].data.fd);
|
||||
|
||||
/* Closing the descriptor will make epoll remove it
|
||||
from the set of descriptors which are monitored. */
|
||||
close (events[i].data.fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
free (events);
|
||||
|
||||
close (sfd);
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
@ -9,6 +9,12 @@
|
||||
namespace io
|
||||
{
|
||||
|
||||
class EpollError : BasicException
|
||||
{
|
||||
public:
|
||||
using BasicException::BasicException;
|
||||
};
|
||||
|
||||
class Epoll
|
||||
{
|
||||
public:
|
||||
@ -17,14 +23,18 @@ public:
|
||||
Epoll(int flags)
|
||||
{
|
||||
epoll_fd = epoll_create1(flags);
|
||||
|
||||
if(UNLIKELY(epoll_fd == -1))
|
||||
throw EpollError("Can't create epoll file descriptor");
|
||||
}
|
||||
|
||||
void add(Socket& socket, Event* event)
|
||||
template <class Stream>
|
||||
void add(Stream& stream, Event* event)
|
||||
{
|
||||
auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, event);
|
||||
auto status = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stream, event);
|
||||
|
||||
if(UNLIKELY(status))
|
||||
throw NetworkError("Can't add an event to epoll listener.");
|
||||
throw EpollError("Can't add an event to epoll listener.");
|
||||
}
|
||||
|
||||
int wait(Event* events, int max_events, int timeout)
|
||||
|
@ -1,14 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include "socket.hpp"
|
||||
#include "epoll.hpp"
|
||||
#include "utils/crtp.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
template <class Derived, class Stream,
|
||||
size_t max_events = 64, int wait_timeout = -1>
|
||||
template <class Derived, size_t max_events = 64, int wait_timeout = -1>
|
||||
class EventListener : public Crtp<Derived>
|
||||
{
|
||||
public:
|
||||
@ -16,32 +14,28 @@ public:
|
||||
|
||||
EventListener(uint32_t flags = 0) : listener(flags) {}
|
||||
|
||||
void add(Stream& stream)
|
||||
{
|
||||
// add the stream to the event listener
|
||||
listener.add(stream.socket, &stream.event);
|
||||
}
|
||||
|
||||
void wait_and_process_events()
|
||||
{
|
||||
// TODO hardcoded a wait timeout because of thread joining
|
||||
// when you shutdown the server. This should be wait_timeout of the
|
||||
// template parameter and should almost never change from that.
|
||||
// thread joining should be resolved using a signal that interrupts
|
||||
// the system call.
|
||||
|
||||
// waits for an event/multiple events and returns a maximum of
|
||||
// max_events and stores them in the events array. it waits for
|
||||
// wait_timeout milliseconds. if wait_timeout is achieved, returns 0
|
||||
auto n = listener.wait(events, max_events, wait_timeout);
|
||||
|
||||
LOG_DEBUG("received " << n << " events");
|
||||
auto n = listener.wait(events, max_events, 200);
|
||||
|
||||
// go through all events and process them in order
|
||||
for(int i = 0; i < n; ++i)
|
||||
{
|
||||
auto& event = events[i];
|
||||
auto& stream = *reinterpret_cast<Stream*>(event.data.ptr);
|
||||
|
||||
// a stream was closed
|
||||
// hangup event
|
||||
if(UNLIKELY(event.events & EPOLLRDHUP))
|
||||
{
|
||||
LOG_DEBUG("EPOLLRDHUP event recieved on socket " << stream.id());
|
||||
this->derived().on_close(stream);
|
||||
this->derived().on_close_event(event);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -49,18 +43,12 @@ public:
|
||||
if(UNLIKELY(!(event.events & EPOLLIN) ||
|
||||
event.events & (EPOLLHUP | EPOLLERR)))
|
||||
{
|
||||
LOG_DEBUG(">> EPOLL ERR");
|
||||
LOG_DEBUG("EPOLLIN" << (event.events & EPOLLIN));
|
||||
LOG_DEBUG("EPOLLHUP" << (event.events & EPOLLHUP));
|
||||
LOG_DEBUG("EPOLLERR" << (event.events & EPOLLERR));
|
||||
|
||||
this->derived().on_error(stream);
|
||||
this->derived().on_error_event(event);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_DEBUG("signalling that data exists on socket " << stream.id());
|
||||
// we have some data waiting to be read
|
||||
this->derived().on_data(stream);
|
||||
this->derived().on_data_event(event);
|
||||
}
|
||||
|
||||
// this will be optimized out :D
|
||||
|
@ -2,9 +2,15 @@
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
class NetworkError : public std::runtime_error
|
||||
#include "utils/exceptions/basic_exception.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
class NetworkError : public BasicException
|
||||
{
|
||||
public:
|
||||
using std::runtime_error::runtime_error;
|
||||
using BasicException::BasicException;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,27 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
INTERVAL="1" # update interval in seconds
|
||||
|
||||
if [ -z "$1" ]; then
|
||||
echo
|
||||
echo usage: $0 [network-interface]
|
||||
echo
|
||||
echo e.g. $0 eth0
|
||||
echo
|
||||
echo shows packets-per-second
|
||||
exit
|
||||
fi
|
||||
|
||||
IF=$1
|
||||
|
||||
while true
|
||||
do
|
||||
R1=`cat /sys/class/net/$1/statistics/rx_packets`
|
||||
T1=`cat /sys/class/net/$1/statistics/tx_packets`
|
||||
sleep $INTERVAL
|
||||
R2=`cat /sys/class/net/$1/statistics/rx_packets`
|
||||
T2=`cat /sys/class/net/$1/statistics/tx_packets`
|
||||
TXPPS=`expr $T2 - $T1`
|
||||
RXPPS=`expr $R2 - $R1`
|
||||
echo "TX $1: $TXPPS pkts/s RX $1: $RXPPS pkts/s"
|
||||
done
|
93
src/io/network/secure_socket.hpp
Normal file
93
src/io/network/secure_socket.hpp
Normal file
@ -0,0 +1,93 @@
|
||||
#pragma once
|
||||
|
||||
#include "tls.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "tls_error.hpp"
|
||||
#include "utils/types/byte.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
class SecureSocket
|
||||
{
|
||||
public:
|
||||
SecureSocket(Socket&& socket, const Tls::Context& tls)
|
||||
: socket(std::forward<Socket>(socket))
|
||||
{
|
||||
ssl = SSL_new(tls);
|
||||
SSL_set_fd(ssl, this->socket);
|
||||
|
||||
SSL_set_accept_state(ssl);
|
||||
|
||||
if(SSL_accept(ssl) <= 0)
|
||||
ERR_print_errors_fp(stderr);
|
||||
}
|
||||
|
||||
SecureSocket(SecureSocket&& other)
|
||||
{
|
||||
*this = std::forward<SecureSocket>(other);
|
||||
}
|
||||
|
||||
SecureSocket& operator=(SecureSocket&& other)
|
||||
{
|
||||
socket = std::move(other.socket);
|
||||
|
||||
ssl = other.ssl;
|
||||
other.ssl = nullptr;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
~SecureSocket()
|
||||
{
|
||||
if(ssl == nullptr)
|
||||
return;
|
||||
|
||||
std::cout << "DELETING SSL" << std::endl;
|
||||
|
||||
SSL_free(ssl);
|
||||
}
|
||||
|
||||
int error(int status)
|
||||
{
|
||||
return SSL_get_error(ssl, status);
|
||||
}
|
||||
|
||||
int write(const std::string& str)
|
||||
{
|
||||
return write(str.c_str(), str.size());
|
||||
}
|
||||
|
||||
int write(const byte* data, size_t len)
|
||||
{
|
||||
return SSL_write(ssl, data, len);
|
||||
}
|
||||
|
||||
int write(const char* data, size_t len)
|
||||
{
|
||||
return SSL_write(ssl, data, len);
|
||||
}
|
||||
|
||||
int read(char* buffer, size_t len)
|
||||
{
|
||||
return SSL_read(ssl, buffer, len);
|
||||
}
|
||||
|
||||
operator int()
|
||||
{
|
||||
return socket;
|
||||
}
|
||||
|
||||
operator Socket&()
|
||||
{
|
||||
return socket;
|
||||
}
|
||||
|
||||
private:
|
||||
Socket socket;
|
||||
SSL* ssl {nullptr};
|
||||
};
|
||||
|
||||
}
|
61
src/io/network/secure_stream_reader.hpp
Normal file
61
src/io/network/secure_stream_reader.hpp
Normal file
@ -0,0 +1,61 @@
|
||||
#pragma once
|
||||
|
||||
#include <openssl/ssl.h>
|
||||
|
||||
#include "stream_reader.hpp"
|
||||
#include "logging/default.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
using namespace memory::literals;
|
||||
|
||||
template <class Derived, class Stream>
|
||||
class SecureStreamReader : public StreamReader<Derived, Stream>
|
||||
{
|
||||
public:
|
||||
struct Buffer
|
||||
{
|
||||
char* ptr;
|
||||
size_t len;
|
||||
};
|
||||
|
||||
SecureStreamReader(uint32_t flags = 0)
|
||||
: StreamReader<Derived, Stream>(flags) {}
|
||||
|
||||
void on_data(Stream& stream)
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
// allocate the buffer to fill the data
|
||||
auto buf = this->derived().on_alloc(stream);
|
||||
|
||||
// read from the buffer at most buf.len bytes
|
||||
auto len = stream.socket.read(buf.ptr, buf.len);
|
||||
|
||||
if(LIKELY(len > 0))
|
||||
{
|
||||
buf.len = len;
|
||||
return this->derived().on_read(stream, buf);
|
||||
}
|
||||
|
||||
auto err = stream.socket.error(len);
|
||||
|
||||
// the socket is not ready for reading yet
|
||||
if(err == SSL_ERROR_WANT_READ ||
|
||||
err == SSL_ERROR_WANT_WRITE ||
|
||||
err == SSL_ERROR_WANT_X509_LOOKUP)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// the socket notified a close event
|
||||
if(err == SSL_ERROR_ZERO_RETURN)
|
||||
return stream.close();
|
||||
|
||||
// some other error occurred, check errno
|
||||
return this->derived().on_error(stream);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
@ -5,33 +5,39 @@
|
||||
namespace io
|
||||
{
|
||||
|
||||
template <class Derived, class Stream>
|
||||
class Server : public StreamReader<Derived, Stream>
|
||||
template <class Derived>
|
||||
class Server : public EventListener<Derived>
|
||||
{
|
||||
public:
|
||||
bool accept(Socket& socket)
|
||||
Server(Socket&& socket) : socket(std::forward<Socket>(socket))
|
||||
{
|
||||
// accept a connection from a socket
|
||||
auto s = socket.accept(nullptr, nullptr);
|
||||
LOG_DEBUG("socket " << s.id() << " accepted");
|
||||
event.data.fd = this->socket;
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
|
||||
if(!s.is_open())
|
||||
return false;
|
||||
|
||||
// make the recieved socket non blocking
|
||||
s.set_non_blocking();
|
||||
|
||||
auto& stream = this->derived().on_connect(std::move(s));
|
||||
|
||||
// we want to listen to an incoming event which is edge triggered and
|
||||
// we also want to listen on the hangup event
|
||||
stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
|
||||
|
||||
// add the connection to the event listener
|
||||
this->add(stream);
|
||||
|
||||
return true;
|
||||
this->listener.add(this->socket, &event);
|
||||
}
|
||||
|
||||
void on_close_event(Epoll::Event& event)
|
||||
{
|
||||
::close(event.data.fd);
|
||||
}
|
||||
|
||||
void on_error_event(Epoll::Event& event)
|
||||
{
|
||||
::close(event.data.fd);
|
||||
}
|
||||
|
||||
void on_data_event(Epoll::Event& event)
|
||||
{
|
||||
if(UNLIKELY(socket != event.data.fd))
|
||||
return;
|
||||
|
||||
this->derived().on_connect();
|
||||
}
|
||||
|
||||
protected:
|
||||
Epoll::Event event;
|
||||
Socket socket;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -16,17 +16,24 @@
|
||||
#include "addrinfo.hpp"
|
||||
#include "utils/likely.hpp"
|
||||
|
||||
#include "logging/default.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
class Socket
|
||||
{
|
||||
protected:
|
||||
Socket(int family, int socket_type, int protocol)
|
||||
{
|
||||
socket = ::socket(family, socket_type, protocol);
|
||||
}
|
||||
|
||||
public:
|
||||
using byte = uint8_t;
|
||||
|
||||
Socket(int socket = -1) : socket(socket) {}
|
||||
|
||||
Socket(const Socket&) = delete;
|
||||
@ -41,7 +48,16 @@ public:
|
||||
if(socket == -1)
|
||||
return;
|
||||
|
||||
close(socket);
|
||||
|
||||
std::cout << "DELETING SOCKET" << std::endl;
|
||||
|
||||
::close(socket);
|
||||
}
|
||||
|
||||
void close()
|
||||
{
|
||||
::close(socket);
|
||||
socket = -1;
|
||||
}
|
||||
|
||||
Socket& operator=(Socket&& other)
|
||||
@ -73,7 +89,7 @@ public:
|
||||
continue;
|
||||
|
||||
if(::connect(s, it->ai_addr, it->ai_addrlen) == 0)
|
||||
return std::move(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
throw NetworkError("Unable to connect to socket");
|
||||
@ -100,7 +116,7 @@ public:
|
||||
continue;
|
||||
|
||||
if(::bind(s, it->ai_addr, it->ai_addrlen) == 0)
|
||||
return std::move(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
throw NetworkError("Unable to bind to socket");
|
||||
@ -141,22 +157,38 @@ public:
|
||||
return socket;
|
||||
}
|
||||
|
||||
size_t write(const std::string& str)
|
||||
int write(const std::string& str)
|
||||
{
|
||||
return ::write(socket, str.c_str(), str.size());
|
||||
return write(str.c_str(), str.size());
|
||||
}
|
||||
|
||||
size_t write(const char* data, size_t len)
|
||||
int write(const char* data, size_t len)
|
||||
{
|
||||
return write(reinterpret_cast<const byte*>(data), len);
|
||||
}
|
||||
|
||||
int write(const byte* data, size_t len)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
std::stringstream stream;
|
||||
|
||||
for(size_t i = 0; i < len; ++i)
|
||||
stream << fmt::format("{:02X} ", static_cast<byte>(data[i]));
|
||||
|
||||
auto str = stream.str();
|
||||
|
||||
logging::debug("[Write {}B] {}", len, str);
|
||||
#endif
|
||||
|
||||
return ::write(socket, data, len);
|
||||
}
|
||||
|
||||
size_t read(char* buffer, size_t len)
|
||||
int read(void* buffer, size_t len)
|
||||
{
|
||||
return ::read(socket, buffer, len);
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
int socket;
|
||||
};
|
||||
|
||||
|
13
src/io/network/stream_dispatcher.hpp
Normal file
13
src/io/network/stream_dispatcher.hpp
Normal file
@ -0,0 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include "
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
class StreamDispatcher
|
||||
{
|
||||
|
||||
};
|
||||
|
||||
}
|
43
src/io/network/stream_listener.hpp
Normal file
43
src/io/network/stream_listener.hpp
Normal file
@ -0,0 +1,43 @@
|
||||
#pragma once
|
||||
|
||||
#include "event_listener.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
template <class Derived, class Stream,
|
||||
size_t max_events = 64, int wait_timeout = -1>
|
||||
class StreamListener : public EventListener<Derived, max_events, wait_timeout>
|
||||
{
|
||||
public:
|
||||
using EventListener<Derived, max_events, wait_timeout>::EventListener;
|
||||
|
||||
void add(Stream& stream)
|
||||
{
|
||||
// add the stream to the event listener
|
||||
this->listener.add(stream.socket, &stream.event);
|
||||
}
|
||||
|
||||
void on_close_event(Epoll::Event& event)
|
||||
{
|
||||
this->derived().on_close(to_stream(event));
|
||||
}
|
||||
|
||||
void on_error_event(Epoll::Event& event)
|
||||
{
|
||||
this->derived().on_error(to_stream(event));
|
||||
}
|
||||
|
||||
void on_data_event(Epoll::Event& event)
|
||||
{
|
||||
this->derived().on_data(to_stream(event));
|
||||
}
|
||||
|
||||
private:
|
||||
Stream& to_stream(Epoll::Event& event)
|
||||
{
|
||||
return *reinterpret_cast<Stream*>(event.data.ptr);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "event_listener.hpp"
|
||||
#include "stream_listener.hpp"
|
||||
#include "memory/literals.hpp"
|
||||
|
||||
namespace io
|
||||
@ -8,7 +8,7 @@ namespace io
|
||||
using namespace memory::literals;
|
||||
|
||||
template <class Derived, class Stream>
|
||||
class StreamReader : public EventListener<Derived, Stream>
|
||||
class StreamReader : public StreamListener<Derived, Stream>
|
||||
{
|
||||
public:
|
||||
struct Buffer
|
||||
@ -17,12 +17,41 @@ public:
|
||||
size_t len;
|
||||
};
|
||||
|
||||
StreamReader(uint32_t flags = 0) : EventListener<Derived, Stream>(flags) {}
|
||||
StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags) {}
|
||||
|
||||
bool accept(Socket& socket)
|
||||
{
|
||||
// accept a connection from a socket
|
||||
auto s = socket.accept(nullptr, nullptr);
|
||||
|
||||
if(!s.is_open())
|
||||
return false;
|
||||
|
||||
// make the recieved socket non blocking
|
||||
s.set_non_blocking();
|
||||
|
||||
auto& stream = this->derived().on_connect(std::move(s));
|
||||
|
||||
// we want to listen to an incoming event which is edge triggered and
|
||||
// we also want to listen on the hangup event
|
||||
stream.event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
|
||||
|
||||
// add the connection to the event listener
|
||||
this->add(stream);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void on_data(Stream& stream)
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
if(UNLIKELY(!stream.alive()))
|
||||
{
|
||||
stream.close();
|
||||
break;
|
||||
}
|
||||
|
||||
// allocate the buffer to fill the data
|
||||
auto buf = this->derived().on_alloc(stream);
|
||||
|
||||
@ -35,23 +64,21 @@ public:
|
||||
// this means we have read all available data
|
||||
if(LIKELY(errno == EAGAIN))
|
||||
{
|
||||
LOG_DEBUG("EAGAIN read all data on socket " << stream.id());
|
||||
break;
|
||||
}
|
||||
|
||||
// some other error occurred, check errno
|
||||
this->derived().on_error(stream);
|
||||
break;
|
||||
}
|
||||
|
||||
// end of file, the client has closed the connection
|
||||
if(UNLIKELY(buf.len == 0))
|
||||
{
|
||||
LOG_DEBUG("EOF stream closed on socket " << stream.id());
|
||||
stream.close();
|
||||
break;
|
||||
}
|
||||
|
||||
LOG_DEBUG("data on socket " << stream.id());
|
||||
this->derived().on_read(stream, buf);
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ namespace io
|
||||
namespace tcp
|
||||
{
|
||||
|
||||
template <class Socket>
|
||||
class Stream
|
||||
{
|
||||
public:
|
||||
@ -24,11 +25,6 @@ public:
|
||||
event.data.ptr = this;
|
||||
}
|
||||
|
||||
void close()
|
||||
{
|
||||
delete reinterpret_cast<Stream*>(event.data.ptr);
|
||||
}
|
||||
|
||||
int id() const { return socket.id(); }
|
||||
|
||||
Socket socket;
|
||||
|
@ -1,94 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
#include "debug/log.hpp"
|
||||
#include "tcp_listener.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
template <class T>
|
||||
class TcpServer : TcpListener<TcpServer<T>, 64, -1>
|
||||
{
|
||||
public:
|
||||
TcpServer(const char* addr, const char* port)
|
||||
: stream(std::move(Socket::bind(addr, port))) {}
|
||||
|
||||
~TcpServer()
|
||||
{
|
||||
stop();
|
||||
|
||||
for(auto& worker : workers)
|
||||
worker.join();
|
||||
}
|
||||
|
||||
template <class F>
|
||||
void listen(size_t n, size_t backlog, F&& f)
|
||||
{
|
||||
for(size_t i = 0; i < n; ++i)
|
||||
{
|
||||
workers.emplace_back();
|
||||
auto& w = workers.back();
|
||||
|
||||
threads[i] = std::thread([this, &w]() {
|
||||
while(alive)
|
||||
{
|
||||
LOG_DEBUG("Worker " << hash(std::this_thread::get_id())
|
||||
<< " waiting... ");
|
||||
w.wait_and_process_events();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
stream.socket.listen(backlog);
|
||||
}
|
||||
|
||||
void stop()
|
||||
{
|
||||
alive.store(false, std::memory_order_release);
|
||||
}
|
||||
|
||||
private:
|
||||
std::list<std::thread> threads;
|
||||
std::list<T> workers;
|
||||
std::atomic<bool> alive { true };
|
||||
|
||||
TcpStream stream;
|
||||
size_t idx = 0;
|
||||
|
||||
void on_close(TcpStream& stream)
|
||||
{
|
||||
LOG_DEBUG("server on_close!!!!");
|
||||
}
|
||||
|
||||
void on_error(TcpStream& stream)
|
||||
{
|
||||
LOG_DEBUG("server on_error!!!!");
|
||||
}
|
||||
|
||||
void on_data(TcpStream&)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
LOG_DEBUG("Trying to accept... ");
|
||||
if(!workers[idx].accept(socket))
|
||||
{
|
||||
LOG_DEBUG("Did not accept!");
|
||||
break;
|
||||
}
|
||||
|
||||
idx = (idx + 1) % workers.size();
|
||||
LOG_DEBUG("Accepted a new connection!");
|
||||
}
|
||||
}
|
||||
|
||||
void on_wait_timeout()
|
||||
{
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
55
src/io/network/tls.cpp
Normal file
55
src/io/network/tls.cpp
Normal file
@ -0,0 +1,55 @@
|
||||
#include "tls.hpp"
|
||||
#include "tls_error.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
Tls::Context::Context()
|
||||
{
|
||||
auto method = SSLv23_server_method();
|
||||
ctx = SSL_CTX_new(method);
|
||||
|
||||
if(!ctx)
|
||||
{
|
||||
ERR_print_errors_fp(stderr);
|
||||
throw io::TlsError("Unable to create TLS context");
|
||||
}
|
||||
|
||||
SSL_CTX_set_ecdh_auto(ctx, 1);
|
||||
}
|
||||
|
||||
Tls::Context::~Context()
|
||||
{
|
||||
SSL_CTX_free(ctx);
|
||||
}
|
||||
|
||||
Tls::Context& Tls::Context::cert(const std::string& path)
|
||||
{
|
||||
if(SSL_CTX_use_certificate_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0)
|
||||
return *this;
|
||||
|
||||
ERR_print_errors_fp(stderr);
|
||||
throw TlsError("Error Loading cert '{}'", path);
|
||||
}
|
||||
|
||||
Tls::Context& Tls::Context::key(const std::string& path)
|
||||
{
|
||||
if(SSL_CTX_use_PrivateKey_file(ctx, path.c_str(), SSL_FILETYPE_PEM) >= 0)
|
||||
return *this;
|
||||
|
||||
ERR_print_errors_fp(stderr);
|
||||
throw TlsError("Error Loading private key '{}'", path);
|
||||
}
|
||||
|
||||
void Tls::initialize()
|
||||
{
|
||||
SSL_load_error_strings();
|
||||
OpenSSL_add_ssl_algorithms();
|
||||
}
|
||||
|
||||
void Tls::cleanup()
|
||||
{
|
||||
EVP_cleanup();
|
||||
}
|
||||
|
||||
}
|
33
src/io/network/tls.hpp
Normal file
33
src/io/network/tls.hpp
Normal file
@ -0,0 +1,33 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
class Tls
|
||||
{
|
||||
public:
|
||||
class Context
|
||||
{
|
||||
public:
|
||||
Context();
|
||||
~Context();
|
||||
|
||||
Context& cert(const std::string& path);
|
||||
Context& key(const std::string& path);
|
||||
|
||||
operator SSL_CTX*() const { return ctx; }
|
||||
|
||||
private:
|
||||
SSL_CTX* ctx;
|
||||
};
|
||||
|
||||
static void initialize();
|
||||
static void cleanup();
|
||||
};
|
||||
|
||||
}
|
14
src/io/network/tls_error.hpp
Normal file
14
src/io/network/tls_error.hpp
Normal file
@ -0,0 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include "utils/exceptions/basic_exception.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
|
||||
class TlsError : public BasicException
|
||||
{
|
||||
public:
|
||||
using BasicException::BasicException;
|
||||
};
|
||||
|
||||
}
|
@ -1,91 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "listener.hpp"
|
||||
#include "tcp_stream.hpp"
|
||||
|
||||
namespace io
|
||||
{
|
||||
const char* response = "HTTP/1.1 200 OK\r\nContent-Length:0\r\nConnection:Keep-Alive\r\n\r\n";
|
||||
|
||||
size_t len = strlen(response);
|
||||
|
||||
class Worker : public Listener<Worker>
|
||||
{
|
||||
char buf[64_kB];
|
||||
|
||||
public:
|
||||
using Listener::Listener;
|
||||
|
||||
bool accept(Socket& socket)
|
||||
{
|
||||
auto s = socket.accept(nullptr, nullptr);
|
||||
|
||||
if(!s.is_open())
|
||||
return false;
|
||||
|
||||
this->add(s);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void on_error(TcpStream* stream)
|
||||
{
|
||||
delete stream;
|
||||
}
|
||||
|
||||
std::atomic<int> requests {0};
|
||||
|
||||
void on_read(TcpStream* stream)
|
||||
{
|
||||
int done = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
ssize_t count;
|
||||
|
||||
count = read(stream->socket, buf, sizeof buf);
|
||||
if (count == -1)
|
||||
{
|
||||
/* If errno == EAGAIN, that means we have read all
|
||||
data. So go back to the main loop. */
|
||||
if (errno != EAGAIN)
|
||||
{
|
||||
perror ("read");
|
||||
done = 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
else if (count == 0)
|
||||
{
|
||||
/* End of file. The remote has closed the
|
||||
connection. */
|
||||
done = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
size_t sum = 0;
|
||||
char* resp = (char*)response;
|
||||
|
||||
while(sum < len)
|
||||
{
|
||||
int k = write(stream->socket, resp, len - sum);
|
||||
sum += k;
|
||||
resp += k;
|
||||
}
|
||||
|
||||
requests.fetch_add(1, std::memory_order_relaxed);
|
||||
|
||||
}
|
||||
|
||||
if (done)
|
||||
{
|
||||
LOG_DEBUG("Closing TCP stream at " << stream->socket.id())
|
||||
|
||||
/* Closing the descriptor will make epoll remove it
|
||||
from the set of descriptors which are monitored. */
|
||||
delete stream;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
33
src/logging/default.cpp
Normal file
33
src/logging/default.cpp
Normal file
@ -0,0 +1,33 @@
|
||||
#include "default.hpp"
|
||||
|
||||
#include "logging/logs/async_log.hpp"
|
||||
#include "logging/logs/sync_log.hpp"
|
||||
|
||||
#include "logging/streams/stdout.hpp"
|
||||
|
||||
namespace logging
|
||||
{
|
||||
|
||||
std::unique_ptr<Log> log;
|
||||
|
||||
std::unique_ptr<Log> debug_log = std::make_unique<SyncLog>();
|
||||
|
||||
Logger init_debug_logger()
|
||||
{
|
||||
debug_log->pipe(std::make_unique<Stdout>());
|
||||
return debug_log->logger("DEBUG");
|
||||
}
|
||||
|
||||
Logger debug_logger = init_debug_logger();
|
||||
|
||||
void init_async()
|
||||
{
|
||||
log = std::make_unique<AsyncLog>();
|
||||
}
|
||||
|
||||
void init_sync()
|
||||
{
|
||||
log = std::make_unique<SyncLog>();
|
||||
}
|
||||
|
||||
}
|
22
src/logging/default.hpp
Normal file
22
src/logging/default.hpp
Normal file
@ -0,0 +1,22 @@
|
||||
#pragma once
|
||||
|
||||
#include "log.hpp"
|
||||
#include "logger.hpp"
|
||||
|
||||
namespace logging
|
||||
{
|
||||
|
||||
extern std::unique_ptr<Log> log;
|
||||
|
||||
extern Logger debug_logger;
|
||||
|
||||
template <class... Args>
|
||||
void debug(Args&&... args)
|
||||
{
|
||||
debug_logger.debug(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
void init_async();
|
||||
void init_sync();
|
||||
|
||||
}
|
@ -5,5 +5,5 @@
|
||||
|
||||
Logger Log::logger(const std::string& name)
|
||||
{
|
||||
return Logger(*this, name);
|
||||
return Logger(this, name);
|
||||
}
|
||||
|
@ -1,5 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "log.hpp"
|
||||
#include "levels.hpp"
|
||||
|
||||
@ -44,50 +47,68 @@ class Logger
|
||||
};
|
||||
|
||||
public:
|
||||
Logger(Log& log, const std::string& name) : log(log), name(name) {}
|
||||
Logger() = default;
|
||||
|
||||
Logger(Log* log, const std::string& name) : log(log), name(name) {}
|
||||
|
||||
template <class Level, class... Args>
|
||||
void emit(Args&&... args)
|
||||
{
|
||||
assert(log != nullptr);
|
||||
|
||||
auto message = std::make_unique<Message<Level>>(
|
||||
Timestamp::now(), name, fmt::format(std::forward<Args>(args)...)
|
||||
);
|
||||
|
||||
log.get().emit(std::move(message));
|
||||
log->emit(std::move(message));
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
void trace(Args&&... args)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
#ifndef LOG_NO_TRACE
|
||||
emit<Trace>(std::forward<Args>(args)...);
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
void debug(Args&&... args)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
#ifndef LOG_NO_DEBUG
|
||||
emit<Debug>(std::forward<Args>(args)...);
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
void info(Args&&... args)
|
||||
{
|
||||
#ifndef LOG_NO_INFO
|
||||
emit<Info>(std::forward<Args>(args)...);
|
||||
#endif
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
void warn(Args&&... args)
|
||||
{
|
||||
#ifndef LOG_NO_WARN
|
||||
emit<Warn>(std::forward<Args>(args)...);
|
||||
#endif
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
void error(Args&&... args)
|
||||
{
|
||||
#ifndef LOG_NO_ERROR
|
||||
emit<Error>(std::forward<Args>(args)...);
|
||||
#endif
|
||||
}
|
||||
|
||||
private:
|
||||
std::reference_wrapper<Log> log;
|
||||
Log* log;
|
||||
std::string name;
|
||||
};
|
||||
|
||||
|
@ -1,10 +1,17 @@
|
||||
#include "stdout.hpp"
|
||||
|
||||
#include <cppformat/format.h>
|
||||
#include <iostream>
|
||||
#include <fmt/format.h>
|
||||
|
||||
void Stdout::emit(const Log::Record& record)
|
||||
{
|
||||
fmt::print("{} {:<5} [{}] {}\n", record.when(), record.level_str(),
|
||||
record.where(), record.text());
|
||||
auto s = fmt::format("{} {:<5} [{}] {}\n", static_cast<std::string>(
|
||||
record.when()), record.level_str(), record.where(),
|
||||
record.text());
|
||||
|
||||
std::cout << s;
|
||||
|
||||
/* fmt::printf("{} {:<5} [{}] {}\n", static_cast<std::string>(record.when()), */
|
||||
/* record.level_str(), record.where(), record.text()); */
|
||||
}
|
||||
|
||||
|
36
src/mvcc/id.hpp
Normal file
36
src/mvcc/id.hpp
Normal file
@ -0,0 +1,36 @@
|
||||
#pragma once
|
||||
|
||||
#include <ostream>
|
||||
#include <stdint.h>
|
||||
#include "utils/total_ordering.hpp"
|
||||
|
||||
class Id : public TotalOrdering<Id>
|
||||
{
|
||||
public:
|
||||
Id() = default;
|
||||
|
||||
Id(uint64_t id) : id(id) {}
|
||||
|
||||
friend bool operator<(const Id& a, const Id& b)
|
||||
{
|
||||
return a.id < b.id;
|
||||
}
|
||||
|
||||
friend bool operator==(const Id& a, const Id& b)
|
||||
{
|
||||
return a.id == b.id;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& stream, const Id& id)
|
||||
{
|
||||
return stream << id.id;
|
||||
}
|
||||
|
||||
operator uint64_t() const
|
||||
{
|
||||
return id;
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t id {0};
|
||||
};
|
1
src/speedy/rapidjson
Submodule
1
src/speedy/rapidjson
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit c02d52ad56595dc70b38daf46b5f315d3a7115fa
|
45
src/utils/bswap.hpp
Normal file
45
src/utils/bswap.hpp
Normal file
@ -0,0 +1,45 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
|
||||
#include <byteswap.h>
|
||||
|
||||
template <class T>
|
||||
inline T bswap(T value);
|
||||
|
||||
template<>
|
||||
inline int16_t bswap<int16_t>(int16_t value)
|
||||
{
|
||||
return __bswap_16(value);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline uint16_t bswap<uint16_t>(uint16_t value)
|
||||
{
|
||||
return __bswap_16(value);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int32_t bswap<int32_t>(int32_t value)
|
||||
{
|
||||
return __bswap_32(value);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline uint32_t bswap<uint32_t>(uint32_t value)
|
||||
{
|
||||
return __bswap_32(value);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int64_t bswap<int64_t>(int64_t value)
|
||||
{
|
||||
return __bswap_64(value);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline uint64_t bswap<uint64_t>(uint64_t value)
|
||||
{
|
||||
return __bswap_64(value);
|
||||
}
|
@ -5,7 +5,7 @@
|
||||
#include <iomanip>
|
||||
#include <ostream>
|
||||
|
||||
#include <cppformat/format.h>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "utils/datetime/datetime_error.hpp"
|
||||
#include "utils/total_ordering.hpp"
|
||||
@ -66,7 +66,7 @@ public:
|
||||
|
||||
long subsec() const
|
||||
{
|
||||
return nsec;
|
||||
return nsec / 10000;
|
||||
}
|
||||
|
||||
const std::string to_iso8601() const
|
||||
@ -103,5 +103,5 @@ private:
|
||||
long nsec;
|
||||
|
||||
static constexpr auto fiso8601 =
|
||||
"{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:09d}Z";
|
||||
"{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:05d}Z";
|
||||
};
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "utils/auto_scope.hpp"
|
||||
@ -10,21 +9,23 @@
|
||||
class BasicException : public std::exception
|
||||
{
|
||||
public:
|
||||
template <class... Args>
|
||||
BasicException(Args&&... args) noexcept
|
||||
: message(fmt::format(std::forward<Args>(args)...))
|
||||
BasicException(const std::string& message) noexcept : message(message)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
message += '\n';
|
||||
this->message += '\n';
|
||||
|
||||
Stacktrace stacktrace;
|
||||
|
||||
for(auto& line : stacktrace)
|
||||
message += fmt::format(" at {} ({})\n",
|
||||
this->message += fmt::format(" at {} ({})\n",
|
||||
line.function, line.location);
|
||||
#endif
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
BasicException(const std::string& format, Args&&... args) noexcept
|
||||
: BasicException(fmt::format(format, std::forward<Args>(args)...)) {}
|
||||
|
||||
const char* what() const noexcept override
|
||||
{
|
||||
return message.c_str();
|
||||
@ -34,4 +35,3 @@ private:
|
||||
std::string message;
|
||||
};
|
||||
|
||||
|
||||
|
@ -79,7 +79,6 @@ public:
|
||||
return !(lhs == rhs);
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
const char* str;
|
||||
size_t len;
|
||||
|
5
src/utils/types/byte.hpp
Normal file
5
src/utils/types/byte.hpp
Normal file
@ -0,0 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
using byte = uint8_t;
|
62
tests/unit/chunked_decoder.cpp
Normal file
62
tests/unit/chunked_decoder.cpp
Normal file
@ -0,0 +1,62 @@
|
||||
#include <iostream>
|
||||
#include <deque>
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
#include <array>
|
||||
#include <vector>
|
||||
|
||||
#include "bolt/v1/transport/chunked_decoder.hpp"
|
||||
|
||||
using byte = unsigned char;
|
||||
|
||||
void print_hex(byte x)
|
||||
{
|
||||
printf("%02X ", static_cast<byte>(x));
|
||||
}
|
||||
|
||||
class DummyStream
|
||||
{
|
||||
public:
|
||||
void write(const byte* values, size_t n)
|
||||
{
|
||||
data.insert(data.end(), values, values + n);
|
||||
}
|
||||
|
||||
std::vector<byte> data;
|
||||
};
|
||||
|
||||
using Decoder = bolt::ChunkedDecoder<DummyStream>;
|
||||
|
||||
std::vector<byte> chunks[] = {
|
||||
{0x00,0x08,'A',' ','q','u','i','c','k',' ',0x00,0x06,'b','r','o','w','n',' '},
|
||||
{0x00,0x0A,'f','o','x',' ','j','u','m','p','s',' '},
|
||||
{0x00,0x07,'o','v','e','r',' ','a',' '},
|
||||
{0x00,0x08,'l','a','z','y',' ','d','o','g',0x00,0x00}
|
||||
};
|
||||
|
||||
static constexpr size_t N = std::extent<decltype(chunks)>::value;
|
||||
|
||||
std::string decoded = "A quick brown fox jumps over a lazy dog";
|
||||
|
||||
int main(void)
|
||||
{
|
||||
DummyStream stream;
|
||||
Decoder decoder(stream);
|
||||
|
||||
for(size_t i = 0; i < N; ++i)
|
||||
{
|
||||
auto& chunk = chunks[i];
|
||||
auto finished = decoder.decode(chunk.data(), chunk.size());
|
||||
|
||||
// break early if finished
|
||||
if(finished)
|
||||
break;
|
||||
}
|
||||
|
||||
assert(decoded.size() == stream.data.size());
|
||||
|
||||
for(size_t i = 0; i < decoded.size(); ++i)
|
||||
assert(decoded[i] == stream.data[i]);
|
||||
|
||||
return 0;
|
||||
}
|
115
tests/unit/chunked_encoder.cpp
Normal file
115
tests/unit/chunked_encoder.cpp
Normal file
@ -0,0 +1,115 @@
|
||||
#include <iostream>
|
||||
#include <deque>
|
||||
#include <cassert>
|
||||
#include <vector>
|
||||
|
||||
#include "bolt/v1/transport/chunked_encoder.hpp"
|
||||
|
||||
using byte = unsigned char;
|
||||
|
||||
void print_hex(byte x)
|
||||
{
|
||||
printf("%02X ", static_cast<byte>(x));
|
||||
}
|
||||
|
||||
class DummyStream
|
||||
{
|
||||
public:
|
||||
void write(const byte* values, size_t n)
|
||||
{
|
||||
num_calls++;
|
||||
data.insert(data.end(), values, values + n);
|
||||
}
|
||||
|
||||
byte pop()
|
||||
{
|
||||
auto c = data.front();
|
||||
data.pop_front();
|
||||
return c;
|
||||
}
|
||||
|
||||
size_t pop_size()
|
||||
{
|
||||
return ((size_t)pop() << 8) | pop();
|
||||
}
|
||||
|
||||
void print()
|
||||
{
|
||||
for(size_t i = 0; i < data.size(); ++i)
|
||||
print_hex(data[i]);
|
||||
}
|
||||
|
||||
std::deque<byte> data;
|
||||
size_t num_calls {0};
|
||||
};
|
||||
|
||||
using Encoder = bolt::ChunkedEncoder<DummyStream>;
|
||||
|
||||
void write_ff(Encoder& encoder, size_t n)
|
||||
{
|
||||
std::vector<byte> v;
|
||||
|
||||
for(size_t i = 0; i < n; ++i)
|
||||
v.push_back('\xFF');
|
||||
|
||||
encoder.write(v.data(), v.size());
|
||||
}
|
||||
|
||||
void check_ff(DummyStream& stream, size_t n)
|
||||
{
|
||||
for(size_t i = 0; i < n; ++i)
|
||||
assert(stream.pop() == byte('\xFF'));
|
||||
|
||||
(void)stream;
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
DummyStream stream;
|
||||
bolt::ChunkedEncoder<DummyStream> encoder(stream);
|
||||
|
||||
write_ff(encoder, 10);
|
||||
write_ff(encoder, 10);
|
||||
encoder.finish();
|
||||
|
||||
write_ff(encoder, 10);
|
||||
write_ff(encoder, 10);
|
||||
encoder.finish();
|
||||
|
||||
// this should be two chunks, one of size 65533 and the other of size 1467
|
||||
write_ff(encoder, 67000);
|
||||
encoder.finish();
|
||||
|
||||
for(int i = 0; i < 10000; ++i)
|
||||
write_ff(encoder, 1500);
|
||||
encoder.finish();
|
||||
|
||||
assert(stream.pop_size() == 20);
|
||||
check_ff(stream, 20);
|
||||
assert(stream.pop_size() == 0);
|
||||
|
||||
assert(stream.pop_size() == 20);
|
||||
check_ff(stream, 20);
|
||||
assert(stream.pop_size() == 0);
|
||||
|
||||
assert(stream.pop_size() == encoder.chunk_size);
|
||||
check_ff(stream, encoder.chunk_size);
|
||||
assert(stream.pop_size() == 1467);
|
||||
check_ff(stream, 1467);
|
||||
assert(stream.pop_size() == 0);
|
||||
|
||||
size_t k = 10000 * 1500;
|
||||
|
||||
while(k > 0)
|
||||
{
|
||||
auto size = k > encoder.chunk_size ? encoder.chunk_size : k;
|
||||
assert(stream.pop_size() == size);
|
||||
check_ff(stream, size);
|
||||
|
||||
k -= size;
|
||||
}
|
||||
|
||||
assert(stream.pop_size() == 0);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue
Block a user