Merge memgraph_init with main file
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2684
This commit is contained in:
parent
2562070b06
commit
4e5a91e7fb
@ -135,7 +135,6 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
|
||||
# ----------------------------------------------------------------------------
|
||||
set(mg_single_node_v2_sources
|
||||
glue/communication.cpp
|
||||
memgraph_init.cpp
|
||||
memgraph.cpp
|
||||
)
|
||||
if (MG_ENTERPRISE)
|
||||
|
335
src/memgraph.cpp
335
src/memgraph.cpp
@ -1,30 +1,51 @@
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <csignal>
|
||||
#include <cstdint>
|
||||
#include <exception>
|
||||
#include <filesystem>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <regex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/v1/exceptions.hpp"
|
||||
#include "communication/bolt/v1/session.hpp"
|
||||
#include "communication/init.hpp"
|
||||
#include "communication/server.hpp"
|
||||
#include "memgraph_init.hpp"
|
||||
#include "communication/session.hpp"
|
||||
#include "config.hpp"
|
||||
#include "glue/communication.hpp"
|
||||
#include "py/py.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/procedure/module.hpp"
|
||||
#include "query/procedure/py_module.hpp"
|
||||
#include "requests/requests.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
#include "telemetry/telemetry.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/signals.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/sysinfo/memory.hpp"
|
||||
#include "utils/terminate_handler.hpp"
|
||||
#include "version.hpp"
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "audit/log.hpp"
|
||||
#include "auth/auth.hpp"
|
||||
#include "glue/auth.hpp"
|
||||
#endif
|
||||
|
||||
// General purpose flags.
|
||||
// Bolt server flags.
|
||||
DEFINE_string(bolt_address, "0.0.0.0",
|
||||
"IP address on which the Bolt server should listen.");
|
||||
DEFINE_VALIDATED_int32(bolt_port, 7687,
|
||||
@ -44,10 +65,21 @@ DEFINE_string(bolt_cert_file, "",
|
||||
"Certificate file which should be used for the Bolt server.");
|
||||
DEFINE_string(bolt_key_file, "",
|
||||
"Key file which should be used for the Bolt server.");
|
||||
DEFINE_string(bolt_server_name_for_init, "",
|
||||
"Server name which the database should send to the client in the "
|
||||
"Bolt INIT message.");
|
||||
|
||||
// General purpose flags.
|
||||
DEFINE_string(data_directory, "mg_data",
|
||||
"Path to directory in which to save all permanent data.");
|
||||
DEFINE_string(log_file, "", "Path to where the log should be stored.");
|
||||
DEFINE_HIDDEN_string(
|
||||
log_link_basename, "",
|
||||
"Basename used for symlink creation to the last log file.");
|
||||
DEFINE_uint64(memory_warning_threshold, 1024,
|
||||
"Memory warning threshold, in MB. If Memgraph detects there is "
|
||||
"less available RAM it will log a warning. Set to 0 to "
|
||||
"disable.");
|
||||
|
||||
// Storage flags.
|
||||
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30,
|
||||
@ -113,11 +145,208 @@ DEFINE_VALIDATED_string(
|
||||
return false;
|
||||
});
|
||||
|
||||
/// Encapsulates Dbms and Interpreter that are passed through the network server
|
||||
/// and worker to the session.
|
||||
#ifdef MG_ENTERPRISE
|
||||
struct SessionData {
|
||||
// Explicit constructor here to ensure that pointers to all objects are
|
||||
// supplied.
|
||||
SessionData(storage::Storage *db,
|
||||
query::InterpreterContext *interpreter_context, auth::Auth *auth,
|
||||
audit::Log *audit_log)
|
||||
: db(db),
|
||||
interpreter_context(interpreter_context),
|
||||
auth(auth),
|
||||
audit_log(audit_log) {}
|
||||
storage::Storage *db;
|
||||
query::InterpreterContext *interpreter_context;
|
||||
auth::Auth *auth;
|
||||
audit::Log *audit_log;
|
||||
};
|
||||
#else
|
||||
struct SessionData {
|
||||
// Explicit constructor here to ensure that pointers to all objects are
|
||||
// supplied.
|
||||
SessionData(storage::Storage *db,
|
||||
query::InterpreterContext *interpreter_context)
|
||||
: db(db), interpreter_context(interpreter_context) {}
|
||||
storage::Storage *db;
|
||||
query::InterpreterContext *interpreter_context;
|
||||
};
|
||||
#endif
|
||||
|
||||
class BoltSession final
|
||||
: public communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream> {
|
||||
public:
|
||||
BoltSession(SessionData *data, const io::network::Endpoint &endpoint,
|
||||
communication::InputStream *input_stream,
|
||||
communication::OutputStream *output_stream)
|
||||
: communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>(
|
||||
input_stream, output_stream),
|
||||
db_(data->db),
|
||||
interpreter_(data->interpreter_context),
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
auth_(data->auth),
|
||||
audit_log_(data->audit_log),
|
||||
#endif
|
||||
#endif
|
||||
endpoint_(endpoint) {
|
||||
}
|
||||
|
||||
using communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms)
|
||||
override {
|
||||
std::map<std::string, storage::PropertyValue> params_pv;
|
||||
for (const auto &kv : params)
|
||||
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
audit_log_->Record(endpoint_.address(), user_ ? user_->username() : "",
|
||||
query, storage::PropertyValue(params_pv));
|
||||
#endif
|
||||
#endif
|
||||
try {
|
||||
auto result = interpreter_.Prepare(query, params_pv);
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
if (user_) {
|
||||
const auto &permissions = user_->GetPermissions();
|
||||
for (const auto &privilege : result.second) {
|
||||
if (permissions.Has(glue::PrivilegeToPermission(privilege)) !=
|
||||
auth::PermissionLevel::GRANT) {
|
||||
interpreter_.Abort();
|
||||
throw communication::bolt::ClientError(
|
||||
"You are not authorized to execute this query! Please contact "
|
||||
"your database administrator.");
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return result.first;
|
||||
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, communication::bolt::Value> PullAll(
|
||||
TEncoder *encoder) override {
|
||||
try {
|
||||
TypedValueResultStream stream(encoder, db_);
|
||||
const auto &summary = interpreter_.PullAll(&stream);
|
||||
std::map<std::string, communication::bolt::Value> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
auto maybe_value =
|
||||
glue::ToBoltValue(kv.second, *db_, storage::View::NEW);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw communication::bolt::ClientError(
|
||||
"Unexpected storage error when streaming summary.");
|
||||
}
|
||||
}
|
||||
decoded_summary.emplace(kv.first, std::move(*maybe_value));
|
||||
}
|
||||
return decoded_summary;
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void Abort() override { interpreter_.Abort(); }
|
||||
|
||||
bool Authenticate(const std::string &username,
|
||||
const std::string &password) override {
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifdef MG_SINGLE_NODE_HA
|
||||
return true;
|
||||
#else
|
||||
if (!auth_->HasUsers()) return true;
|
||||
user_ = auth_->Authenticate(username, password);
|
||||
return !!user_;
|
||||
#endif
|
||||
#else
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
|
||||
std::optional<std::string> GetServerNameForInit() override {
|
||||
if (FLAGS_bolt_server_name_for_init.empty()) return std::nullopt;
|
||||
return FLAGS_bolt_server_name_for_init;
|
||||
}
|
||||
|
||||
private:
|
||||
/// Wrapper around TEncoder which converts TypedValue to Value
|
||||
/// before forwarding the calls to original TEncoder.
|
||||
class TypedValueResultStream {
|
||||
public:
|
||||
TypedValueResultStream(TEncoder *encoder, const storage::Storage *db)
|
||||
: encoder_(encoder), db_(db) {}
|
||||
|
||||
void Result(const std::vector<query::TypedValue> &values) {
|
||||
std::vector<communication::bolt::Value> decoded_values;
|
||||
decoded_values.reserve(values.size());
|
||||
for (const auto &v : values) {
|
||||
auto maybe_value = glue::ToBoltValue(v, *db_, storage::View::NEW);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
throw communication::bolt::ClientError(
|
||||
"Returning a deleted object as a result.");
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw communication::bolt::ClientError(
|
||||
"Returning a nonexistent object as a result.");
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
throw communication::bolt::ClientError(
|
||||
"Unexpected storage error when streaming results.");
|
||||
}
|
||||
}
|
||||
decoded_values.emplace_back(std::move(*maybe_value));
|
||||
}
|
||||
encoder_->MessageRecord(decoded_values);
|
||||
}
|
||||
|
||||
private:
|
||||
TEncoder *encoder_;
|
||||
// NOTE: Needed only for ToBoltValue conversions
|
||||
const storage::Storage *db_;
|
||||
};
|
||||
|
||||
// NOTE: Needed only for ToBoltValue conversions
|
||||
const storage::Storage *db_;
|
||||
query::Interpreter interpreter_;
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
auth::Auth *auth_;
|
||||
std::optional<auth::User> user_;
|
||||
audit::Log *audit_log_;
|
||||
#endif
|
||||
#endif
|
||||
io::network::Endpoint endpoint_;
|
||||
};
|
||||
|
||||
using ServerT = communication::Server<BoltSession, SessionData>;
|
||||
using communication::ServerContext;
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
DEFINE_string(
|
||||
auth_user_or_role_name_regex, "[a-zA-Z0-9_.+-@]+",
|
||||
"Set to the regular expression that each user or role name must fulfill.");
|
||||
@ -539,7 +768,95 @@ class AuthQueryHandler final : public query::AuthQueryHandler {
|
||||
};
|
||||
#endif
|
||||
|
||||
void SingleNodeMain() {
|
||||
// Needed to correctly handle memgraph destruction from a signal handler.
|
||||
// Without having some sort of a flag, it is possible that a signal is handled
|
||||
// when we are exiting main, inside destructors of database::GraphDb and
|
||||
// similar. The signal handler may then initiate another shutdown on memgraph
|
||||
// which is in half destructed state, causing invalid memory access and crash.
|
||||
volatile sig_atomic_t is_shutting_down = 0;
|
||||
|
||||
void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
|
||||
// Prevent handling shutdown inside a shutdown. For example, SIGINT handler
|
||||
// being interrupted by SIGTERM before is_shutting_down is set, thus causing
|
||||
// double shutdown.
|
||||
sigset_t block_shutdown_signals;
|
||||
sigemptyset(&block_shutdown_signals);
|
||||
sigaddset(&block_shutdown_signals, SIGTERM);
|
||||
sigaddset(&block_shutdown_signals, SIGINT);
|
||||
|
||||
// Wrap the shutdown function in a safe way to prevent recursive shutdown.
|
||||
auto shutdown = [shutdown_fun]() {
|
||||
if (is_shutting_down) return;
|
||||
is_shutting_down = 1;
|
||||
shutdown_fun();
|
||||
};
|
||||
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate,
|
||||
shutdown, block_shutdown_signals))
|
||||
<< "Unable to register SIGTERM handler!";
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown,
|
||||
block_shutdown_signals))
|
||||
<< "Unable to register SIGINT handler!";
|
||||
|
||||
// Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate
|
||||
// rotates our logs.
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() {
|
||||
google::CloseLogDestination(google::INFO);
|
||||
})) << "Unable to register SIGUSR1 handler!";
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph database server");
|
||||
gflags::SetVersionString(version_string);
|
||||
|
||||
// Load config before parsing arguments, so that flags from the command line
|
||||
// overwrite the config.
|
||||
LoadConfig();
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
||||
google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str());
|
||||
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&utils::TerminateHandler);
|
||||
|
||||
// Initialize Python
|
||||
auto *program_name = Py_DecodeLocale(argv[0], nullptr);
|
||||
CHECK(program_name);
|
||||
// Set program name, so Python can find its way to runtime libraries relative
|
||||
// to executable.
|
||||
Py_SetProgramName(program_name);
|
||||
PyImport_AppendInittab("_mgp", &query::procedure::PyInitMgpModule);
|
||||
Py_InitializeEx(0 /* = initsigs */);
|
||||
PyEval_InitThreads();
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
|
||||
// Initialize the communication library.
|
||||
communication::Init();
|
||||
|
||||
// Initialize the requests library.
|
||||
requests::Init();
|
||||
|
||||
// Start memory warning logger.
|
||||
utils::Scheduler mem_log_scheduler;
|
||||
if (FLAGS_memory_warning_threshold > 0) {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram) {
|
||||
mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold)
|
||||
LOG(WARNING) << "Running out of available RAM, only "
|
||||
<< *free_ram / 1024 << " MB left.";
|
||||
});
|
||||
} else {
|
||||
// Kernel version for the `MemAvailable` value is from: man procfs
|
||||
LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc "
|
||||
"filesystem isn't available so remaining memory warnings "
|
||||
"won't be available.";
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "You are running Memgraph v" << gflags::VersionString()
|
||||
<< std::endl;
|
||||
|
||||
@ -626,6 +943,7 @@ void SingleNodeMain() {
|
||||
}
|
||||
}
|
||||
// Register modules END
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
AuthQueryHandler auth_handler(&auth,
|
||||
std::regex(FLAGS_auth_user_or_role_name_regex));
|
||||
@ -671,9 +989,10 @@ void SingleNodeMain() {
|
||||
CHECK(server.Start()) << "Couldn't start the Bolt server!";
|
||||
server.AwaitShutdown();
|
||||
query::procedure::gModuleRegistry.UnloadAllModules();
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph database server");
|
||||
return WithInit(argc, argv, SingleNodeMain);
|
||||
Py_END_ALLOW_THREADS;
|
||||
// Shutdown Python
|
||||
Py_Finalize();
|
||||
PyMem_RawFree(program_name);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1,267 +0,0 @@
|
||||
#include "memgraph_init.hpp"
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "config.hpp"
|
||||
#include "glue/communication.hpp"
|
||||
#include "py/py.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/procedure/py_module.hpp"
|
||||
#include "requests/requests.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
#include "utils/signals.hpp"
|
||||
#include "utils/sysinfo/memory.hpp"
|
||||
#include "utils/terminate_handler.hpp"
|
||||
#include "version.hpp"
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "glue/auth.hpp"
|
||||
#endif
|
||||
|
||||
DEFINE_string(log_file, "", "Path to where the log should be stored.");
|
||||
DEFINE_HIDDEN_string(
|
||||
log_link_basename, "",
|
||||
"Basename used for symlink creation to the last log file.");
|
||||
DEFINE_uint64(memory_warning_threshold, 1024,
|
||||
"Memory warning threshold, in MB. If Memgraph detects there is "
|
||||
"less available RAM it will log a warning. Set to 0 to "
|
||||
"disable.");
|
||||
DEFINE_string(bolt_server_name_for_init, "",
|
||||
"Server name which the database should send to the client in the "
|
||||
"Bolt INIT message.");
|
||||
|
||||
BoltSession::BoltSession(SessionData *data,
|
||||
const io::network::Endpoint &endpoint,
|
||||
communication::InputStream *input_stream,
|
||||
communication::OutputStream *output_stream)
|
||||
: communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>(input_stream,
|
||||
output_stream),
|
||||
db_(data->db),
|
||||
interpreter_(data->interpreter_context),
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
auth_(data->auth),
|
||||
audit_log_(data->audit_log),
|
||||
#endif
|
||||
#endif
|
||||
endpoint_(endpoint) {
|
||||
}
|
||||
|
||||
using TEncoder =
|
||||
communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
std::vector<std::string> BoltSession::Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
std::map<std::string, storage::PropertyValue> params_pv;
|
||||
for (const auto &kv : params)
|
||||
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
audit_log_->Record(endpoint_.address(), user_ ? user_->username() : "", query,
|
||||
storage::PropertyValue(params_pv));
|
||||
#endif
|
||||
#endif
|
||||
try {
|
||||
auto result = interpreter_.Prepare(query, params_pv);
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
if (user_) {
|
||||
const auto &permissions = user_->GetPermissions();
|
||||
for (const auto &privilege : result.second) {
|
||||
if (permissions.Has(glue::PrivilegeToPermission(privilege)) !=
|
||||
auth::PermissionLevel::GRANT) {
|
||||
interpreter_.Abort();
|
||||
throw communication::bolt::ClientError(
|
||||
"You are not authorized to execute this query! Please contact "
|
||||
"your database administrator.");
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return result.first;
|
||||
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, communication::bolt::Value> BoltSession::PullAll(
|
||||
TEncoder *encoder) {
|
||||
try {
|
||||
TypedValueResultStream stream(encoder, db_);
|
||||
const auto &summary = interpreter_.PullAll(&stream);
|
||||
std::map<std::string, communication::bolt::Value> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
auto maybe_value = glue::ToBoltValue(kv.second, *db_, storage::View::NEW);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw communication::bolt::ClientError(
|
||||
"Unexpected storage error when streaming summary.");
|
||||
}
|
||||
}
|
||||
decoded_summary.emplace(kv.first, std::move(*maybe_value));
|
||||
}
|
||||
return decoded_summary;
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void BoltSession::Abort() { interpreter_.Abort(); }
|
||||
|
||||
bool BoltSession::Authenticate(const std::string &username,
|
||||
const std::string &password) {
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifdef MG_SINGLE_NODE_HA
|
||||
return true;
|
||||
#else
|
||||
if (!auth_->HasUsers()) return true;
|
||||
user_ = auth_->Authenticate(username, password);
|
||||
return !!user_;
|
||||
#endif
|
||||
#else
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
|
||||
std::optional<std::string> BoltSession::GetServerNameForInit() {
|
||||
if (FLAGS_bolt_server_name_for_init.empty()) return std::nullopt;
|
||||
return FLAGS_bolt_server_name_for_init;
|
||||
}
|
||||
|
||||
BoltSession::TypedValueResultStream::TypedValueResultStream(
|
||||
TEncoder *encoder, const storage::Storage *db)
|
||||
: encoder_(encoder), db_(db) {}
|
||||
|
||||
void BoltSession::TypedValueResultStream::Result(
|
||||
const std::vector<query::TypedValue> &values) {
|
||||
std::vector<communication::bolt::Value> decoded_values;
|
||||
decoded_values.reserve(values.size());
|
||||
for (const auto &v : values) {
|
||||
auto maybe_value = glue::ToBoltValue(v, *db_, storage::View::NEW);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
throw communication::bolt::ClientError(
|
||||
"Returning a deleted object as a result.");
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw communication::bolt::ClientError(
|
||||
"Returning a nonexistent object as a result.");
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
throw communication::bolt::ClientError(
|
||||
"Unexpected storage error when streaming results.");
|
||||
}
|
||||
}
|
||||
decoded_values.emplace_back(std::move(*maybe_value));
|
||||
}
|
||||
encoder_->MessageRecord(decoded_values);
|
||||
}
|
||||
|
||||
// Needed to correctly handle memgraph destruction from a signal handler.
|
||||
// Without having some sort of a flag, it is possible that a signal is handled
|
||||
// when we are exiting main, inside destructors of database::GraphDb and
|
||||
// similar. The signal handler may then initiate another shutdown on memgraph
|
||||
// which is in half destructed state, causing invalid memory access and crash.
|
||||
volatile sig_atomic_t is_shutting_down = 0;
|
||||
|
||||
void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
|
||||
// Prevent handling shutdown inside a shutdown. For example, SIGINT handler
|
||||
// being interrupted by SIGTERM before is_shutting_down is set, thus causing
|
||||
// double shutdown.
|
||||
sigset_t block_shutdown_signals;
|
||||
sigemptyset(&block_shutdown_signals);
|
||||
sigaddset(&block_shutdown_signals, SIGTERM);
|
||||
sigaddset(&block_shutdown_signals, SIGINT);
|
||||
|
||||
// Wrap the shutdown function in a safe way to prevent recursive shutdown.
|
||||
auto shutdown = [shutdown_fun]() {
|
||||
if (is_shutting_down) return;
|
||||
is_shutting_down = 1;
|
||||
shutdown_fun();
|
||||
};
|
||||
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate,
|
||||
shutdown, block_shutdown_signals))
|
||||
<< "Unable to register SIGTERM handler!";
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown,
|
||||
block_shutdown_signals))
|
||||
<< "Unable to register SIGINT handler!";
|
||||
|
||||
// Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate
|
||||
// rotates our logs.
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() {
|
||||
google::CloseLogDestination(google::INFO);
|
||||
})) << "Unable to register SIGUSR1 handler!";
|
||||
}
|
||||
|
||||
int WithInit(int argc, char **argv,
|
||||
const std::function<void()> &memgraph_main) {
|
||||
gflags::SetVersionString(version_string);
|
||||
|
||||
// Load config before parsing arguments, so that flags from the command line
|
||||
// overwrite the config.
|
||||
LoadConfig();
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
||||
google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str());
|
||||
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&utils::TerminateHandler);
|
||||
|
||||
// Initialize Python
|
||||
auto *program_name = Py_DecodeLocale(argv[0], nullptr);
|
||||
CHECK(program_name);
|
||||
// Set program name, so Python can find its way to runtime libraries relative
|
||||
// to executable.
|
||||
Py_SetProgramName(program_name);
|
||||
PyImport_AppendInittab("_mgp", &query::procedure::PyInitMgpModule);
|
||||
Py_InitializeEx(0 /* = initsigs */);
|
||||
PyEval_InitThreads();
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
// Initialize the communication library.
|
||||
communication::Init();
|
||||
|
||||
// Start memory warning logger.
|
||||
utils::Scheduler mem_log_scheduler;
|
||||
if (FLAGS_memory_warning_threshold > 0) {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram) {
|
||||
mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold)
|
||||
LOG(WARNING) << "Running out of available RAM, only "
|
||||
<< *free_ram / 1024 << " MB left.";
|
||||
});
|
||||
} else {
|
||||
// Kernel version for the `MemAvailable` value is from: man procfs
|
||||
LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc "
|
||||
"filesystem isn't available so remaining memory warnings "
|
||||
"won't be available.";
|
||||
}
|
||||
}
|
||||
requests::Init();
|
||||
|
||||
memgraph_main();
|
||||
Py_END_ALLOW_THREADS;
|
||||
// Shutdown Python
|
||||
Py_Finalize();
|
||||
PyMem_RawFree(program_name);
|
||||
return 0;
|
||||
}
|
@ -1,135 +0,0 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <csignal>
|
||||
#include <filesystem>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/bolt/v1/exceptions.hpp"
|
||||
#include "communication/bolt/v1/session.hpp"
|
||||
#include "communication/init.hpp"
|
||||
#include "communication/session.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "audit/log.hpp"
|
||||
#include "auth/auth.hpp"
|
||||
#endif
|
||||
|
||||
/// Encapsulates Dbms and Interpreter that are passed through the network server
|
||||
/// and worker to the session.
|
||||
#ifdef MG_ENTERPRISE
|
||||
struct SessionData {
|
||||
// Explicit constructor here to ensure that pointers to all objects are
|
||||
// supplied.
|
||||
SessionData(storage::Storage *db,
|
||||
query::InterpreterContext *interpreter_context, auth::Auth *auth,
|
||||
audit::Log *audit_log)
|
||||
: db(db),
|
||||
interpreter_context(interpreter_context),
|
||||
auth(auth),
|
||||
audit_log(audit_log) {}
|
||||
storage::Storage *db;
|
||||
query::InterpreterContext *interpreter_context;
|
||||
auth::Auth *auth;
|
||||
audit::Log *audit_log;
|
||||
};
|
||||
#else
|
||||
struct SessionData {
|
||||
// Explicit constructor here to ensure that pointers to all objects are
|
||||
// supplied.
|
||||
SessionData(storage::Storage *db,
|
||||
query::InterpreterContext *interpreter_context)
|
||||
: db(db), interpreter_context(interpreter_context) {}
|
||||
storage::Storage *db;
|
||||
query::InterpreterContext *interpreter_context;
|
||||
};
|
||||
#endif
|
||||
|
||||
class BoltSession final
|
||||
: public communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream> {
|
||||
public:
|
||||
BoltSession(SessionData *data, const io::network::Endpoint &endpoint,
|
||||
communication::InputStream *input_stream,
|
||||
communication::OutputStream *output_stream);
|
||||
|
||||
using communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) override;
|
||||
|
||||
std::map<std::string, communication::bolt::Value> PullAll(
|
||||
TEncoder *encoder) override;
|
||||
|
||||
void Abort() override;
|
||||
|
||||
bool Authenticate(const std::string &username,
|
||||
const std::string &password) override;
|
||||
|
||||
std::optional<std::string> GetServerNameForInit() override;
|
||||
|
||||
private:
|
||||
/// Wrapper around TEncoder which converts TypedValue to Value
|
||||
/// before forwarding the calls to original TEncoder.
|
||||
class TypedValueResultStream {
|
||||
public:
|
||||
TypedValueResultStream(TEncoder *encoder, const storage::Storage *db);
|
||||
|
||||
void Result(const std::vector<query::TypedValue> &values);
|
||||
|
||||
private:
|
||||
TEncoder *encoder_;
|
||||
// NOTE: Needed only for ToBoltValue conversions
|
||||
const storage::Storage *db_;
|
||||
};
|
||||
|
||||
// NOTE: Needed only for ToBoltValue conversions
|
||||
const storage::Storage *db_;
|
||||
query::Interpreter interpreter_;
|
||||
#ifdef MG_ENTERPRISE
|
||||
#ifndef MG_SINGLE_NODE_HA
|
||||
auth::Auth *auth_;
|
||||
std::optional<auth::User> user_;
|
||||
audit::Log *audit_log_;
|
||||
#endif
|
||||
#endif
|
||||
io::network::Endpoint endpoint_;
|
||||
};
|
||||
|
||||
/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT.
|
||||
/// In most cases you don't have to call this. If you are using a custom server
|
||||
/// startup function for `WithInit`, then you probably need to use this to
|
||||
/// shutdown your server.
|
||||
void InitSignalHandlers(const std::function<void()> &shutdown_fun);
|
||||
|
||||
/// Run the Memgraph server.
|
||||
///
|
||||
/// Sets up all the required state before running `memgraph_main` and does any
|
||||
/// required cleanup afterwards.
|
||||
///
|
||||
/// Command line arguments and configuration files are read before calling any
|
||||
/// of the supplied functions. Therefore, you should use flags only from those
|
||||
/// functions, and *not before* invoking `WithInit`.
|
||||
///
|
||||
/// This should be the first and last thing a OS specific main function does.
|
||||
///
|
||||
/// A common example of usage is:
|
||||
///
|
||||
/// @code
|
||||
/// int main(int argc, char *argv[]) {
|
||||
/// return WithInit(argc, argv, SingleNodeMain);
|
||||
/// }
|
||||
/// @endcode
|
||||
///
|
||||
/// If you wish to start Memgraph server in another way, you can pass a
|
||||
/// `memgraph_main` functions which does that. You should take care to call
|
||||
/// `InitSignalHandlers` with appropriate function to shutdown the server you
|
||||
/// started.
|
||||
int WithInit(int argc, char **argv, const std::function<void()> &memgraph_main);
|
Loading…
Reference in New Issue
Block a user