memgraph/src/memgraph.cpp
Teon Banek 283a91cc60 Integrate loading openCypher module procedures
Summary:
All mgp_* symbols are exported from Memgraph executable, no other
symbols should be visible.

The primary C API header, mg_procedure.h, is now part of the
installation. Also, added a shippable query module example.

Directory `query_modules` is meant to contain sources of modules we
write and ship as part of the installation. Currently, there's only an
example module, but there may be potentially more. Some modules could
only be installed as part of the enterprise release.

For Memgraph to load custom procedures, it needs to be started with a
flag pointing to a directory with compiled shared libraries implementing
those procedures.

Reviewers: mferencevic, ipaljak, llugovic, dsantl, buda

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2538
2019-11-07 15:23:02 +01:00

235 lines
9.4 KiB
C++

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <limits>
#include <thread>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/server.hpp"
#ifdef MG_SINGLE_NODE_V2
#include "storage/v2/storage.hpp"
#else
#include "database/single_node/graph_db.hpp"
#endif
#include "memgraph_init.hpp"
#include "query/exceptions.hpp"
#include "telemetry/telemetry.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"
#include "query/procedure/module.hpp"
// General purpose flags.
DEFINE_string(interface, "0.0.0.0",
"Communication interface on which to listen.");
DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
DEFINE_VALIDATED_int32(num_workers,
std::max(std::thread::hardware_concurrency(), 1U),
"Number of workers (Bolt)", FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
"Time in seconds after which inactive sessions will be "
"closed.",
FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_string(cert_file, "", "Certificate file to use.");
DEFINE_string(key_file, "", "Key file to use.");
#ifdef MG_SINGLE_NODE_V2
// General purpose flags.
DEFINE_string(data_directory, "mg_data",
"Path to directory in which to save all permanent data.");
// Storage flags.
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30,
"Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24 * 3600));
DEFINE_bool(storage_properties_on_edges, false,
"Controls whether edges have properties.");
DEFINE_bool(storage_recover_on_startup, false,
"Controls whether the storage recovers persisted data on startup.");
DEFINE_VALIDATED_uint64(storage_snapshot_interval_sec, 0,
"Storage snapshot creation interval (in seconds). Set "
"to 0 to disable periodic snapshot creation.",
FLAG_IN_RANGE(0, 7 * 24 * 3600));
DEFINE_bool(storage_wal_enabled, false,
"Controls whether the storage uses write-ahead-logging. To enable "
"WAL periodic snapshots must be enabled.");
DEFINE_VALIDATED_uint64(storage_snapshot_retention_count, 3,
"The number of snapshots that should always be kept.",
FLAG_IN_RANGE(1, 1000000));
DEFINE_VALIDATED_uint64(storage_wal_file_size_kib,
storage::Config::Durability().wal_file_size_kibibytes,
"Minimum file size of each WAL file.",
FLAG_IN_RANGE(1, 1000 * 1024));
DEFINE_VALIDATED_uint64(
storage_wal_file_flush_every_n_tx,
storage::Config::Durability().wal_file_flush_every_n_tx,
"Issue a 'fsync' call after this amount of transactions are written to the "
"WAL file. Set to 1 for fully synchronous operation.",
FLAG_IN_RANGE(1, 1000000));
DEFINE_bool(storage_snapshot_on_exit, false,
"Controls whether the storage creates another snapshot on exit.");
#endif
DEFINE_bool(telemetry_enabled, false,
"Set to true to enable telemetry. We collect information about the "
"running system (CPU and memory information) and information about "
"the database runtime (vertex and edge counts and resource usage) "
"to allow for easier improvement of the product.");
// Audit logging flags.
DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging.");
DEFINE_VALIDATED_int32(audit_buffer_size, audit::kBufferSizeDefault,
"Maximum number of items in the audit log buffer.",
FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_VALIDATED_int32(
audit_buffer_flush_interval_ms, audit::kBufferFlushIntervalMillisDefault,
"Interval (in milliseconds) used for flushing the audit log buffer.",
FLAG_IN_RANGE(10, INT32_MAX));
DEFINE_VALIDATED_string(
query_modules_directory, "",
"Directory where modules with custom query procedures are stored", {
if (value.empty()) return true;
if (utils::DirExists(value)) return true;
std::cout << "Expected --" << flagname << " to point to a directory."
<< std::endl;
return false;
});
using ServerT = communication::Server<BoltSession, SessionData>;
using communication::ServerContext;
void SingleNodeMain() {
google::SetUsageMessage("Memgraph single-node database server");
// All enterprise features should be constructed before the main database
// storage. This will cause them to be destructed *after* the main database
// storage. That way any errors that happen during enterprise features
// destruction won't have an impact on the storage engine.
// Example: When the main storage is destructed it makes a snapshot. When
// audit logging is destructed it syncs all pending data to disk and that can
// fail. That is why it must be destructed *after* the main database storage
// to minimise the impact of their failure on the main storage.
// Begin enterprise features initialization
#ifdef MG_SINGLE_NODE_V2
auto data_directory = std::filesystem::path(FLAGS_data_directory);
#else
auto data_directory = std::filesystem::path(FLAGS_durability_directory);
#endif
// Auth
auth::Auth auth{data_directory / "auth"};
// Audit log
audit::Log audit_log{data_directory / "audit", FLAGS_audit_buffer_size,
FLAGS_audit_buffer_flush_interval_ms};
// Start the log if enabled.
if (FLAGS_audit_enabled) {
audit_log.Start();
}
// Setup SIGUSR2 to be used for reopening audit log files, when e.g. logrotate
// rotates our audit logs.
CHECK(utils::SignalHandler::RegisterHandler(
utils::Signal::User2, [&audit_log]() { audit_log.ReopenLog(); }))
<< "Unable to register SIGUSR2 handler!";
// End enterprise features initialization
// Main storage and execution engines initialization
#ifdef MG_SINGLE_NODE_V2
storage::Config db_config{
.gc = {.type = storage::Config::Gc::Type::PERIODIC,
.interval = std::chrono::seconds(FLAGS_storage_gc_cycle_sec)},
.items = {.properties_on_edges = FLAGS_storage_properties_on_edges},
.durability = {
.storage_directory = FLAGS_data_directory,
.recover_on_startup = FLAGS_storage_recover_on_startup,
.snapshot_retention_count = FLAGS_storage_snapshot_retention_count,
.wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib,
.wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx,
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit}};
if (FLAGS_storage_snapshot_interval_sec == 0) {
LOG_IF(FATAL, FLAGS_storage_wal_enabled)
<< "In order to use write-ahead-logging you must enable "
"periodic snapshots by setting the snapshot interval to a "
"value larger than 0!";
db_config.durability.snapshot_wal_mode =
storage::Config::Durability::SnapshotWalMode::DISABLED;
} else {
if (FLAGS_storage_wal_enabled) {
db_config.durability.snapshot_wal_mode = storage::Config::Durability::
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL;
} else {
db_config.durability.snapshot_wal_mode =
storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT;
}
db_config.durability.snapshot_interval =
std::chrono::seconds(FLAGS_storage_snapshot_interval_sec);
}
storage::Storage db(db_config);
#else
database::GraphDb db;
#endif
query::InterpreterContext interpreter_context{&db};
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
// Register modules
if (!FLAGS_query_modules_directory.empty()) {
for (const auto &entry :
std::filesystem::directory_iterator(FLAGS_query_modules_directory)) {
if (entry.is_regular_file() && entry.path().extension() == ".so")
query::procedure::gModuleRegistry.LoadModuleLibrary(entry.path());
}
}
// Register modules END
interpreter_context.auth = &auth;
ServerContext context;
std::string service_name = "Bolt";
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
context = ServerContext(FLAGS_key_file, FLAGS_cert_file);
service_name = "BoltS";
}
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
&session_data, &context, FLAGS_session_inactivity_timeout,
service_name, FLAGS_num_workers);
// Setup telemetry
std::optional<telemetry::Telemetry> telemetry;
#ifndef MG_SINGLE_NODE_V2
if (FLAGS_telemetry_enabled) {
telemetry.emplace(
"https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
data_directory / "telemetry", std::chrono::minutes(10));
telemetry->AddCollector("db", [&db]() -> nlohmann::json {
auto dba = db.Access();
return {{"vertices", dba.VerticesCount()}, {"edges", dba.EdgesCount()}};
});
}
#endif
// Handler for regular termination signals
auto shutdown = [&server] {
// Server needs to be shutdown first and then the database. This prevents a
// race condition when a transaction is accepted during server shutdown.
server.Shutdown();
};
InitSignalHandlers(shutdown);
CHECK(server.Start()) << "Couldn't start the Bolt server!";
server.AwaitShutdown();
query::procedure::gModuleRegistry.UnloadAllModules();
}
int main(int argc, char **argv) { return WithInit(argc, argv, SingleNodeMain); }