// File: memgraph/include/communication/bolt/v1/server/worker.hpp
// (105 lines, 2.2 KiB, C++; last change 2016-08-02 05:14:09 +08:00)

#pragma once

#include <atomic>
#include <cerrno>
#include <cstdio>
#include <functional>
#include <iomanip>
#include <memory>
#include <sstream>
#include <thread>
#include <utility>

#include "communication/bolt/v1/bolt.hpp"
#include "communication/bolt/v1/session.hpp"
#include "logging/default.hpp"
#include "io/network/stream_reader.hpp"

namespace bolt
{
// Forward declaration of the server template; Worker names
// bolt::Server<Worker> as a friend, so the template must be declared first.
template <class WorkerType>
class Server;
/**
 * Network worker for the Bolt protocol.
 *
 * Derives from io::StreamReader<Worker, Session> and supplies the on_*
 * callbacks below (connect, error, wait timeout, buffer allocation, read,
 * close). bolt::Server<Worker> is a friend so that it can reach the
 * protected start() method.
 */
class Worker : public io::StreamReader<Worker, Session>
{
    friend class bolt::Server<Worker>;

public:
    using sptr = std::shared_ptr<Worker>;

    /**
     * @param bolt Bolt instance used to create sessions. Only a reference is
     *             stored, so it has to outlive this worker.
     */
    Worker(Bolt &bolt) : bolt(bolt)
    {
        logger = logging::log->logger("Network");
    }

    /**
     * Creates a session for a newly accepted connection.
     *
     * @param socket Socket of the new connection; ownership is handed over
     *               to Bolt::create_session.
     * @return Reference to the session returned by create_session.
     */
    Session &on_connect(io::Socket &&socket)
    {
        logger.trace("Accepting connection on socket {}", socket.id());

        // `socket` is a plain rvalue-reference parameter, not a forwarding
        // reference, so std::move (not std::forward) is the correct cast.
        return *bolt.get().create_session(std::move(socket));
    }

    /**
     * Logs an error reported for a session. The session itself is not
     * touched here.
     */
    void on_error(Session &)
    {
        logger.trace("[on_error] errno = {}", errno);

#ifndef NDEBUG
        // Debug builds additionally log the message carried by a freshly
        // constructed io::NetworkError.
        // NOTE(review): presumably NetworkError derives its text from errno;
        // confirm against io::NetworkError.
        auto err = io::NetworkError("");
        logger.debug("{}", err.what());
#endif

        logger.error("Error occured in this session");
    }

    /** Intentionally does nothing when a wait times out. */
    void on_wait_timeout() {}

    /**
     * Provides the buffer that incoming data is read into.
     *
     * @return Buffer describing this worker's `buf` array and its full size.
     *         The same array is handed out on every call, for every session.
     */
    Buffer on_alloc(Session &)
    {
        /* logger.trace("[on_alloc] Allocating {}B", sizeof buf); */

        return Buffer{buf, sizeof buf};
    }

    /**
     * Passes received bytes to the session for execution.
     *
     * Any std::exception thrown by Session::execute is caught and logged
     * here, so it does not propagate to the caller.
     *
     * @param session Session the data belongs to.
     * @param buf     Received data (`buf.ptr`, `buf.len`). Note that this
     *                parameter shadows the member array of the same name.
     */
    void on_read(Session &session, Buffer &buf)
    {
        logger.trace("[on_read] Received {}B", buf.len);

#ifndef NDEBUG
        // Debug builds also trace a hex dump of the received bytes.
        std::stringstream stream;

        for (size_t i = 0; i < buf.len; ++i)
        {
            stream << fmt::format("{:02X} ", static_cast<byte>(buf.ptr[i]));
        }

        logger.trace("[on_read] {}", stream.str());
#endif

        try {
            session.execute(reinterpret_cast<const byte *>(buf.ptr), buf.len);
        } catch (const std::exception &e) {
            logger.error("Error occured while executing statement.");
            logger.error("{}", e.what());
        }
    }

    /** Logs that the client closed the connection and closes the session. */
    void on_close(Session &session)
    {
        logger.trace("[on_close] Client closed the connection");
        session.close();
    }

    // Receive buffer handed out by on_alloc().
    char buf[65536];

protected:
    std::reference_wrapper<Bolt> bolt;

    Logger logger;
    std::thread thread;

    /**
     * Starts the worker thread, which calls wait_and_process_events() in a
     * loop for as long as `alive` reads true.
     *
     * @param alive Run flag, captured by reference: it must outlive the
     *              spawned thread.
     *
     * NOTE(review): the thread is not joined anywhere in this class;
     * presumably bolt::Server (a friend) joins it. Confirm, since destroying
     * a joinable std::thread calls std::terminate.
     */
    void start(std::atomic<bool> &alive)
    {
        thread = std::thread([this, &alive]() {
            while (alive)
            {
                wait_and_process_events();
            }
        });
    }
};
}