Removed Dbms and QueryEngine from the Network stack.

Reviewers: buda, mislav.bradac

Reviewed By: buda, mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D628
This commit is contained in:
Matej Ferencevic 2017-08-03 15:26:48 +02:00
parent e45ae4c4b6
commit 71ded22b65
12 changed files with 59 additions and 57 deletions

View File

@ -8,6 +8,7 @@
#include <glog/logging.h>
#include "communication/bolt/v1/constants.hpp"
#include "utils/assert.hpp"
#include "utils/bswap.hpp"
namespace communication::bolt {

View File

@ -23,6 +23,21 @@
namespace communication::bolt {
/**
* Bolt SessionData
*
* This class is responsible for holding references to Dbms and QueryEngine
* that are passed through the network server and worker to the session.
*
* @tparam OutputStream type of output stream (could be a bolt output stream or
* a test output stream)
*/
template <typename OutputStream>
struct SessionData {
Dbms dbms;
QueryEngine<OutputStream> query_engine;
};
/**
* Bolt Session
*
@ -37,8 +52,8 @@ class Session {
using StreamBuffer = io::network::StreamBuffer;
public:
Session(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine)
: socket_(std::move(socket)), dbms_(dbms), query_engine_(query_engine) {
Session(Socket &&socket, SessionData<OutputStream> &data)
: socket_(std::move(socket)), dbms_(data.dbms), query_engine_(data.query_engine) {
event_.data.ptr = this;
}

View File

@ -9,9 +9,6 @@
#include <fmt/format.h>
#include <glog/logging.h>
#include "database/dbms.hpp"
#include "query/engine.hpp"
#include "communication/worker.hpp"
#include "io/network/event_listener.hpp"
#include "utils/assert.hpp"
@ -36,17 +33,17 @@ namespace communication {
* @tparam OutputStream the server has to get the output stream as a template
parameter because the output stream is templated
* @tparam Socket the input/output socket that should be used
* @tparam SessionData the class with objects that will be forwarded to the session
*/
template <typename Session, typename OutputStream, typename Socket>
template <typename Session, typename OutputStream, typename Socket, typename SessionData>
class Server
: public io::network::EventListener<Server<Session, OutputStream, Socket>> {
: public io::network::EventListener<Server<Session, OutputStream, Socket, SessionData>> {
using Event = io::network::Epoll::Event;
public:
Server(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine)
Server(Socket &&socket, SessionData &session_data)
: socket_(std::forward<Socket>(socket)),
dbms_(dbms),
query_engine_(query_engine) {
session_data_(session_data) {
event_.data.fd = socket_;
// TODO: EPOLLET is hard to use -> figure out how should EPOLLET be used
@ -61,8 +58,8 @@ class Server
workers_.reserve(n);
for (size_t i = 0; i < n; ++i) {
workers_.push_back(
std::make_unique<Worker<Session, OutputStream, Socket>>(
dbms_, query_engine_));
std::make_unique<Worker<Session, OutputStream, Socket, SessionData>>(
session_data_));
workers_.back()->Start(alive_);
}
std::cout << "Server is fully armed and operational" << std::endl;
@ -113,14 +110,13 @@ class Server
void OnErrorEvent(Event &event) { close(event.data.fd); }
private:
std::vector<typename Worker<Session, OutputStream, Socket>::uptr> workers_;
std::vector<typename Worker<Session, OutputStream, Socket, SessionData>::uptr> workers_;
std::atomic<bool> alive_{true};
int idx_{0};
Socket socket_;
Dbms &dbms_;
QueryEngine<OutputStream> &query_engine_;
Event event_;
SessionData &session_data_;
};
} // namespace communication

View File

@ -9,10 +9,6 @@
#include <glog/logging.h>
#include "database/dbms.hpp"
#include "query/engine.hpp"
#include "communication/bolt/v1/session.hpp"
#include "io/network/network_error.hpp"
#include "io/network/stream_reader.hpp"
@ -33,19 +29,19 @@ namespace communication {
* @tparam OutputStream the worker has to get the output stream as a template
parameter because the output stream is templated
* @tparam Socket the input/output socket that should be used
* @tparam SessionData the class with objects that will be forwarded to the session
*/
template <typename Session, typename OutputStream, typename Socket>
template <typename Session, typename OutputStream, typename Socket, typename SessionData>
class Worker
: public io::network::StreamReader<Worker<Session, OutputStream, Socket>,
: public io::network::StreamReader<Worker<Session, OutputStream, Socket, SessionData>,
Session> {
using StreamBuffer = io::network::StreamBuffer;
public:
using uptr = std::unique_ptr<Worker<Session, OutputStream, Socket>>;
using uptr = std::unique_ptr<Worker<Session, OutputStream, Socket, SessionData>>;
Worker(Dbms &dbms, QueryEngine<OutputStream> &query_engine)
: dbms_(dbms), query_engine_(query_engine) {}
Worker(SessionData &session_data) : session_data_(session_data) {}
Session &OnConnect(Socket &&socket) {
DLOG(INFO) << "Accepting connection on socket " << socket.id();
@ -53,7 +49,7 @@ class Worker
// TODO fix session lifecycle handling
// dangling pointers are not cool :)
// TODO attach currently active Db
return *(new Session(std::forward<Socket>(socket), dbms_, query_engine_));
return *(new Session(std::forward<Socket>(socket), session_data_));
}
void OnError(Session &session) {
@ -101,7 +97,6 @@ class Worker
}
private:
Dbms &dbms_;
QueryEngine<OutputStream> &query_engine_;
SessionData &session_data_;
};
}

View File

@ -4,9 +4,6 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "database/dbms.hpp"
#include "query/engine.hpp"
#include "communication/bolt/v1/session.hpp"
#include "communication/server.hpp"
@ -26,8 +23,9 @@ using session_t = communication::bolt::Session<socket_t>;
using result_stream_t =
communication::bolt::ResultStream<communication::bolt::Encoder<
communication::bolt::ChunkedEncoderBuffer<socket_t>>>;
using session_data_t = communication::bolt::SessionData<result_stream_t>;
using bolt_server_t =
communication::Server<session_t, result_stream_t, socket_t>;
communication::Server<session_t, result_stream_t, socket_t, session_data_t>;
DEFINE_string(interface, "0.0.0.0", "Default interface on which to listen.");
DEFINE_string(port, "7687", "Default port on which to listen.");
@ -123,11 +121,11 @@ int main(int argc, char **argv) {
LOG(FATAL) << "Cannot listen on socket!";
}
Dbms dbms;
QueryEngine<result_stream_t> query_engine;
// Initialize bolt session data (Dbms and QueryEngine).
session_data_t session_data;
// Initialize server.
bolt_server_t server(std::move(socket), dbms, query_engine);
bolt_server_t server(std::move(socket), session_data);
// register SIGTERM handler
SignalHandler::register_handler(Signal::Terminate,

View File

@ -4,6 +4,7 @@
#include <cassert>
#include <cstring>
#include <iostream>
#include <random>
#include <vector>
#include <glog/logging.h>
@ -11,10 +12,8 @@
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "communication/server.hpp"
#include "database/dbms.hpp"
#include "io/network/epoll.hpp"
#include "io/network/socket.hpp"
#include "query/engine.hpp"
static constexpr const int SIZE = 60000;
static constexpr const int REPLY = 10;
@ -24,10 +23,11 @@ using socket_t = io::network::Socket;
class TestOutputStream {};
class TestData {};
class TestSession {
public:
TestSession(socket_t &&socket, Dbms &dbms,
QueryEngine<TestOutputStream> &query_engine)
TestSession(socket_t &&socket, TestData &data)
: socket_(std::move(socket)) {
event_.data.ptr = this;
}
@ -65,7 +65,7 @@ class TestSession {
};
using test_server_t =
communication::Server<TestSession, TestOutputStream, socket_t>;
communication::Server<TestSession, TestOutputStream, socket_t, TestData>;
void server_start(void *serverptr, int num) {
((test_server_t *)serverptr)->Start(num);

View File

@ -15,10 +15,8 @@
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "communication/server.hpp"
#include "database/dbms.hpp"
#include "io/network/epoll.hpp"
#include "io/network/socket.hpp"
#include "query/engine.hpp"
static constexpr const char interface[] = "127.0.0.1";
@ -27,10 +25,11 @@ using socket_t = io::network::Socket;
class TestOutputStream {};
class TestData {};
class TestSession {
public:
TestSession(socket_t &&socket, Dbms &dbms,
QueryEngine<TestOutputStream> &query_engine)
TestSession(socket_t &&socket, TestData &data)
: socket_(std::move(socket)) {
event_.data.ptr = this;
}
@ -53,7 +52,7 @@ class TestSession {
};
using test_server_t =
communication::Server<TestSession, TestOutputStream, socket_t>;
communication::Server<TestSession, TestOutputStream, socket_t, TestData>;
test_server_t *serverptr;
std::atomic<bool> run{true};
@ -90,9 +89,8 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
printf("ADDRESS: %s, PORT: %d\n", ep.address(), ep.port());
// initialize server
Dbms dbms;
QueryEngine<TestOutputStream> query_engine;
test_server_t server(std::move(socket), dbms, query_engine);
TestData data;
test_server_t server(std::move(socket), data);
serverptr = &server;
// start server

View File

@ -26,9 +26,8 @@ TEST(Network, Server) {
printf("ADDRESS: %s, PORT: %d\n", ep.address(), ep.port());
// initialize server
Dbms dbms;
QueryEngine<TestOutputStream> query_engine;
test_server_t server(std::move(socket), dbms, query_engine);
TestData session_data;
test_server_t server(std::move(socket), session_data);
serverptr = &server;
// start server

View File

@ -30,9 +30,8 @@ TEST(Network, SessionLeak) {
printf("ADDRESS: %s, PORT: %d\n", ep.address(), ep.port());
// initialize server
Dbms dbms;
QueryEngine<TestOutputStream> query_engine;
test_server_t server(std::move(socket), dbms, query_engine);
TestData session_data;
test_server_t server(std::move(socket), session_data);
serverptr = &server;
// start server

View File

@ -2,11 +2,11 @@
#include <cassert>
#include <cstring>
#include <iostream>
#include <random>
#include <vector>
#include <glog/logging.h>
#include "database/dbms.hpp"
#include "gtest/gtest.h"
/**

View File

@ -2,6 +2,7 @@
#include "bolt_testdata.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp"
#include "database/dbms.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/typed_value.hpp"

View File

@ -11,15 +11,15 @@ DECLARE_bool(interpret);
// TODO: This could be done in fixture.
// Shortcuts for writing variable initializations in tests
#define INIT_VARS \
Dbms dbms; \
TestSocket socket(10); \
QueryEngine<ResultStreamT> query_engine; \
SessionT session(std::move(socket), dbms, query_engine); \
SessionDataT session_data; \
SessionT session(std::move(socket), session_data); \
std::vector<uint8_t> &output = session.socket_.output;
using ResultStreamT =
communication::bolt::ResultStream<communication::bolt::Encoder<
communication::bolt::ChunkedEncoderBuffer<TestSocket>>>;
using SessionDataT = communication::bolt::SessionData<ResultStreamT>;
using SessionT = communication::bolt::Session<TestSocket>;
using StateT = communication::bolt::State;