#pragma once #include #include #include #include #include #include #include "communication/bolt/v1/bolt.hpp" #include "communication/bolt/v1/session.hpp" #include "io/network/stream_reader.hpp" #include "logging/default.hpp" namespace bolt { template class Server; class Worker : public io::StreamReader { friend class bolt::Server; public: using sptr = std::shared_ptr; Worker(Bolt &bolt) : bolt(bolt) { logger = logging::log->logger("Network"); } Session &on_connect(io::Socket &&socket) { logger.trace("Accepting connection on socket {}", socket.id()); return *bolt.get().create_session(std::forward(socket)); } void on_error(Session &) { logger.trace("[on_error] errno = {}", errno); #ifndef NDEBUG auto err = io::NetworkError(""); logger.debug("{}", err.what()); #endif logger.error("Error occured in this session"); } void on_wait_timeout() {} Buffer on_alloc(Session &) { /* logger.trace("[on_alloc] Allocating {}B", sizeof buf); */ return Buffer{buf, sizeof buf}; } void on_read(Session &session, Buffer &buf) { logger.trace("[on_read] Received {}B", buf.len); #ifndef NDEBUG std::stringstream stream; for (size_t i = 0; i < buf.len; ++i) stream << fmt::format("{:02X} ", static_cast(buf.ptr[i])); logger.trace("[on_read] {}", stream.str()); #endif try { session.execute(reinterpret_cast(buf.ptr), buf.len); } catch (const std::exception &e) { logger.error("Error occured while executing statement."); logger.error("{}", e.what()); // TODO: report to client } } void on_close(Session &session) { logger.trace("[on_close] Client closed the connection"); session.close(); } template void on_exception(Session &session, Args &&... args) { logger.error("Error occured in this session"); logger.error(args...); // TODO: Do something about it } char buf[65536]; protected: std::reference_wrapper bolt; Logger logger; std::thread thread; void start(std::atomic &alive) { thread = std::thread([&, this]() { while (alive) wait_and_process_events(); }); } }; }