2018-04-20 20:58:49 +08:00
|
|
|
#include <algorithm>
|
|
|
|
#include <chrono>
|
|
|
|
#include <cstdint>
|
|
|
|
#include <exception>
|
|
|
|
#include <functional>
|
|
|
|
#include <limits>
|
|
|
|
#include <thread>
|
2016-08-10 16:39:02 +08:00
|
|
|
|
2017-07-06 19:53:39 +08:00
|
|
|
#include <gflags/gflags.h>
|
|
|
|
#include <glog/logging.h>
|
2017-05-22 18:31:04 +08:00
|
|
|
|
2018-08-31 16:32:53 +08:00
|
|
|
#include "communication/server.hpp"
|
2018-10-05 18:37:23 +08:00
|
|
|
#include "database/single_node/graph_db.hpp"
|
2018-07-06 15:28:05 +08:00
|
|
|
#include "integrations/kafka/exceptions.hpp"
|
|
|
|
#include "integrations/kafka/streams.hpp"
|
2018-08-22 21:00:16 +08:00
|
|
|
#include "memgraph_init.hpp"
|
Extract communication to static library
Summary:
Session specifics have been moved out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be the 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
#include "query/exceptions.hpp"
|
2018-06-20 19:46:54 +08:00
|
|
|
#include "telemetry/telemetry.hpp"
|
2017-06-09 21:48:40 +08:00
|
|
|
#include "utils/flag_validation.hpp"
|
2017-09-26 15:43:43 +08:00
|
|
|
|
2017-12-19 19:40:30 +08:00
|
|
|
// General purpose flags.
|
2017-09-25 21:56:14 +08:00
|
|
|
// Address the Bolt server listens on. SingleNodeMain pairs it with FLAGS_port
// to form the server endpoint. The default "0.0.0.0" is kept unchanged.
DEFINE_string(interface, "0.0.0.0",
              "Communication interface on which to listen.");
|
2018-01-15 21:03:07 +08:00
|
|
|
// Port the Bolt server listens on. The flag is stored as int32 but validated
// to the uint16_t range, so the static_cast<uint16_t>(FLAGS_port) done in
// SingleNodeMain cannot truncate an accepted value.
DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.",
                       FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
|
2017-06-09 21:48:40 +08:00
|
|
|
// Number of Bolt worker threads handed to the server in SingleNodeMain.
// std::thread::hardware_concurrency() is allowed to return 0 when the value
// cannot be determined, so the default is clamped to at least one worker to
// satisfy the [1, INT32_MAX] validator.
DEFINE_VALIDATED_int32(num_workers,
                       std::max(std::thread::hardware_concurrency(), 1U),
                       "Number of workers (Bolt)", FLAG_IN_RANGE(1, INT32_MAX));
|
2018-03-23 23:32:17 +08:00
|
|
|
// Inactivity timeout, in seconds, passed to the server constructor in
// SingleNodeMain. Must be at least 1; the default is 1800.
DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
                       "Time in seconds after which inactive sessions will be "
                       "closed.",
                       FLAG_IN_RANGE(1, INT32_MAX));
|
2018-06-20 23:44:47 +08:00
|
|
|
// Path to the certificate file. SingleNodeMain only builds a
// ServerContext from it when key_file is non-empty as well.
DEFINE_string(cert_file, "", "Certificate file to use.");
|
|
|
|
// Path to the key file. SingleNodeMain only builds a ServerContext from it
// when cert_file is non-empty as well.
DEFINE_string(key_file, "", "Key file to use.");
|
2018-08-22 21:00:16 +08:00
|
|
|
|
2018-06-20 19:46:54 +08:00
|
|
|
// Opt-in switch for telemetry (off by default). When set, SingleNodeMain
// constructs the telemetry object and registers the "db" collector.
DEFINE_bool(telemetry_enabled, false,
            "Set to true to enable telemetry. We collect information about the "
            "running system (CPU and memory information) and information about "
            "the database runtime (vertex and edge counts and resource usage) "
            "to allow for easier improvement of the product.");
