config is better now (yaml-cpp) + web::Client + web::Logger

Summary:
init struct size problem fix

cast issue bolt::State::init

TCP server bugfix EPOLLET + bolt bug fix

Test Plan: normal

Reviewers: ktf

Subscribers: ktf

Maniphest Tasks: T87

Differential Revision: https://phabricator.tomicevic.com/D2
This commit is contained in:
Marko Budiselic 2016-09-08 12:10:20 +01:00
parent b0951a3f76
commit 35f882644e
12 changed files with 63 additions and 9 deletions

View File

@ -15,6 +15,8 @@ enum Code : uint8_t
TinyMap = 0xA0,
TinyStruct = 0xB0,
StructOne = 0xB1,
StructTwo = 0xB2,
Null = 0xC0,

View File

@ -57,6 +57,7 @@ public:
}
chunk();
send();
}
void write_field(const std::string& field)
@ -67,6 +68,7 @@ public:
write_list_header(1);
bolt_encoder.write_string(field);
chunk();
send();
}
void write_list_header(size_t size)
@ -112,11 +114,13 @@ public:
void send()
{
logger.trace("send");
chunked_buffer.flush();
}
void chunk()
{
logger.trace("chunk");
chunked_encoder.write_chunk();
}

View File

@ -8,6 +8,7 @@
#include "io/network/server.hpp"
#include "communication/bolt/v1/bolt.hpp"
#include "logging/default.hpp"
namespace bolt
{
@ -17,7 +18,8 @@ class Server : public io::Server<Server<Worker>>
{
public:
Server(io::Socket&& socket)
: io::Server<Server<Worker>>(std::forward<io::Socket>(socket)) {}
: io::Server<Server<Worker>>(std::forward<io::Socket>(socket)),
logger(logging::log->logger("bolt::Server")) {}
void start(size_t n)
{
@ -47,6 +49,8 @@ public:
{
assert(idx < workers.size());
logger.trace("on connect");
if(UNLIKELY(!workers[idx]->accept(this->socket)))
return;
@ -62,6 +66,7 @@ private:
std::atomic<bool> alive {true};
int idx {0};
Logger logger;
};
}

View File

@ -27,7 +27,7 @@ public:
Worker(Bolt &bolt) : bolt(bolt)
{
logger = logging::log->logger("Network");
logger = logging::log->logger("bolt::Worker");
}
Session &on_connect(io::Socket &&socket)

View File

@ -8,6 +8,7 @@ namespace bolt
class Handshake : public State
{
public:
Handshake();
State* run(Session& session) override;
};

View File

@ -5,6 +5,7 @@
#include "io/network/socket.hpp"
#include "utils/likely.hpp"
#include "logging/default.hpp"
namespace io
{
@ -20,7 +21,8 @@ class Epoll
public:
using Event = struct epoll_event;
Epoll(int flags)
Epoll(int flags) :
logger(logging::log->logger("io::Epoll"))
{
epoll_fd = epoll_create1(flags);
@ -49,6 +51,7 @@ public:
private:
int epoll_fd;
Logger logger;
};
}

View File

@ -2,6 +2,7 @@
#include "io/network/epoll.hpp"
#include "utils/crtp.hpp"
#include "logging/default.hpp"
namespace io
{
@ -12,7 +13,11 @@ class EventListener : public Crtp<Derived>
public:
using Crtp<Derived>::derived;
EventListener(uint32_t flags = 0) : listener(flags) {}
EventListener(uint32_t flags = 0) :
listener(flags),
logger(logging::log->logger("io::EventListener"))
{
}
void wait_and_process_events()
{
@ -25,8 +30,16 @@ public:
// waits for an event/multiple events and returns a maximum of
// max_events and stores them in the events array. it waits for
// wait_timeout milliseconds. if wait_timeout is achieved, returns 0
auto n = listener.wait(events, max_events, 200);
#ifndef NDEBUG
#ifndef LOG_NO_TRACE
if (n > 0)
logger.trace("number of events: {}", n);
#endif
#endif
// go through all events and process them in order
for (int i = 0; i < n; ++i) {
auto &event = events[i];
@ -70,5 +83,8 @@ public:
protected:
Epoll listener;
Epoll::Event events[max_events];
private:
Logger logger;
};
}

View File

@ -9,10 +9,14 @@ template <class Derived>
class Server : public EventListener<Derived>
{
public:
Server(Socket &&socket) : socket(std::forward<Socket>(socket))
Server(Socket &&socket) : socket(std::forward<Socket>(socket)),
logger(logging::log->logger("io::Server"))
{
event.data.fd = this->socket;
event.events = EPOLLIN | EPOLLET;
// TODO: EPOLLET is hard to use -> figure out how should EPOLLET be used
// event.events = EPOLLIN | EPOLLET;
event.events = EPOLLIN;
this->listener.add(this->socket, &event);
}
@ -32,10 +36,12 @@ public:
void on_exception_event(Epoll::Event &event, Args &&... args)
{
// TODO: Do something about it
logger.warn("epoll exception");
}
protected:
Epoll::Event event;
Socket socket;
Logger logger;
};
}

View File

@ -17,10 +17,13 @@ public:
size_t len;
};
StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags) {}
StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags),
logger(logging::log->logger("io::StreamReader")) {}
bool accept(Socket& socket)
{
logger.trace("accept");
// accept a connection from a socket
auto s = socket.accept(nullptr, nullptr);
@ -44,6 +47,8 @@ public:
void on_data(Stream& stream)
{
logger.trace("on data");
while(true)
{
if(UNLIKELY(!stream.alive()))
@ -82,6 +87,9 @@ public:
this->derived().on_read(stream, buf);
}
}
private:
Logger logger;
};
}

View File

@ -27,6 +27,7 @@ State *Executor::run(Session &session)
try {
return this->run(session, q);
// TODO: RETURN success MAYBE
} catch (QueryEngineException &e) {
session.output_stream.write_failure(
{{"code", "Memgraph.QueryEngineException"},

View File

@ -9,8 +9,12 @@ static constexpr uint32_t preamble = 0x6060B017;
static constexpr byte protocol[4] = {0x00, 0x00, 0x00, 0x01};
Handshake::Handshake() : State(logging::log->logger("Handshake")) {}
State* Handshake::run(Session& session)
{
logger.debug("run");
if(UNLIKELY(session.decoder.read_uint32() != preamble))
return nullptr;

View File

@ -12,9 +12,11 @@ Init::Init() : MessageParser<Init>(logging::log->logger("Init")) {}
State *Init::parse(Session &session, Message &message)
{
logger.debug("bolt::Init.parse()");
auto struct_type = session.decoder.read_byte();
if (UNLIKELY((struct_type & 0x0F) <= pack::Rule::MaxInitStructSize)) {
if (UNLIKELY((struct_type & 0x0F) > pack::Rule::MaxInitStructSize)) {
logger.debug("{}", struct_type);
logger.debug(
@ -35,7 +37,9 @@ State *Init::parse(Session &session, Message &message)
message.client_name = session.decoder.read_string();
// TODO read authentication tokens if B2
if (struct_type == pack::Code::StructTwo) {
// TODO process authentication tokens
}
return this;
}