diff --git a/include/communication/bolt/v1/packing/codes.hpp b/include/communication/bolt/v1/packing/codes.hpp index a8c6e7c3e..2b91e14b2 100644 --- a/include/communication/bolt/v1/packing/codes.hpp +++ b/include/communication/bolt/v1/packing/codes.hpp @@ -15,6 +15,8 @@ enum Code : uint8_t TinyMap = 0xA0, TinyStruct = 0xB0, + StructOne = 0xB1, + StructTwo = 0xB2, Null = 0xC0, diff --git a/include/communication/bolt/v1/serialization/record_stream.hpp b/include/communication/bolt/v1/serialization/record_stream.hpp index a09eda0db..13aad4a11 100644 --- a/include/communication/bolt/v1/serialization/record_stream.hpp +++ b/include/communication/bolt/v1/serialization/record_stream.hpp @@ -57,6 +57,7 @@ public: } chunk(); + send(); } void write_field(const std::string &field) @@ -67,6 +68,7 @@ public: write_list_header(1); bolt_encoder.write_string(field); chunk(); + send(); } void write_list_header(size_t size) diff --git a/include/communication/bolt/v1/server/server.hpp b/include/communication/bolt/v1/server/server.hpp index a6493ea2d..b7d11a579 100644 --- a/include/communication/bolt/v1/server/server.hpp +++ b/include/communication/bolt/v1/server/server.hpp @@ -8,6 +8,7 @@ #include "io/network/server.hpp" #include "communication/bolt/v1/bolt.hpp" +#include "logging/default.hpp" namespace bolt { @@ -17,7 +18,8 @@ class Server : public io::Server<Server<Worker>> { public: Server(io::Socket&& socket) - : io::Server<Server<Worker>>(std::forward<io::Socket>(socket)) {} + : io::Server<Server<Worker>>(std::forward<io::Socket>(socket)), + logger(logging::log->logger("bolt::Server")) {} void start(size_t n) { @@ -47,6 +49,8 @@ public: { assert(idx < workers.size()); + logger.trace("on connect"); + if(UNLIKELY(!workers[idx]->accept(this->socket))) return; @@ -62,6 +66,7 @@ private: std::atomic<bool> alive {true}; int idx {0}; + Logger logger; }; } diff --git a/include/communication/bolt/v1/server/worker.hpp b/include/communication/bolt/v1/server/worker.hpp index d6b51bdde..fbcaf2d45 100644 --- a/include/communication/bolt/v1/server/worker.hpp +++ b/include/communication/bolt/v1/server/worker.hpp @@ -27,7 +27,7 @@ public: Worker(Bolt &bolt) : bolt(bolt) { - logger = logging::log->logger("Network"); + logger = logging::log->logger("bolt::Worker"); } Session &on_connect(io::Socket &&socket) diff --git a/include/communication/bolt/v1/states/handshake.hpp b/include/communication/bolt/v1/states/handshake.hpp index 8ca4c2e10..e5967dc46 100644 --- a/include/communication/bolt/v1/states/handshake.hpp +++ b/include/communication/bolt/v1/states/handshake.hpp @@ -8,6 +8,7 @@ namespace bolt class Handshake : public State { public: + Handshake(); State* run(Session& session) override; }; diff --git a/include/io/network/epoll.hpp b/include/io/network/epoll.hpp index 61a501c6f..1b64828f7 100644 --- a/include/io/network/epoll.hpp +++ b/include/io/network/epoll.hpp @@ -5,6 +5,7 @@ #include "io/network/socket.hpp" #include "utils/likely.hpp" +#include "logging/default.hpp" namespace io { @@ -20,7 +21,8 @@ class Epoll public: using Event = struct epoll_event; - Epoll(int flags) + Epoll(int flags) : + logger(logging::log->logger("io::Epoll")) { epoll_fd = epoll_create1(flags); @@ -49,6 +51,7 @@ public: private: int epoll_fd; + Logger logger; }; } diff --git a/include/io/network/event_listener.hpp b/include/io/network/event_listener.hpp index 4794447d5..d951fa6c3 100644 --- a/include/io/network/event_listener.hpp +++ b/include/io/network/event_listener.hpp @@ -2,6 +2,7 @@ #include "io/network/epoll.hpp" #include "utils/crtp.hpp" +#include "logging/default.hpp" namespace io { @@ -12,7 +13,11 @@ class EventListener : public Crtp<Derived> public: using Crtp<Derived>::derived; - EventListener(uint32_t flags = 0) : listener(flags) {} + EventListener(uint32_t flags = 0) : + listener(flags), + logger(logging::log->logger("io::EventListener")) + { + } void wait_and_process_events() { @@ -25,8 +30,16 @@ public: // waits for an event/multiple events and returns a maximum of // max_events and stores them in the events array. it waits for // wait_timeout milliseconds. if wait_timeout is achieved, returns 0 + auto n = listener.wait(events, max_events, 200); +#ifndef NDEBUG +#ifndef LOG_NO_TRACE + if (n > 0) + logger.trace("number of events: {}", n); +#endif +#endif + // go through all events and process them in order for (int i = 0; i < n; ++i) { auto &event = events[i]; @@ -70,5 +83,8 @@ public: protected: Epoll listener; Epoll::Event events[max_events]; + +private: + Logger logger; }; } diff --git a/include/io/network/server.hpp b/include/io/network/server.hpp index ece730fc8..4eaa524f2 100644 --- a/include/io/network/server.hpp +++ b/include/io/network/server.hpp @@ -9,10 +9,14 @@ template <class Derived> class Server : public EventListener<Derived> { public: - Server(Socket &&socket) : socket(std::forward<Socket>(socket)) + Server(Socket &&socket) : socket(std::forward<Socket>(socket)), + logger(logging::log->logger("io::Server")) { event.data.fd = this->socket; - event.events = EPOLLIN | EPOLLET; + + // TODO: EPOLLET is hard to use -> figure out how should EPOLLET be used + // event.events = EPOLLIN | EPOLLET; + event.events = EPOLLIN; this->listener.add(this->socket, &event); } @@ -32,10 +36,12 @@ public: void on_exception_event(Epoll::Event &event, Args &&... args) { // TODO: Do something about it + logger.warn("epoll exception"); } protected: Epoll::Event event; Socket socket; + Logger logger; }; } diff --git a/include/io/network/stream_reader.hpp b/include/io/network/stream_reader.hpp index 7f7406312..8e2035256 100644 --- a/include/io/network/stream_reader.hpp +++ b/include/io/network/stream_reader.hpp @@ -17,10 +17,13 @@ public: size_t len; }; - StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags) {} + StreamReader(uint32_t flags = 0) : StreamListener<Derived, Stream>(flags), + logger(logging::log->logger("io::StreamReader")) {} bool accept(Socket& socket) { + logger.trace("accept"); + // accept a connection from a socket auto s = socket.accept(nullptr, nullptr); @@ -44,6 +47,8 @@ public: void on_data(Stream& stream) { + logger.trace("on data"); + while(true) { if(UNLIKELY(!stream.alive())) @@ -82,6 +87,9 @@ public: this->derived().on_read(stream, buf); } } + +private: + Logger logger; }; } diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index 11342ca23..c38d4d323 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -27,6 +27,7 @@ State *Executor::run(Session &session) try { return this->run(session, q); + // TODO: RETURN success MAYBE } catch (QueryEngineException &e) { session.output_stream.write_failure( {{"code", "Memgraph.QueryEngineException"}, diff --git a/src/communication/bolt/v1/states/handshake.cpp b/src/communication/bolt/v1/states/handshake.cpp index faaa4ee0c..aba159361 100644 --- a/src/communication/bolt/v1/states/handshake.cpp +++ b/src/communication/bolt/v1/states/handshake.cpp @@ -9,8 +9,12 @@ static constexpr uint32_t preamble = 0x6060B017; static constexpr byte protocol[4] = {0x00, 0x00, 0x00, 0x01}; +Handshake::Handshake() : State(logging::log->logger("Handshake")) {} + State* Handshake::run(Session& session) { + logger.debug("run"); + if(UNLIKELY(session.decoder.read_uint32() != preamble)) return nullptr; diff --git a/src/communication/bolt/v1/states/init.cpp b/src/communication/bolt/v1/states/init.cpp index 19c4b7578..bc76eafa5 100644 --- a/src/communication/bolt/v1/states/init.cpp +++ b/src/communication/bolt/v1/states/init.cpp @@ -12,6 +12,8 @@ Init::Init() : MessageParser<Init>(logging::log->logger("Init")) {} State *Init::parse(Session &session, Message &message) { + logger.debug("bolt::Init.parse()"); + auto struct_type = session.decoder.read_byte(); if (UNLIKELY((struct_type & 0x0F) > pack::Rule::MaxInitStructSize)) { @@ -35,7 +37,9 @@ State *Init::parse(Session &session, Message &message) message.client_name = session.decoder.read_string(); - // TODO read authentication tokens if B2 + if (struct_type == pack::Code::StructTwo) { + // TODO process authentication tokens + } return this; }