|
Extract communication to static library
Summary:
Session specifics have been moved out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be the 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
|
2019-02-19 20:50:46 +08:00
|
|
|
// Audit logging flags.
|
|
|
|
// When true, SingleNodeMain calls Start() on the audit log during startup.
// The audit log object itself is constructed regardless of this flag.
DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging.");
|
|
|
|
// Capacity of the audit log buffer, forwarded to the audit::Log constructor
// in SingleNodeMain. Defaults to audit::kBufferSizeDefault; must be >= 1.
DEFINE_VALIDATED_int32(audit_buffer_size, audit::kBufferSizeDefault,
                       "Maximum number of items in the audit log buffer.",
                       FLAG_IN_RANGE(1, INT32_MAX));
|
|
|
|
// Flush interval of the audit log buffer in milliseconds, forwarded to the
// audit::Log constructor in SingleNodeMain. Defaults to
// audit::kBufferFlushIntervalMillisDefault; values below 10 ms are rejected.
DEFINE_VALIDATED_int32(
    audit_buffer_flush_interval_ms, audit::kBufferFlushIntervalMillisDefault,
    "Interval (in milliseconds) used for flushing the audit log buffer.",
    FLAG_IN_RANGE(10, INT32_MAX));
|
|
|
|
|
Extract communication to static library
Summary:
Session specifics have been moved out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be the 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
// Concrete server type used by this binary: communication::Server
// instantiated with BoltSession as the session and SessionData as the data
// shared with it.
using ServerT = communication::Server<BoltSession, SessionData>;
|
|
|
|
// Make communication::ServerContext usable unqualified in this file.
using communication::ServerContext;
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
void SingleNodeMain() {
|
|
|
|
google::SetUsageMessage("Memgraph single-node database server");
|
2019-02-19 20:50:46 +08:00
|
|
|
|
|
|
|
// All enterprise features should be constructed before the main database
|
|
|
|
// storage. This will cause them to be destructed *after* the main database
|
|
|
|
// storage. That way any errors that happen during enterprise features
|
|
|
|
// destruction won't have an impact on the storage engine.
|
|
|
|
// Example: When the main storage is destructed it makes a snapshot. When
|
|
|
|
// audit logging is destructed it syncs all pending data to disk and that can
|
|
|
|
// fail. That is why it must be destructed *after* the main database storage
|
|
|
|
// to minimise the impact of their failure on the main storage.
|
|
|
|
|
|
|
|
// Begin enterprise features initialization
|
|
|
|
|
2019-04-23 17:00:49 +08:00
|
|
|
auto durability_directory = std::filesystem::path(FLAGS_durability_directory);
|
2019-02-19 20:50:46 +08:00
|
|
|
|
|
|
|
// Auth
|
2019-02-27 22:22:54 +08:00
|
|
|
auth::Init();
|
2019-02-19 20:50:46 +08:00
|
|
|
auth::Auth auth{durability_directory / "auth"};
|
|
|
|
|
|
|
|
// Audit log
|
|
|
|
audit::Log audit_log{durability_directory / "audit", FLAGS_audit_buffer_size,
|
|
|
|
FLAGS_audit_buffer_flush_interval_ms};
|
|
|
|
// Start the log if enabled.
|
|
|
|
if (FLAGS_audit_enabled) {
|
|
|
|
audit_log.Start();
|
|
|
|
}
|
|
|
|
// Setup SIGUSR2 to be used for reopening audit log files, when e.g. logrotate
|
|
|
|
// rotates our audit logs.
|
|
|
|
CHECK(utils::SignalHandler::RegisterHandler(
|
|
|
|
utils::Signal::User2, [&audit_log]() { audit_log.ReopenLog(); }))
|
|
|
|
<< "Unable to register SIGUSR2 handler!";
|
|
|
|
|
|
|
|
// End enterprise features initialization
|
|
|
|
|
|
|
|
// Main storage and execution engines initialization
|
|
|
|
|
2018-10-09 17:09:10 +08:00
|
|
|
database::GraphDb db;
|
2018-08-24 16:12:04 +08:00
|
|
|
query::Interpreter interpreter;
|
2019-02-19 20:50:46 +08:00
|
|
|
SessionData session_data{&db, &interpreter, &auth, &audit_log};
|
2018-06-20 23:44:47 +08:00
|
|
|
|
2018-07-06 15:28:05 +08:00
|
|
|
integrations::kafka::Streams kafka_streams{
|
2019-02-19 20:50:46 +08:00
|
|
|
durability_directory / "streams",
|
2018-08-22 21:00:16 +08:00
|
|
|
[&session_data](
|
|
|
|
const std::string &query,
|
|
|
|
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
|
|
|
KafkaStreamWriter(session_data, query, params);
|
|
|
|
}};
|
2018-07-06 15:28:05 +08:00
|
|
|
|
|
|
|
try {
|
|
|
|
// Recover possible streams.
|
|
|
|
kafka_streams.Recover();
|
|
|
|
} catch (const integrations::kafka::KafkaStreamException &e) {
|
|
|
|
LOG(ERROR) << e.what();
|
|
|
|
}
|
|
|
|
|
2019-02-19 20:50:46 +08:00
|
|
|
session_data.interpreter->auth_ = &auth;
|
2018-08-24 16:12:04 +08:00
|
|
|
session_data.interpreter->kafka_streams_ = &kafka_streams;
|
2018-07-06 15:28:05 +08:00
|
|
|
|
2018-06-20 23:44:47 +08:00
|
|
|
ServerContext context;
|
|
|
|
std::string service_name = "Bolt";
|
|
|
|
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
|
|
|
|
context = ServerContext(FLAGS_key_file, FLAGS_cert_file);
|
|
|
|
service_name = "BoltS";
|
|
|
|
}
|
|
|
|
|
2018-01-15 21:03:07 +08:00
|
|
|
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
|
2018-09-03 21:29:06 +08:00
|
|
|
&session_data, &context, FLAGS_session_inactivity_timeout,
|
2018-06-20 23:44:47 +08:00
|
|
|
service_name, FLAGS_num_workers);
|
2017-12-19 19:40:30 +08:00
|
|
|
|
2018-06-20 19:46:54 +08:00
|
|
|
// Setup telemetry
|
2019-04-23 17:00:49 +08:00
|
|
|
std::optional<telemetry::Telemetry> telemetry;
|
2018-06-20 19:46:54 +08:00
|
|
|
if (FLAGS_telemetry_enabled) {
|
|
|
|
telemetry.emplace(
|
|
|
|
"https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
|
2019-02-19 20:50:46 +08:00
|
|
|
durability_directory / "telemetry", std::chrono::minutes(10));
|
2018-06-20 19:46:54 +08:00
|
|
|
telemetry->AddCollector("db", [&db]() -> nlohmann::json {
|
2018-07-26 15:08:21 +08:00
|
|
|
auto dba = db.Access();
|
2019-04-15 17:36:43 +08:00
|
|
|
return {{"vertices", dba.VerticesCount()}, {"edges", dba.EdgesCount()}};
|
2018-06-20 19:46:54 +08:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2017-12-19 19:40:30 +08:00
|
|
|
// Handler for regular termination signals
|
2018-01-12 22:17:04 +08:00
|
|
|
auto shutdown = [&server] {
|
2017-12-19 19:40:30 +08:00
|
|
|
// Server needs to be shutdown first and then the database. This prevents a
|
|
|
|
// race condition when a transaction is accepted during server shutdown.
|
|
|
|
server.Shutdown();
|
|
|
|
};
|
|
|
|
InitSignalHandlers(shutdown);
|
2018-04-20 20:58:49 +08:00
|
|
|
|
2018-10-16 16:58:41 +08:00
|
|
|
CHECK(server.Start()) << "Couldn't start the Bolt server!";
|
2018-01-10 20:56:12 +08:00
|
|
|
server.AwaitShutdown();
|
2017-12-19 19:40:30 +08:00
|
|
|
}
|
|
|
|
|
2019-02-19 20:50:46 +08:00
|
|
|
// Process entry point. All work is delegated to WithInit, which receives the
// raw command-line arguments and SingleNodeMain as the function to run.
// NOTE(review): WithInit is not defined in this file (presumably it comes
// from memgraph_init.hpp); its result is used as the process exit code.
int main(int argc, char **argv) {
  const int exit_code = WithInit(argc, argv, SingleNodeMain);
  return exit_code;
}
|