memgraph/src/memgraph_bolt.cpp

125 lines
3.4 KiB
C++
Raw Normal View History

2016-12-20 01:32:44 +08:00
#include <signal.h>
#include <experimental/filesystem>
#include <iostream>
#include "gflags/gflags.h"
#include "dbms/dbms.hpp"
#include "query/engine.hpp"
#include "communication/bolt/v1/session.hpp"
#include "communication/server.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/network_error.hpp"
#include "io/network/socket.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "utils/config/config.hpp"
#include "utils/signals/handler.hpp"
2016-12-20 01:32:44 +08:00
#include "utils/stacktrace/log.hpp"
2017-01-13 17:47:17 +08:00
#include "utils/terminate_handler.hpp"
namespace fs = std::experimental::filesystem;
using endpoint_t = io::network::NetworkEndpoint;
using socket_t = io::network::Socket;
using session_t = communication::bolt::Session<socket_t>;
using result_stream_t =
communication::bolt::ResultStream<communication::bolt::Encoder<
communication::bolt::ChunkedEncoderBuffer<socket_t>>>;
using bolt_server_t =
communication::Server<session_t, result_stream_t, socket_t>;
static bolt_server_t *serverptr;
Logger logger;
DEFINE_string(interface, "0.0.0.0", "Default interface on which to listen.");
DEFINE_string(port, "7687", "Default port on which to listen.");
DEFINE_int32(num_workers, std::thread::hardware_concurrency(),
"Number of workers");
void throw_and_stacktace(std::string message) {
Stacktrace stacktrace;
logger.info(stacktrace.dump());
2017-01-13 17:47:17 +08:00
}
int main(int argc, char **argv) {
fs::current_path(fs::path(argv[0]).parent_path());
gflags::ParseCommandLineFlags(&argc, &argv, true);
// Logging init.
2016-08-19 08:28:22 +08:00
#ifdef SYNC_LOGGER
logging::init_sync();
2016-08-19 08:28:22 +08:00
#else
logging::init_async();
2016-08-19 08:28:22 +08:00
#endif
logging::log->pipe(std::make_unique<Stdout>());
// Get logger.
logger = logging::log->logger("Main");
logger.info("{}", logging::log->type());
// Unhandled exception handler init.
std::set_terminate(&terminate_handler);
// Signal handling init.
SignalHandler::register_handler(Signal::SegmentationFault, []() {
log_stacktrace("SegmentationFault signal raised");
std::exit(EXIT_FAILURE);
});
SignalHandler::register_handler(Signal::Terminate, []() {
log_stacktrace("Terminate signal raised");
std::exit(EXIT_FAILURE);
});
SignalHandler::register_handler(Signal::Abort, []() {
log_stacktrace("Abort signal raised");
std::exit(EXIT_FAILURE);
});
// Register args.
CONFIG_REGISTER_ARGS(argc, argv);
// Initialize endpoint.
endpoint_t endpoint;
try {
endpoint = endpoint_t(FLAGS_interface, FLAGS_port);
} catch (io::network::NetworkEndpointException &e) {
logger.error("{}", e.what());
std::exit(EXIT_FAILURE);
}
// Initialize socket.
socket_t socket;
if (!socket.Bind(endpoint)) {
logger.error("Cannot bind to socket on {} at {}", FLAGS_interface,
FLAGS_port);
std::exit(EXIT_FAILURE);
}
if (!socket.SetNonBlocking()) {
logger.error("Cannot set socket to non blocking!");
std::exit(EXIT_FAILURE);
}
if (!socket.Listen(1024)) {
logger.error("Cannot listen on socket!");
std::exit(EXIT_FAILURE);
}
logger.info("Listening on {} at {}", FLAGS_interface, FLAGS_port);
Dbms dbms;
QueryEngine<result_stream_t> query_engine;
// Initialize server.
bolt_server_t server(std::move(socket), dbms, query_engine);
serverptr = &server;
// Start worker threads.
logger.info("Starting {} workers", FLAGS_num_workers);
server.Start(FLAGS_num_workers);
logger.info("Shutting down...");
return EXIT_SUCCESS;
}