From adb65b2ffff0126ad8e214496e80ce99d1bae39b Mon Sep 17 00:00:00 2001 From: Gareth Andrew Lloyd Date: Fri, 18 Aug 2023 17:23:15 +0100 Subject: [PATCH] Refactor memgraph.cpp (#1164) --- CMakeLists.txt | 4 + src/CMakeLists.txt | 12 +- src/communication/CMakeLists.txt | 1 + src/communication/buffer.hpp | 9 +- src/communication/http/server.hpp | 31 +- src/communication/v2/server.hpp | 75 ++- src/data_structures/ring_buffer.hpp | 4 +- src/flags/all.hpp | 18 + src/flags/audit.cpp | 28 + src/flags/audit.hpp | 23 + src/flags/bolt.cpp | 38 ++ src/flags/bolt.hpp | 27 + src/flags/general.cpp | 194 ++++++ src/flags/general.hpp | 122 ++++ src/flags/isolation_level.cpp | 54 ++ src/flags/isolation_level.hpp | 19 + src/flags/log_level.cpp | 101 ++++ src/flags/log_level.hpp | 18 + src/flags/memory_limit.cpp | 41 ++ src/flags/memory_limit.hpp | 17 + src/glue/CMakeLists.txt | 8 +- src/glue/MonitoringServerT.cpp | 14 + src/glue/MonitoringServerT.hpp | 25 + src/glue/ServerT.cpp | 17 + src/glue/ServerT.hpp | 35 ++ src/glue/SessionHL.cpp | 386 ++++++++++++ src/glue/SessionHL.hpp | 161 +++++ src/helpers.hpp | 4 +- src/http_handlers/metrics.hpp | 2 + src/memgraph.cpp | 887 +--------------------------- src/query/procedure/module.cpp | 2 +- src/query/procedure/module.hpp | 5 +- src/utils/exceptions.hpp | 4 +- src/utils/signals.hpp | 5 +- src/utils/terminate_handler.hpp | 6 +- src/version.hpp.in | 2 +- 36 files changed, 1467 insertions(+), 932 deletions(-) create mode 100644 src/flags/all.hpp create mode 100644 src/flags/audit.cpp create mode 100644 src/flags/audit.hpp create mode 100644 src/flags/bolt.cpp create mode 100644 src/flags/bolt.hpp create mode 100644 src/flags/general.cpp create mode 100644 src/flags/general.hpp create mode 100644 src/flags/isolation_level.cpp create mode 100644 src/flags/isolation_level.hpp create mode 100644 src/flags/log_level.cpp create mode 100644 src/flags/log_level.hpp create mode 100644 src/flags/memory_limit.cpp create mode 100644 src/flags/memory_limit.hpp create mode 100644 src/glue/MonitoringServerT.cpp create mode 100644 src/glue/MonitoringServerT.hpp create mode 100644 src/glue/ServerT.cpp create mode 100644 src/glue/ServerT.hpp create mode 100644 src/glue/SessionHL.cpp create mode 100644 src/glue/SessionHL.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a6443bd3d..f04f079bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,6 +42,10 @@ endif() project(memgraph LANGUAGES C CXX) +# NOTE: once in a while this needs to be toggled to check headers are +# correct and PCH isn't masking any include issues +set(CMAKE_DISABLE_PRECOMPILE_HEADERS OFF) + #TODO: upgrade to cmake 3.24 + CheckIPOSupported #cmake_policy(SET CMP0138 NEW) #include(CheckIPOSupported) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 91e1d31e5..c3483779d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -31,15 +31,19 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR}) # ---------------------------------------------------------------------------- set(mg_single_node_v2_sources memgraph.cpp + flags/isolation_level.cpp + flags/memory_limit.cpp + flags/log_level.cpp + flags/general.cpp + flags/audit.cpp + flags/bolt.cpp ) -set(mg_single_node_v2_libs stdc++fs Threads::Threads - mg-telemetry mg-query mg-communication mg-memory mg-utils mg-auth mg-license mg-settings mg-glue mg-audit) - # memgraph main executable add_executable(memgraph ${mg_single_node_v2_sources}) target_include_directories(memgraph PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(memgraph ${mg_single_node_v2_libs}) +target_link_libraries(memgraph stdc++fs Threads::Threads + mg-telemetry mg-communication mg-memory mg-utils mg-license mg-settings mg-glue) # NOTE: `include/mg_procedure.syms` describes a pattern match for symbols which # should be dynamically exported, so that `dlopen` can correctly link the diff --git a/src/communication/CMakeLists.txt b/src/communication/CMakeLists.txt index a50e062d1..2159f8472 100644 --- a/src/communication/CMakeLists.txt +++ b/src/communication/CMakeLists.txt @@ -22,3 +22,4 @@ target_link_libraries(mg-communication Boost::headers Threads::Threads mg-utils find_package(OpenSSL REQUIRED) target_link_libraries(mg-communication ${OPENSSL_LIBRARIES}) target_include_directories(mg-communication SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR}) +target_precompile_headers(mg-communication INTERFACE http/server.hpp bolt/v1/session.hpp) diff --git a/src/communication/buffer.hpp b/src/communication/buffer.hpp index 9843edc95..63aae98e3 100644 --- a/src/communication/buffer.hpp +++ b/src/communication/buffer.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -43,6 +43,7 @@ class Buffer final { Buffer(Buffer &&) = delete; Buffer &operator=(const Buffer &) = delete; Buffer &operator=(Buffer &&) = delete; + ~Buffer() = default; /** * This class provides all functions from the buffer that are needed to allow @@ -50,12 +51,13 @@ class Buffer final { */ class ReadEnd { public: - ReadEnd(Buffer *buffer); + explicit ReadEnd(Buffer *buffer); ReadEnd(const ReadEnd &) = delete; ReadEnd(ReadEnd &&) = delete; ReadEnd &operator=(const ReadEnd &) = delete; ReadEnd &operator=(ReadEnd &&) = delete; + ~ReadEnd() = default; uint8_t *data(); @@ -77,12 +79,13 @@ class Buffer final { */ class WriteEnd { public: - WriteEnd(Buffer *buffer); + explicit WriteEnd(Buffer *buffer); WriteEnd(const WriteEnd &) = delete; WriteEnd(WriteEnd &&) = delete; WriteEnd &operator=(const WriteEnd &) = delete; WriteEnd &operator=(WriteEnd &&) = delete; + ~WriteEnd() = default; io::network::StreamBuffer Allocate(); diff --git a/src/communication/http/server.hpp b/src/communication/http/server.hpp index 7b23e5788..3ce5e96ae 100644 --- a/src/communication/http/server.hpp +++ b/src/communication/http/server.hpp @@ -26,10 +26,7 @@ class Server final { using tcp = boost::asio::ip::tcp; public: - explicit Server(io::network::Endpoint endpoint, TSessionContext *session_context, ServerContext *context) - : listener_{Listener::Create( - ioc_, session_context, context, - tcp::endpoint{boost::asio::ip::make_address(endpoint.address), endpoint.port})} {} + explicit Server(io::network::Endpoint endpoint, TSessionContext *session_context, ServerContext *context); Server(const Server &) = delete; Server(Server &&) = delete; @@ -41,11 +38,7 @@ class Server final { "Server wasn't shutdown properly"); } - void Start() { - MG_ASSERT(!background_thread_, "The server was already started!"); - listener_->Run(); - background_thread_.emplace([this] { ioc_.run(); }); - } + void Start(); void Shutdown() { ioc_.stop(); } @@ -55,7 +48,7 @@ class Server final { } } bool IsRunning() const { return background_thread_ && !ioc_.stopped(); } - tcp::endpoint GetEndpoint() const { return listener_->GetEndpoint(); } + tcp::endpoint GetEndpoint() const; private: boost::asio::io_context ioc_; @@ -63,4 +56,22 @@ class Server final { std::shared_ptr> listener_; std::optional background_thread_; }; +template +Server::Server(io::network::Endpoint endpoint, TSessionContext *session_context, + ServerContext *context) + : listener_{Listener::Create( + ioc_, session_context, context, + tcp::endpoint{boost::asio::ip::make_address(endpoint.address), endpoint.port})} {} + +template +void Server::Start() { + MG_ASSERT(!background_thread_, "The server was already started!"); + listener_->Run(); + background_thread_.emplace([this] { ioc_.run(); }); +} + +template +boost::asio::ip::tcp::endpoint Server::GetEndpoint() const { + return listener_->GetEndpoint(); +} } // namespace memgraph::communication::http diff --git a/src/communication/v2/server.hpp b/src/communication/v2/server.hpp index 95c0af94c..a0c261315 100644 --- a/src/communication/v2/server.hpp +++ b/src/communication/v2/server.hpp @@ -73,40 +73,19 @@ class Server final { * invokes workers_count workers */ Server(ServerEndpoint &endpoint, TSessionContext *session_context, ServerContext *server_context, - const int inactivity_timeout_sec, const std::string_view service_name, - size_t workers_count = std::thread::hardware_concurrency()) - : endpoint_{endpoint}, - service_name_{service_name}, - context_thread_pool_{workers_count}, - listener_{Listener::Create(context_thread_pool_.GetIOContext(), session_context, - server_context, endpoint_, service_name_, - inactivity_timeout_sec)} {} + int inactivity_timeout_sec, std::string_view service_name, + size_t workers_count = std::thread::hardware_concurrency()); - ~Server() { MG_ASSERT(!IsRunning(), "Server wasn't shutdown properly"); } + ~Server(); Server(const Server &) = delete; Server(Server &&) = delete; Server &operator=(const Server &) = delete; Server &operator=(Server &&) = delete; - const auto &Endpoint() const { - MG_ASSERT(IsRunning(), "You can't get the server endpoint when it's not running!"); - return endpoint_; - } + const auto &Endpoint() const; - bool Start() { - if (IsRunning()) { - spdlog::error("The server is already running"); - return false; - } - listener_->Start(); - - spdlog::info("{} server is fully armed and operational", service_name_); - spdlog::info("{} listening on {}", service_name_, endpoint_.address()); - context_thread_pool_.Run(); - - return true; - } + bool Start(); void Shutdown() { context_thread_pool_.Shutdown(); @@ -115,7 +94,7 @@ class Server final { void AwaitShutdown() { context_thread_pool_.AwaitShutdown(); } - bool IsRunning() const noexcept { return context_thread_pool_.IsRunning() && listener_->IsRunning(); } + bool IsRunning() const noexcept; private: ServerEndpoint endpoint_; @@ -125,4 +104,46 @@ class Server final { std::shared_ptr> listener_; }; +template +Server::~Server() { + MG_ASSERT(!IsRunning(), "Server wasn't shutdown properly"); +} + +template +Server::Server(ServerEndpoint &endpoint, TSessionContext *session_context, + ServerContext *server_context, const int inactivity_timeout_sec, + const std::string_view service_name, size_t workers_count) + : endpoint_{endpoint}, + service_name_{service_name}, + context_thread_pool_{workers_count}, + listener_{Listener::Create(context_thread_pool_.GetIOContext(), session_context, + server_context, endpoint_, service_name_, + inactivity_timeout_sec)} {} + +template +bool Server::Start() { + if (IsRunning()) { + spdlog::error("The server is already running"); + return false; + } + listener_->Start(); + + spdlog::info("{} server is fully armed and operational", service_name_); + spdlog::info("{} listening on {}", service_name_, endpoint_.address()); + context_thread_pool_.Run(); + + return true; +} + +template +const auto &Server::Endpoint() const { + MG_ASSERT(IsRunning(), "You can't get the server endpoint when it's not running!"); + return endpoint_; +} + +template +bool Server::IsRunning() const noexcept { + return context_thread_pool_.IsRunning() && listener_->IsRunning(); +} + } // namespace memgraph::communication::v2 diff --git a/src/data_structures/ring_buffer.hpp b/src/data_structures/ring_buffer.hpp index c583d4309..5b4ea0493 100644 --- a/src/data_structures/ring_buffer.hpp +++ b/src/data_structures/ring_buffer.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -31,7 +31,7 @@ template class RingBuffer { public: - explicit RingBuffer(int capacity) : capacity_(capacity) { buffer_ = std::make_unique(capacity_); } + explicit RingBuffer(int capacity) : capacity_(capacity), buffer_{std::make_unique(capacity_)} {} RingBuffer(const RingBuffer &) = delete; RingBuffer(RingBuffer &&) = delete; diff --git a/src/flags/all.hpp b/src/flags/all.hpp new file mode 100644 index 000000000..6a0309133 --- /dev/null +++ b/src/flags/all.hpp @@ -0,0 +1,18 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "flags/audit.hpp" +#include "flags/bolt.hpp" +#include "flags/general.hpp" +#include "flags/isolation_level.hpp" +#include "flags/log_level.hpp" +#include "flags/memory_limit.hpp" diff --git a/src/flags/audit.cpp b/src/flags/audit.cpp new file mode 100644 index 000000000..864f8f66c --- /dev/null +++ b/src/flags/audit.cpp @@ -0,0 +1,28 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "flags/audit.hpp" + +#include "audit/log.hpp" + +#include "utils/flag_validation.hpp" + +// Audit logging flags. +#ifdef MG_ENTERPRISE +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(audit_buffer_size, memgraph::audit::kBufferSizeDefault, + "Maximum number of items in the audit log buffer.", FLAG_IN_RANGE(1, INT32_MAX)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(audit_buffer_flush_interval_ms, memgraph::audit::kBufferFlushIntervalMillisDefault, + "Interval (in milliseconds) used for flushing the audit log buffer.", + FLAG_IN_RANGE(10, INT32_MAX)); +#endif diff --git a/src/flags/audit.hpp b/src/flags/audit.hpp new file mode 100644 index 000000000..1cbf70e89 --- /dev/null +++ b/src/flags/audit.hpp @@ -0,0 +1,23 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "gflags/gflags.h" + +// Audit logging flags. +#ifdef MG_ENTERPRISE +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(audit_enabled); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(audit_buffer_size); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(audit_buffer_flush_interval_ms); +#endif diff --git a/src/flags/bolt.cpp b/src/flags/bolt.cpp new file mode 100644 index 000000000..9401acf3d --- /dev/null +++ b/src/flags/bolt.cpp @@ -0,0 +1,38 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "flags/bolt.hpp" + +#include "utils/flag_validation.hpp" + +#include +#include + +// Bolt server flags. +DEFINE_string(bolt_address, "0.0.0.0", "IP address on which the Bolt server should listen."); + +DEFINE_VALIDATED_int32(bolt_port, 7687, "Port on which the Bolt server should listen.", + FLAG_IN_RANGE(0, std::numeric_limits::max())); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(bolt_num_workers, std::max(std::thread::hardware_concurrency(), 1U), + "Number of workers used by the Bolt server. By default, this will be the " + "number of processing units available on the machine.", + FLAG_IN_RANGE(1, INT32_MAX)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(bolt_session_inactivity_timeout, 1800, + "Time in seconds after which inactive Bolt sessions will be " + "closed.", + FLAG_IN_RANGE(1, INT32_MAX)); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(bolt_cert_file, "", "Certificate file which should be used for the Bolt server."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(bolt_key_file, "", "Key file which should be used for the Bolt server."); diff --git a/src/flags/bolt.hpp b/src/flags/bolt.hpp new file mode 100644 index 000000000..f03b75e99 --- /dev/null +++ b/src/flags/bolt.hpp @@ -0,0 +1,27 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "gflags/gflags.h" + +// Bolt server flags. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(bolt_address); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(bolt_port); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(bolt_num_workers); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(bolt_session_inactivity_timeout); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(bolt_cert_file); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(bolt_key_file); diff --git a/src/flags/general.cpp b/src/flags/general.cpp new file mode 100644 index 000000000..30e8dd4ad --- /dev/null +++ b/src/flags/general.cpp @@ -0,0 +1,194 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "general.hpp" + +#include "utils/flag_validation.hpp" + +#include "glue/auth_handler.hpp" + +// Short help flag. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_HIDDEN_bool(h, false, "Print usage and exit."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(monitoring_address, "0.0.0.0", + "IP address on which the websocket server for Memgraph monitoring should listen."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(metrics_address, "0.0.0.0", + "IP address on which the Memgraph server for exposing metrics should listen."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(monitoring_port, 7444, + "Port on which the websocket server for Memgraph monitoring should listen.", + FLAG_IN_RANGE(0, std::numeric_limits::max())); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(metrics_port, 9091, "Port on which the Memgraph server for exposing metrics should listen.", + FLAG_IN_RANGE(0, std::numeric_limits::max())); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(init_file, "", + "Path to cypherl file that is used for configuring users and database schema before server starts."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(init_data_file, "", "Path to cypherl file that is used for creating data after server starts."); + +// General purpose flags. +// NOTE: The `data_directory` flag must be the same here and in +// `mg_import_csv`. If you change it, make sure to change it there as well. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(data_directory, "mg_data", "Path to directory in which to save all permanent data."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(data_recovery_on_startup, false, "Controls whether the database recovers persisted data on startup."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64(memory_warning_threshold, 1024, + "Memory warning threshold, in MB. If Memgraph detects there is " + "less available RAM it will log a warning. Set to 0 to " + "disable."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed in queries."); + +// Storage flags. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", + FLAG_IN_RANGE(1, 24 * 3600)); +// NOTE: The `storage_properties_on_edges` flag must be the same here and in +// `mg_import_csv`. If you change it, make sure to change it there as well. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_properties_on_edges, false, "Controls whether edges have properties."); + +// storage_recover_on_startup deprecated; use data_recovery_on_startup instead +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_HIDDEN_bool(storage_recover_on_startup, false, + "Controls whether the storage recovers persisted data on startup."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_snapshot_interval_sec, 0, + "Storage snapshot creation interval (in seconds). Set " + "to 0 to disable periodic snapshot creation.", + FLAG_IN_RANGE(0, 7 * 24 * 3600)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_wal_enabled, false, + "Controls whether the storage uses write-ahead-logging. To enable " + "WAL periodic snapshots must be enabled."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_snapshot_retention_count, 3, "The number of snapshots that should always be kept.", + FLAG_IN_RANGE(1, 1000000)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_wal_file_size_kib, memgraph::storage::Config::Durability().wal_file_size_kibibytes, + "Minimum file size of each WAL file.", + FLAG_IN_RANGE(1, static_cast(1000) * 1024)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_wal_file_flush_every_n_tx, + memgraph::storage::Config::Durability().wal_file_flush_every_n_tx, + "Issue a 'fsync' call after this amount of transactions are written to the " + "WAL file. Set to 1 for fully synchronous operation.", + FLAG_IN_RANGE(1, 1000000)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creates another snapshot on exit."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch, + "The number of edges and vertices stored in a batch in a snapshot file."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_parallel_index_recovery, false, + "Controls whether the index creation can be done in a multithreaded fashion."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64(storage_recovery_thread_count, + std::max(static_cast(std::thread::hardware_concurrency()), + memgraph::storage::Config::Durability().recovery_thread_count), + "The number of threads used to recover persisted data from disk."); + +#ifdef MG_ENTERPRISE +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_delete_on_drop, true, + "If set to true the query 'DROP DATABASE x' will delete the underlying storage as well."); +#endif + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(telemetry_enabled, false, + "Set to true to enable telemetry. We collect information about the " + "running system (CPU and memory information) and information about " + "the database runtime (vertex and edge counts and resource usage) " + "to allow for easier improvement of the product."); + +// Streams flags +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32( + stream_transaction_conflict_retries, 30, + "Number of times to retry when a stream transformation fails to commit because of conflicting transactions"); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32( + stream_transaction_retry_interval, 500, + "Retry interval in milliseconds when a stream transformation fails to commit because of conflicting transactions"); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(kafka_bootstrap_servers, "", + "List of default Kafka brokers as a comma separated list of broker host or host:port."); + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(pulsar_service_url, "", "Default URL used while connecting to Pulsar brokers."); + +// Query flags. + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_double(query_execution_timeout_sec, 600, + "Maximum allowed query execution time. Queries exceeding this " + "limit will be aborted. Value of 0 means no limit."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64(replication_replica_check_frequency_sec, 1, + "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: " + "The MAIN instance allocates a new thread for each REPLICA."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(replication_restore_state_on_startup, false, "Restore replication state on startup, e.g. recover replica"); + +DEFINE_VALIDATED_string(query_modules_directory, "", + "Directory where modules with custom query procedures are stored. " + "NOTE: Multiple comma-separated directories can be defined.", + { + if (value.empty()) return true; + const auto directories = memgraph::utils::Split(value, ","); + for (const auto &dir : directories) { + if (!memgraph::utils::DirExists(dir)) { + std::cout << "Expected --" << flagname << " to point to directories." << std::endl; + std::cout << dir << " is not a directory." << std::endl; + return false; + } + } + return true; + }); + +auto memgraph::flags::ParseQueryModulesDirectory() -> std::vector { + const auto directories = memgraph::utils::Split(FLAGS_query_modules_directory, ","); + std::vector query_modules_directories; + query_modules_directories.reserve(directories.size()); + std::transform(directories.begin(), directories.end(), std::back_inserter(query_modules_directories), + [](const auto &dir) { return dir; }); + + return query_modules_directories; +} + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(query_callable_mappings_path, "", + "The path to mappings that describes aliases to callables in cypher queries in the form of key-value " + "pairs in a json file. With this option query module procedures that do not exist in memgraph can be " + "mapped to ones that exist."); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_HIDDEN_string(license_key, "", "License key for Memgraph Enterprise."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_HIDDEN_string(organization_name, "", "Organization name."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(auth_user_or_role_name_regex, memgraph::glue::kDefaultUserRoleRegex.data(), + "Set to the regular expression that each user or role name must fulfill."); diff --git a/src/flags/general.hpp b/src/flags/general.hpp new file mode 100644 index 000000000..6963f980a --- /dev/null +++ b/src/flags/general.hpp @@ -0,0 +1,122 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "gflags/gflags.h" + +#include + +// Short help flag. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(h); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(monitoring_address); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(monitoring_port); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(metrics_address); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(metrics_port); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(init_file); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(init_data_file); + +// General purpose flags. +// NOTE: The `data_directory` flag must be the same here and in +// `mg_import_csv`. If you change it, make sure to change it there as well. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(data_directory); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(data_recovery_on_startup); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(memory_warning_threshold); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(allow_load_csv); + +// Storage flags. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_gc_cycle_sec); +// NOTE: The `storage_properties_on_edges` flag must be the same here and in +// `mg_import_csv`. If you change it, make sure to change it there as well. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_properties_on_edges); +// storage_recover_on_startup deprecated; use data_recovery_on_startup instead +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_recover_on_startup); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_snapshot_interval_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_wal_enabled); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_snapshot_retention_count); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_wal_file_size_kib); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_wal_file_flush_every_n_tx); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_snapshot_on_exit); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_items_per_batch); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_parallel_index_recovery); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_recovery_thread_count); +#ifdef MG_ENTERPRISE +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_delete_on_drop); +#endif + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(telemetry_enabled); + +// Streams flags +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(stream_transaction_conflict_retries); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(stream_transaction_retry_interval); + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(kafka_bootstrap_servers); + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(pulsar_service_url); + +// Query flags. + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_double(query_execution_timeout_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(query_modules_directory); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(query_callable_mappings_path); +namespace memgraph::flags { +auto ParseQueryModulesDirectory() -> std::vector; +} // namespace memgraph::flags + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(replication_replica_check_frequency_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(replication_restore_state_on_startup); + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(license_key); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(organization_name); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_string(auth_user_or_role_name_regex); diff --git a/src/flags/isolation_level.cpp b/src/flags/isolation_level.cpp new file mode 100644 index 000000000..6f1af5132 --- /dev/null +++ b/src/flags/isolation_level.cpp @@ -0,0 +1,54 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "flags/isolation_level.hpp" + +#include "utils/enum.hpp" +#include "utils/flag_validation.hpp" + +#include "gflags/gflags.h" + +#include +#include + +inline constexpr std::array isolation_level_mappings{ + std::pair{std::string_view{"SNAPSHOT_ISOLATION"}, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION}, + std::pair{std::string_view{"READ_COMMITTED"}, memgraph::storage::IsolationLevel::READ_COMMITTED}, + std::pair{std::string_view{"READ_UNCOMMITTED"}, memgraph::storage::IsolationLevel::READ_UNCOMMITTED}}; + +const std::string isolation_level_help_string = + fmt::format("Default isolation level used for the transactions. Allowed values: {}", + memgraph::utils::GetAllowedEnumValuesString(isolation_level_mappings)); + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_string(isolation_level, "SNAPSHOT_ISOLATION", isolation_level_help_string.c_str(), { + if (const auto result = memgraph::utils::IsValidEnumValueString(value, isolation_level_mappings); result.HasError()) { + switch (result.GetError()) { + case memgraph::utils::ValidationError::EmptyValue: { + std::cout << "Isolation level cannot be empty." << std::endl; + break; + } + case memgraph::utils::ValidationError::InvalidValue: { + std::cout << "Invalid value for isolation level. Allowed values: " + << memgraph::utils::GetAllowedEnumValuesString(isolation_level_mappings) << std::endl; + break; + } + } + return false; + } + return true; +}); + +memgraph::storage::IsolationLevel memgraph::flags::ParseIsolationLevel() { + const auto isolation_level = + memgraph::utils::StringToEnum(FLAGS_isolation_level, isolation_level_mappings); + MG_ASSERT(isolation_level, "Invalid isolation level"); + return *isolation_level; +} diff --git a/src/flags/isolation_level.hpp b/src/flags/isolation_level.hpp new file mode 100644 index 000000000..8c911d406 --- /dev/null +++ b/src/flags/isolation_level.hpp @@ -0,0 +1,19 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "storage/v2/isolation_level.hpp" + +namespace memgraph::flags { + +memgraph::storage::IsolationLevel ParseIsolationLevel(); + +} // namespace memgraph::flags diff --git a/src/flags/log_level.cpp b/src/flags/log_level.cpp new file mode 100644 index 000000000..b292410d4 --- /dev/null +++ b/src/flags/log_level.cpp @@ -0,0 +1,101 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "flags/log_level.hpp" + +#include "utils/enum.hpp" +#include "utils/flag_validation.hpp" +#include "utils/logging.hpp" + +#include "gflags/gflags.h" +#include "spdlog/common.h" +#include "spdlog/sinks/daily_file_sink.h" + +#include +#include +#include + +using namespace std::string_view_literals; + +// Logging flags +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_HIDDEN_bool(also_log_to_stderr, false, "Log messages go to stderr in addition to logfiles"); +DEFINE_string(log_file, "", "Path to where the log should be stored."); + +inline constexpr std::array log_level_mappings{ + std::pair{"TRACE"sv, spdlog::level::trace}, std::pair{"DEBUG"sv, spdlog::level::debug}, + std::pair{"INFO"sv, spdlog::level::info}, std::pair{"WARNING"sv, spdlog::level::warn}, + std::pair{"ERROR"sv, spdlog::level::err}, std::pair{"CRITICAL"sv, spdlog::level::critical}}; + +const std::string log_level_help_string = fmt::format("Minimum log level. Allowed values: {}", + memgraph::utils::GetAllowedEnumValuesString(log_level_mappings)); + +DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), { + if (const auto result = memgraph::utils::IsValidEnumValueString(value, log_level_mappings); result.HasError()) { + const auto error = result.GetError(); + switch (error) { + case memgraph::utils::ValidationError::EmptyValue: { + std::cout << "Log level cannot be empty." << std::endl; + break; + } + case memgraph::utils::ValidationError::InvalidValue: { + std::cout << "Invalid value for log level. Allowed values: " + << memgraph::utils::GetAllowedEnumValuesString(log_level_mappings) << std::endl; + break; + } + } + return false; + } + + return true; +}); + +spdlog::level::level_enum ParseLogLevel() { + const auto log_level = memgraph::utils::StringToEnum(FLAGS_log_level, log_level_mappings); + MG_ASSERT(log_level, "Invalid log level"); + return *log_level; +} + +// 5 weeks * 7 days +inline constexpr auto log_retention_count = 35; +void CreateLoggerFromSink(const auto &sinks, const auto log_level) { + auto logger = std::make_shared("memgraph_log", sinks.begin(), sinks.end()); + logger->set_level(log_level); + logger->flush_on(spdlog::level::trace); + spdlog::set_default_logger(std::move(logger)); +} + +void memgraph::flags::InitializeLogger() { + std::vector sinks; + + if (FLAGS_also_log_to_stderr) { + sinks.emplace_back(std::make_shared()); + } + + if (!FLAGS_log_file.empty()) { + // get local time + time_t current_time{0}; + struct tm *local_time{nullptr}; + + time(¤t_time); + local_time = localtime(¤t_time); + + sinks.emplace_back(std::make_shared( + FLAGS_log_file, local_time->tm_hour, local_time->tm_min, false, log_retention_count)); + } + CreateLoggerFromSink(sinks, ParseLogLevel()); +} + +void memgraph::flags::AddLoggerSink(spdlog::sink_ptr new_sink) { + auto default_logger = spdlog::default_logger(); + auto sinks = default_logger->sinks(); + sinks.push_back(new_sink); + CreateLoggerFromSink(sinks, default_logger->level()); +} diff --git a/src/flags/log_level.hpp b/src/flags/log_level.hpp new file mode 100644 index 000000000..18d9df81b --- /dev/null +++ b/src/flags/log_level.hpp @@ -0,0 +1,18 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include + +namespace memgraph::flags { +void InitializeLogger(); +void AddLoggerSink(spdlog::sink_ptr new_sink); +} // namespace memgraph::flags diff --git a/src/flags/memory_limit.cpp b/src/flags/memory_limit.cpp new file mode 100644 index 000000000..56a6f4ad2 --- /dev/null +++ b/src/flags/memory_limit.cpp @@ -0,0 +1,41 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "flags/memory_limit.hpp" + +#include "utils/logging.hpp" +#include "utils/sysinfo/memory.hpp" + +#include "gflags/gflags.h" + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64( + memory_limit, 0, + "Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap " + "is enabled and 90\% of the physical memory otherwise."); + +int64_t memgraph::flags::GetMemoryLimit() { + if (FLAGS_memory_limit == 0) { + auto maybe_total_memory = memgraph::utils::sysinfo::TotalMemory(); + MG_ASSERT(maybe_total_memory, "Failed to fetch the total physical memory"); + const auto maybe_swap_memory = memgraph::utils::sysinfo::SwapTotalMemory(); + MG_ASSERT(maybe_swap_memory, "Failed to fetch the total swap memory"); + + if (*maybe_swap_memory == 0) { + // take only 90% of the total memory + *maybe_total_memory *= 9; + *maybe_total_memory /= 10; + } + return *maybe_total_memory * 1024; + } + + // We parse the memory as MiB every time + return FLAGS_memory_limit * 1024 * 1024; +} diff --git a/src/flags/memory_limit.hpp b/src/flags/memory_limit.hpp new file mode 100644 index 000000000..0b2796231 --- /dev/null +++ b/src/flags/memory_limit.hpp @@ -0,0 +1,17 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include + +namespace memgraph::flags { +int64_t GetMemoryLimit(); +} // namespace memgraph::flags diff --git a/src/glue/CMakeLists.txt b/src/glue/CMakeLists.txt index fa8a87997..eef05cd81 100644 --- a/src/glue/CMakeLists.txt +++ b/src/glue/CMakeLists.txt @@ -1,4 +1,4 @@ -set(mg_glue_sources auth.cpp auth_checker.cpp auth_handler.cpp communication.cpp) - -add_library(mg-glue STATIC ${mg_glue_sources}) -target_link_libraries(mg-glue mg-query mg-auth) +add_library(mg-glue STATIC ) +target_sources(mg-glue PRIVATE auth.cpp auth_checker.cpp auth_handler.cpp communication.cpp SessionHL.cpp ServerT.cpp MonitoringServerT.cpp) +target_link_libraries(mg-glue mg-query mg-auth mg-audit) +target_precompile_headers(mg-glue INTERFACE auth_checker.hpp auth_handler.hpp) diff --git a/src/glue/MonitoringServerT.cpp b/src/glue/MonitoringServerT.cpp new file mode 100644 index 000000000..2fde4f572 --- /dev/null +++ b/src/glue/MonitoringServerT.cpp @@ -0,0 +1,14 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "glue/MonitoringServerT.hpp" + +template class memgraph::communication::http::Server< + memgraph::http::MetricsRequestHandler, memgraph::dbms::SessionContext>; diff --git a/src/glue/MonitoringServerT.hpp b/src/glue/MonitoringServerT.hpp new file mode 100644 index 000000000..aab7a1c0c --- /dev/null +++ b/src/glue/MonitoringServerT.hpp @@ -0,0 +1,25 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "communication/http/server.hpp" +#include "dbms/session_context.hpp" +#include "http_handlers/metrics.hpp" + +extern template class memgraph::communication::http::Server< + memgraph::http::MetricsRequestHandler, memgraph::dbms::SessionContext>; + +namespace memgraph::glue { + +using MonitoringServerT = + memgraph::communication::http::Server, + memgraph::dbms::SessionContext>; +} // namespace memgraph::glue diff --git a/src/glue/ServerT.cpp b/src/glue/ServerT.cpp new file mode 100644 index 000000000..94eafcb6b --- /dev/null +++ b/src/glue/ServerT.cpp @@ -0,0 +1,17 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "glue/ServerT.hpp" + +#ifdef MG_ENTERPRISE +template class memgraph::communication::v2::Server; +#else +template class memgraph::communication::v2::Server; +#endif diff --git a/src/glue/ServerT.hpp b/src/glue/ServerT.hpp new file mode 100644 index 000000000..3652f94d0 --- /dev/null +++ b/src/glue/ServerT.hpp @@ -0,0 +1,35 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "communication/v2/server.hpp" +#include "glue/SessionHL.hpp" + +#ifdef MG_ENTERPRISE +#include "dbms/session_context_handler.hpp" +#else +#include "dbms/session_context.hpp" +#endif + +#ifdef MG_ENTERPRISE +extern template class memgraph::communication::v2::Server; +#else +extern template class memgraph::communication::v2::Server; +#endif + +namespace memgraph::glue { +#ifdef MG_ENTERPRISE +using ServerT = memgraph::communication::v2::Server; +#else +using ServerT = memgraph::communication::v2::Server; +#endif +} // namespace memgraph::glue diff --git a/src/glue/SessionHL.cpp b/src/glue/SessionHL.cpp new file mode 100644 index 000000000..4983e02a8 --- /dev/null +++ b/src/glue/SessionHL.cpp @@ -0,0 +1,386 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "glue/SessionHL.hpp" + +#include "audit/log.hpp" +#include "glue/auth_checker.hpp" +#include "glue/communication.hpp" +#include "license/license.hpp" +#include "query/discard_value_stream.hpp" + +#include "gflags/gflags.h" + +namespace memgraph::metrics { +extern const Event ActiveBoltSessions; +} // namespace memgraph::metrics + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(bolt_server_name_for_init, "", + "Server name which the database should send to the client in the " + "Bolt INIT message."); + +auto ToQueryExtras(const memgraph::communication::bolt::Value &extra) -> memgraph::query::QueryExtras { + auto const &as_map = extra.ValueMap(); + + auto metadata_pv = std::map{}; + + if (auto const it = as_map.find("tx_metadata"); it != as_map.cend() && it->second.IsMap()) { + for (const auto &[key, bolt_md] : it->second.ValueMap()) { + metadata_pv.emplace(key, memgraph::glue::ToPropertyValue(bolt_md)); + } + } + + auto tx_timeout = std::optional{}; + if (auto const it = as_map.find("tx_timeout"); it != as_map.cend() && it->second.IsInt()) { + tx_timeout = it->second.ValueInt(); + } + + return memgraph::query::QueryExtras{std::move(metadata_pv), tx_timeout}; +} + +class TypedValueResultStreamBase { + public: + explicit TypedValueResultStreamBase(memgraph::query::InterpreterContext *interpreterContext); + + std::vector DecodeValues( + const std::vector &values) const; + + private: + // NOTE: Needed only for ToBoltValue conversions + memgraph::query::InterpreterContext *interpreter_context_; +}; + +/// Wrapper around TEncoder which converts TypedValue to Value +/// before forwarding the calls to original TEncoder. +template +class TypedValueResultStream : public TypedValueResultStreamBase { + public: + TypedValueResultStream(TEncoder *encoder, memgraph::query::InterpreterContext *ic) + : TypedValueResultStreamBase{ic}, encoder_(encoder) {} + + void Result(const std::vector &values) { encoder_->MessageRecord(DecodeValues(values)); } + + private: + TEncoder *encoder_; +}; + +std::vector TypedValueResultStreamBase::DecodeValues( + const std::vector &values) const { + std::vector decoded_values; + decoded_values.reserve(values.size()); + for (const auto &v : values) { + auto maybe_value = memgraph::glue::ToBoltValue(v, *interpreter_context_->db, memgraph::storage::View::NEW); + if (maybe_value.HasError()) { + switch (maybe_value.GetError()) { + case memgraph::storage::Error::DELETED_OBJECT: + throw memgraph::communication::bolt::ClientError("Returning a deleted object as a result."); + case memgraph::storage::Error::NONEXISTENT_OBJECT: + throw memgraph::communication::bolt::ClientError("Returning a nonexistent object as a result."); + case memgraph::storage::Error::VERTEX_HAS_EDGES: + case memgraph::storage::Error::SERIALIZATION_ERROR: + case memgraph::storage::Error::PROPERTIES_DISABLED: + throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming results."); + } + } + decoded_values.emplace_back(std::move(*maybe_value)); + } + return decoded_values; +} +TypedValueResultStreamBase::TypedValueResultStreamBase(memgraph::query::InterpreterContext *interpreterContext) + : interpreter_context_(interpreterContext) {} + +namespace memgraph::glue { + +#ifdef MG_ENTERPRISE + +void SessionHL::UpdateAndDefunct(const std::string &db_name) { + UpdateAndDefunct(ContextWrapper(sc_handler_.Get(db_name))); +} +void SessionHL::UpdateAndDefunct(ContextWrapper &&cntxt) { + defunct_.emplace(std::move(current_)); + Update(std::forward(cntxt)); + defunct_->Defunct(); +} +void SessionHL::Update(const std::string &db_name) { + ContextWrapper tmp(sc_handler_.Get(db_name)); + Update(std::move(tmp)); +} +void SessionHL::Update(ContextWrapper &&cntxt) { + current_ = std::move(cntxt); + interpreter_ = current_.interp(); + interpreter_->in_explicit_db_ = in_explicit_db_; + interpreter_context_ = current_.interpreter_context(); +} +void SessionHL::MultiDatabaseAuth(const std::string &db) { + if (user_ && !AuthChecker::IsUserAuthorized(*user_, {}, db)) { + throw memgraph::communication::bolt::ClientError( + "You are not authorized on the database \"{}\"! Please contact your database administrator.", db); + } +} +std::string SessionHL::GetDefaultDB() { + if (user_.has_value()) { + return user_->db_access().GetDefault(); + } + return memgraph::dbms::kDefaultDB; +} + +bool SessionHL::OnDelete(const std::string &db_name) { + MG_ASSERT(current_.interpreter_context()->db->id() != db_name && (!defunct_ || defunct_->defunct()), + "Trying to delete a database while still in use."); + return true; +} +memgraph::dbms::SetForResult SessionHL::OnChange(const std::string &db_name) { + MultiDatabaseAuth(db_name); + if (db_name != current_.interpreter_context()->db->id()) { + UpdateAndDefunct(db_name); // Done during Pull, so we cannot just replace the current db + return memgraph::dbms::SetForResult::SUCCESS; + } + return memgraph::dbms::SetForResult::ALREADY_SET; +} + +#endif +std::string SessionHL::GetDatabaseName() const { return interpreter_context_->db->id(); } + +std::optional SessionHL::GetServerNameForInit() { + if (FLAGS_bolt_server_name_for_init.empty()) return std::nullopt; + return FLAGS_bolt_server_name_for_init; +} +bool SessionHL::Authenticate(const std::string &username, const std::string &password) { + auto locked_auth = auth_->Lock(); + if (!locked_auth->HasUsers()) { + return true; + } + user_ = locked_auth->Authenticate(username, password); +#ifdef MG_ENTERPRISE + if (user_.has_value()) { + const auto &db = user_->db_access().GetDefault(); + // Check if the underlying database needs to be updated + if (db != current_.interpreter_context()->db->id()) { + const auto &res = sc_handler_.SetFor(UUID(), db); + return res == memgraph::dbms::SetForResult::SUCCESS || res == memgraph::dbms::SetForResult::ALREADY_SET; + } + } +#endif + return user_.has_value(); +} +void SessionHL::Abort() { interpreter_->Abort(); } + +std::map SessionHL::Discard(std::optional n, + std::optional qid) { + try { + memgraph::query::DiscardValueResultStream stream; + return DecodeSummary(interpreter_->Pull(&stream, n, qid)); + } catch (const memgraph::query::QueryException &e) { + // Wrap QueryException into ClientError, because we want to allow the + // client to fix their query. + throw memgraph::communication::bolt::ClientError(e.what()); + } +} +std::map SessionHL::Pull(SessionHL::TEncoder *encoder, + std::optional n, + std::optional qid) { + try { + TypedValueResultStream stream(encoder, interpreter_context_); + return DecodeSummary(interpreter_->Pull(&stream, n, qid)); + } catch (const memgraph::query::QueryException &e) { + // Wrap QueryException into ClientError, because we want to allow the + // client to fix their query. + throw memgraph::communication::bolt::ClientError(e.what()); + } +} +std::pair, std::optional> SessionHL::Interpret( + const std::string &query, const std::map ¶ms, + const std::map &extra) { + std::map params_pv; + for (const auto &[key, bolt_param] : params) { + params_pv.emplace(key, ToPropertyValue(bolt_param)); + } + const std::string *username{nullptr}; + if (user_) { + username = &user_->username(); + } + +#ifdef MG_ENTERPRISE + if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) { + audit_log_->Record(endpoint_.address().to_string(), user_ ? *username : "", query, + memgraph::storage::PropertyValue(params_pv), interpreter_context_->db->id()); + } +#endif + try { + auto result = interpreter_->Prepare(query, params_pv, username, ToQueryExtras(extra), UUID()); + const std::string db_name = result.db ? *result.db : ""; + if (user_ && !AuthChecker::IsUserAuthorized(*user_, result.privileges, db_name)) { + interpreter_->Abort(); + if (db_name.empty()) { + throw memgraph::communication::bolt::ClientError( + "You are not authorized to execute this query! Please contact your database administrator."); + } + throw memgraph::communication::bolt::ClientError( + "You are not authorized to execute this query on database \"{}\"! Please contact your database " + "administrator.", + db_name); + } + return {std::move(result.headers), result.qid}; + + } catch (const memgraph::query::QueryException &e) { + // Wrap QueryException into ClientError, because we want to allow the + // client to fix their query. + throw memgraph::communication::bolt::ClientError(e.what()); + } catch (const memgraph::query::ReplicationException &e) { + throw memgraph::communication::bolt::ClientError(e.what()); + } +} +void SessionHL::RollbackTransaction() { interpreter_->RollbackTransaction(); } +void SessionHL::CommitTransaction() { interpreter_->CommitTransaction(); } +void SessionHL::BeginTransaction(const std::map &extra) { + interpreter_->BeginTransaction(ToQueryExtras(extra)); +} +void SessionHL::Configure(const std::map &run_time_info) { +#ifdef MG_ENTERPRISE + std::string db; + bool update = false; + // Check if user explicitly defined the database to use + if (run_time_info.contains("db")) { + const auto &db_info = run_time_info.at("db"); + if (!db_info.IsString()) { + throw memgraph::communication::bolt::ClientError("Malformed database name."); + } + db = db_info.ValueString(); + update = db != current_.interpreter_context()->db->id(); + in_explicit_db_ = true; + // NOTE: Once in a transaction, the drivers stop explicitly sending the db and count on using it until commit + } else if (in_explicit_db_ && !interpreter_->in_explicit_transaction_) { // Just on a switch + db = GetDefaultDB(); + update = db != current_.interpreter_context()->db->id(); + in_explicit_db_ = false; + } + + // Check if the underlying database needs to be updated + if (update) { + sc_handler_.SetInPlace(db, [this](auto new_sc) mutable { + const auto &db_name = new_sc.interpreter_context->db->id(); + MultiDatabaseAuth(db_name); + try { + Update(ContextWrapper(new_sc)); + return memgraph::dbms::SetForResult::SUCCESS; + } catch (memgraph::dbms::UnknownDatabaseException &e) { + throw memgraph::communication::bolt::ClientError("No database named \"{}\" found!", db_name); + } + }); + } +#endif +} +SessionHL::~SessionHL() { memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveBoltSessions); } +SessionHL::SessionHL( +#ifdef MG_ENTERPRISE + memgraph::dbms::SessionContextHandler &sc_handler, +#else + memgraph::dbms::SessionContext sc, +#endif + const memgraph::communication::v2::ServerEndpoint &endpoint, memgraph::communication::v2::InputStream *input_stream, + memgraph::communication::v2::OutputStream *output_stream, const std::string &default_db) // NOLINT + : Session(input_stream, + output_stream), +#ifdef MG_ENTERPRISE + sc_handler_(sc_handler), + current_(sc_handler_.Get(default_db)), +#else + current_(sc), +#endif + interpreter_context_(current_.interpreter_context()), + interpreter_(current_.interp()), + auth_(current_.auth()), +#ifdef MG_ENTERPRISE + audit_log_(current_.audit_log()), +#endif + endpoint_(endpoint), + run_id_(current_.run_id()) { + memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveBoltSessions); +} + +/// ContextWrapper +ContextWrapper::ContextWrapper(memgraph::dbms::SessionContext sc) + : session_context(sc), + interpreter(std::make_unique(session_context.interpreter_context.get())), + defunct_(false) { + session_context.interpreter_context->interpreters.WithLock( + [this](auto &interpreters) { interpreters.insert(interpreter.get()); }); +} +ContextWrapper::~ContextWrapper() { Defunct(); } +void ContextWrapper::Defunct() { + if (!defunct_) { + session_context.interpreter_context->interpreters.WithLock( + [this](auto &interpreters) { interpreters.erase(interpreter.get()); }); + defunct_ = true; + } +} +ContextWrapper::ContextWrapper(ContextWrapper &&in) noexcept + : session_context(std::move(in.session_context)), interpreter(std::move(in.interpreter)), defunct_(in.defunct_) { + in.defunct_ = true; +} +ContextWrapper &ContextWrapper::operator=(ContextWrapper &&in) noexcept { + if (this != &in) { + Defunct(); + session_context = std::move(in.session_context); + interpreter = std::move(in.interpreter); + defunct_ = in.defunct_; + in.defunct_ = true; + } + return *this; +} +memgraph::query::InterpreterContext *ContextWrapper::interpreter_context() { + return session_context.interpreter_context.get(); +} +memgraph::query::Interpreter *ContextWrapper::interp() { return interpreter.get(); } +memgraph::utils::Synchronized *ContextWrapper::auth() + const { + return session_context.auth; +} +std::string ContextWrapper::run_id() const { return session_context.run_id; } +bool ContextWrapper::defunct() const { return defunct_; } +#ifdef MG_ENTERPRISE +memgraph::audit::Log *ContextWrapper::audit_log() const { return session_context.audit_log; } +#endif + +std::map SessionHL::DecodeSummary( + const std::map &summary) { + std::map decoded_summary; + for (const auto &kv : summary) { + auto maybe_value = ToBoltValue(kv.second, *interpreter_context_->db, memgraph::storage::View::NEW); + if (maybe_value.HasError()) { + switch (maybe_value.GetError()) { + case memgraph::storage::Error::DELETED_OBJECT: + case memgraph::storage::Error::SERIALIZATION_ERROR: + case memgraph::storage::Error::VERTEX_HAS_EDGES: + case memgraph::storage::Error::PROPERTIES_DISABLED: + case memgraph::storage::Error::NONEXISTENT_OBJECT: + throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming summary."); + } + } + decoded_summary.emplace(kv.first, std::move(*maybe_value)); + } + // Add this memgraph instance run_id, received from telemetry + // This is sent with every query, instead of only on bolt init inside + // communication/bolt/v1/states/init.hpp because neo4jdriver does not + // read the init message. + if (auto run_id = run_id_; run_id) { + decoded_summary.emplace("run_id", *run_id); + } + + // Clean up previous session (session gets defunct when switching between databases) + if (defunct_) { + defunct_.reset(); + } + + return decoded_summary; +} +} // namespace memgraph::glue diff --git a/src/glue/SessionHL.hpp b/src/glue/SessionHL.hpp new file mode 100644 index 000000000..5f15ed4cc --- /dev/null +++ b/src/glue/SessionHL.hpp @@ -0,0 +1,161 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "communication/v2/server.hpp" +#include "communication/v2/session.hpp" +#include "dbms/session_context.hpp" + +#ifdef MG_ENTERPRISE +#include "dbms/session_context_handler.hpp" +#else +#include "dbms/session_context.hpp" +#endif + +namespace memgraph::glue { + +struct ContextWrapper { + explicit ContextWrapper(memgraph::dbms::SessionContext sc); + ~ContextWrapper(); + + ContextWrapper(const ContextWrapper &) = delete; + ContextWrapper &operator=(const ContextWrapper &) = delete; + + ContextWrapper(ContextWrapper &&in) noexcept; + ContextWrapper &operator=(ContextWrapper &&in) noexcept; + + void Defunct(); + memgraph::query::InterpreterContext *interpreter_context(); + memgraph::query::Interpreter *interp(); + memgraph::utils::Synchronized *auth() const; + std::string run_id() const; + bool defunct() const; +#ifdef MG_ENTERPRISE + memgraph::audit::Log *audit_log() const; +#endif + + private: + memgraph::dbms::SessionContext session_context; + std::unique_ptr interpreter; + bool defunct_; +}; + +class SessionHL final : public memgraph::communication::bolt::Session { + public: + SessionHL( +#ifdef MG_ENTERPRISE + memgraph::dbms::SessionContextHandler &sc_handler, +#else + memgraph::dbms::SessionContext sc, +#endif + const memgraph::communication::v2::ServerEndpoint &endpoint, + memgraph::communication::v2::InputStream *input_stream, memgraph::communication::v2::OutputStream *output_stream, + const std::string &default_db = memgraph::dbms::kDefaultDB); + + ~SessionHL() override; + + SessionHL(const SessionHL &) = delete; + SessionHL &operator=(const SessionHL &) = delete; + SessionHL(SessionHL &&) = delete; + SessionHL &operator=(SessionHL &&) = delete; + + void Configure(const std::map &run_time_info) override; + + using TEncoder = memgraph::communication::bolt::Encoder< + memgraph::communication::bolt::ChunkedEncoderBuffer>; + + void BeginTransaction(const std::map &extra) override; + + void CommitTransaction() override; + + void RollbackTransaction() override; + + std::pair, std::optional> Interpret( + const std::string &query, const std::map ¶ms, + const std::map &extra) override; + + std::map Pull(TEncoder *encoder, std::optional n, + std::optional qid) override; + + std::map Discard(std::optional n, + std::optional qid) override; + + void Abort() override; + + // Called during Init + // During Init, the user cannot choose the landing DB (switch is done during query execution) + bool Authenticate(const std::string &username, const std::string &password) override; + +#ifdef MG_ENTERPRISE + memgraph::dbms::SetForResult OnChange(const std::string &db_name) override; + + bool OnDelete(const std::string &db_name) override; +#endif + std::optional GetServerNameForInit() override; + + std::string GetDatabaseName() const override; + + private: + std::map DecodeSummary( + const std::map &summary); + +#ifdef MG_ENTERPRISE + /** + * @brief Update setup to the new database. + * + * @param db_name name of the target database + * @throws UnknownDatabaseException if handler cannot get it + */ + void UpdateAndDefunct(const std::string &db_name); + + void UpdateAndDefunct(ContextWrapper &&cntxt); + + void Update(const std::string &db_name); + + void Update(ContextWrapper &&cntxt); + + /** + * @brief Authenticate user on passed database. + * + * @param db database to check against + * @throws bolt::ClientError when user is not authorized + */ + void MultiDatabaseAuth(const std::string &db); + + /** + * @brief Get the user's default database + * + * @return std::string + */ + std::string GetDefaultDB(); +#endif + +#ifdef MG_ENTERPRISE + memgraph::dbms::SessionContextHandler &sc_handler_; +#endif + ContextWrapper current_; + std::optional defunct_; + + memgraph::query::InterpreterContext *interpreter_context_; + memgraph::query::Interpreter *interpreter_; + memgraph::utils::Synchronized *auth_; + std::optional user_; +#ifdef MG_ENTERPRISE + memgraph::audit::Log *audit_log_; + bool in_explicit_db_{false}; //!< If true, the user has defined the database to use via metadata +#endif + memgraph::communication::v2::ServerEndpoint endpoint_; + // NOTE: run_id should be const but that complicates code a lot. + std::optional run_id_; +}; + +} // namespace memgraph::glue diff --git a/src/helpers.hpp b/src/helpers.hpp index d82e2f1d2..1cf4e0ec0 100644 --- a/src/helpers.hpp +++ b/src/helpers.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -63,7 +63,7 @@ inline void LoadConfig(const std::string &product_name) { delete[] custom_argv; } -std::pair LoadUsernameAndPassword(const std::string &pass_file) { +inline std::pair LoadUsernameAndPassword(const std::string &pass_file) { std::ifstream file(pass_file); if (file.fail()) { spdlog::warn("Problem with opening MG_PASSFILE, memgraph server will start without user"); diff --git a/src/http_handlers/metrics.hpp b/src/http_handlers/metrics.hpp index 43970e616..a852639a3 100644 --- a/src/http_handlers/metrics.hpp +++ b/src/http_handlers/metrics.hpp @@ -101,6 +101,7 @@ class MetricsService { auto GetEventCounters() { // NOLINTNEXTLINE(cppcoreguidelines-init-variables) std::vector> event_counters{}; + event_counters.reserve(memgraph::metrics::CounterEnd()); for (auto i = 0; i < memgraph::metrics::CounterEnd(); i++) { event_counters.emplace_back(memgraph::metrics::GetCounterName(i), memgraph::metrics::GetCounterType(i), @@ -113,6 +114,7 @@ class MetricsService { auto GetEventGauges() { // NOLINTNEXTLINE(cppcoreguidelines-init-variables) std::vector> event_gauges{}; + event_gauges.reserve(memgraph::metrics::GaugeEnd()); for (auto i = 0; i < memgraph::metrics::GaugeEnd(); i++) { event_gauges.emplace_back(memgraph::metrics::GetGaugeName(i), memgraph::metrics::GetGaugeType(i), diff --git a/src/memgraph.cpp b/src/memgraph.cpp index ee6fac3be..55ba2a921 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -9,465 +9,36 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include +#ifndef MG_ENTERPRISE +#include "dbms/session_context_handler.hpp" +#endif #include "audit/log.hpp" -#include "auth/models.hpp" -#include "communication/bolt/v1/constants.hpp" -#include "communication/http/server.hpp" #include "communication/websocket/auth.hpp" #include "communication/websocket/server.hpp" -#include "dbms/constants.hpp" -#include "dbms/global.hpp" -#include "dbms/session_context.hpp" +#include "flags/all.hpp" +#include "glue/MonitoringServerT.hpp" +#include "glue/ServerT.hpp" #include "glue/auth_checker.hpp" #include "glue/auth_handler.hpp" #include "helpers.hpp" -#include "http_handlers/metrics.hpp" -#include "license/license.hpp" #include "license/license_sender.hpp" -#include "py/py.hpp" -#include "query/auth_checker.hpp" #include "query/discard_value_stream.hpp" -#include "query/exceptions.hpp" -#include "query/frontend/ast/ast.hpp" -#include "query/interpreter.hpp" -#include "query/plan/operator.hpp" #include "query/procedure/callable_alias_mapper.hpp" #include "query/procedure/module.hpp" #include "query/procedure/py_module.hpp" #include "requests/requests.hpp" -#include "storage/v2/config.hpp" -#include "storage/v2/disk/storage.hpp" -#include "storage/v2/inmemory/storage.hpp" -#include "storage/v2/isolation_level.hpp" -#include "storage/v2/storage.hpp" -#include "storage/v2/view.hpp" #include "telemetry/telemetry.hpp" -#include "utils/enum.hpp" -#include "utils/event_counter.hpp" -#include "utils/file.hpp" -#include "utils/flag_validation.hpp" -#include "utils/logging.hpp" -#include "utils/memory_tracker.hpp" -#include "utils/message.hpp" -#include "utils/readable_size.hpp" -#include "utils/rw_lock.hpp" -#include "utils/settings.hpp" #include "utils/signals.hpp" -#include "utils/string.hpp" -#include "utils/synchronized.hpp" #include "utils/sysinfo/memory.hpp" #include "utils/system_info.hpp" #include "utils/terminate_handler.hpp" #include "version.hpp" -// Communication libraries must be included after query libraries are included. -// This is to enable compilation of the binary when linking with old OpenSSL -// libraries (as on CentOS 7). -// -// The OpenSSL library available on CentOS 7 is v1.0.0, that version includes -// `libkrb5` in its public API headers (that we include in our communication -// stack). The `libkrb5` library has `#define`s for `TRUE` and `FALSE`. Those -// defines clash with Antlr's usage of `TRUE` and `FALSE` as enumeration keys. -// Because of that the definitions of `TRUE` and `FALSE` that are inherited -// from `libkrb5` must be included after the Antlr includes. Hence, -// communication headers must be included after query headers. -#include "communication/bolt/v1/exceptions.hpp" -#include "communication/bolt/v1/session.hpp" -#include "communication/init.hpp" -#include "communication/v2/server.hpp" -#include "communication/v2/session.hpp" -#include "dbms/session_context_handler.hpp" -#include "glue/communication.hpp" - -#include "auth/auth.hpp" -#include "glue/auth.hpp" - constexpr const char *kMgUser = "MEMGRAPH_USER"; constexpr const char *kMgPassword = "MEMGRAPH_PASSWORD"; constexpr const char *kMgPassfile = "MEMGRAPH_PASSFILE"; -// Short help flag. -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_HIDDEN_bool(h, false, "Print usage and exit."); - -// Bolt server flags. -DEFINE_string(bolt_address, "0.0.0.0", "IP address on which the Bolt server should listen."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(monitoring_address, "0.0.0.0", - "IP address on which the websocket server for Memgraph monitoring should listen."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(metrics_address, "0.0.0.0", - "IP address on which the Memgraph server for exposing metrics should listen."); -DEFINE_VALIDATED_int32(bolt_port, 7687, "Port on which the Bolt server should listen.", - FLAG_IN_RANGE(0, std::numeric_limits::max())); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(monitoring_port, 7444, - "Port on which the websocket server for Memgraph monitoring should listen.", - FLAG_IN_RANGE(0, std::numeric_limits::max())); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(metrics_port, 9091, "Port on which the Memgraph server for exposing metrics should listen.", - FLAG_IN_RANGE(0, std::numeric_limits::max())); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(bolt_num_workers, std::max(std::thread::hardware_concurrency(), 1U), - "Number of workers used by the Bolt server. By default, this will be the " - "number of processing units available on the machine.", - FLAG_IN_RANGE(1, INT32_MAX)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(bolt_session_inactivity_timeout, 1800, - "Time in seconds after which inactive Bolt sessions will be " - "closed.", - FLAG_IN_RANGE(1, INT32_MAX)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(bolt_cert_file, "", "Certificate file which should be used for the Bolt server."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(bolt_key_file, "", "Key file which should be used for the Bolt server."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(bolt_server_name_for_init, "", - "Server name which the database should send to the client in the " - "Bolt INIT message."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(init_file, "", - "Path to cypherl file that is used for configuring users and database schema before server starts."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(init_data_file, "", "Path to cypherl file that is used for creating data after server starts."); - -// General purpose flags. -// NOTE: The `data_directory` flag must be the same here and in -// `mg_import_csv`. If you change it, make sure to change it there as well. -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(data_directory, "mg_data", "Path to directory in which to save all permanent data."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(data_recovery_on_startup, false, "Controls whether the database recovers persisted data on startup."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint64(memory_warning_threshold, 1024, - "Memory warning threshold, in MB. If Memgraph detects there is " - "less available RAM it will log a warning. Set to 0 to " - "disable."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed in queries."); - -// Storage flags. -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", - FLAG_IN_RANGE(1, 24 * 3600)); -// NOTE: The `storage_properties_on_edges` flag must be the same here and in -// `mg_import_csv`. If you change it, make sure to change it there as well. -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_properties_on_edges, false, "Controls whether edges have properties."); - -// storage_recover_on_startup deprecated; use data_recovery_on_startup instead -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_HIDDEN_bool(storage_recover_on_startup, false, - "Controls whether the storage recovers persisted data on startup."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_uint64(storage_snapshot_interval_sec, 0, - "Storage snapshot creation interval (in seconds). Set " - "to 0 to disable periodic snapshot creation.", - FLAG_IN_RANGE(0, 7 * 24 * 3600)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_wal_enabled, false, - "Controls whether the storage uses write-ahead-logging. To enable " - "WAL periodic snapshots must be enabled."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_uint64(storage_snapshot_retention_count, 3, "The number of snapshots that should always be kept.", - FLAG_IN_RANGE(1, 1000000)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_uint64(storage_wal_file_size_kib, memgraph::storage::Config::Durability().wal_file_size_kibibytes, - "Minimum file size of each WAL file.", - FLAG_IN_RANGE(1, static_cast(1000) * 1024)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_uint64(storage_wal_file_flush_every_n_tx, - memgraph::storage::Config::Durability().wal_file_flush_every_n_tx, - "Issue a 'fsync' call after this amount of transactions are written to the " - "WAL file. Set to 1 for fully synchronous operation.", - FLAG_IN_RANGE(1, 1000000)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creates another snapshot on exit."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch, - "The number of edges and vertices stored in a batch in a snapshot file."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_parallel_index_recovery, false, - "Controls whether the index creation can be done in a multithreaded fashion."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint64(storage_recovery_thread_count, - std::max(static_cast(std::thread::hardware_concurrency()), - memgraph::storage::Config::Durability().recovery_thread_count), - "The number of threads used to recover persisted data from disk."); - -#ifdef MG_ENTERPRISE -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_delete_on_drop, true, - "If set to true the query 'DROP DATABASE x' will delete the underlying storage as well."); -#endif - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(telemetry_enabled, false, - "Set to true to enable telemetry. We collect information about the " - "running system (CPU and memory information) and information about " - "the database runtime (vertex and edge counts and resource usage) " - "to allow for easier improvement of the product."); - -// Streams flags -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint32( - stream_transaction_conflict_retries, 30, - "Number of times to retry when a stream transformation fails to commit because of conflicting transactions"); -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint32( - stream_transaction_retry_interval, 500, - "Retry interval in milliseconds when a stream transformation fails to commit because of conflicting transactions"); -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(kafka_bootstrap_servers, "", - "List of default Kafka brokers as a comma separated list of broker host or host:port."); - -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(pulsar_service_url, "", "Default URL used while connecting to Pulsar brokers."); - -// Audit logging flags. -#ifdef MG_ENTERPRISE -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(audit_buffer_size, memgraph::audit::kBufferSizeDefault, - "Maximum number of items in the audit log buffer.", FLAG_IN_RANGE(1, INT32_MAX)); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(audit_buffer_flush_interval_ms, memgraph::audit::kBufferFlushIntervalMillisDefault, - "Interval (in milliseconds) used for flushing the audit log buffer.", - FLAG_IN_RANGE(10, INT32_MAX)); -#endif - -// Query flags. - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_double(query_execution_timeout_sec, 600, - "Maximum allowed query execution time. Queries exceeding this " - "limit will be aborted. Value of 0 means no limit."); - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint64(replication_replica_check_frequency_sec, 1, - "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: " - "The MAIN instance allocates a new thread for each REPLICA."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(replication_restore_state_on_startup, false, "Restore replication state on startup, e.g. recover replica"); - -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint64( - memory_limit, 0, - "Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap " - "is enabled and 90\% of the physical memory otherwise."); - -namespace { -using namespace std::literals; -inline constexpr std::array isolation_level_mappings{ - std::pair{"SNAPSHOT_ISOLATION"sv, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION}, - std::pair{"READ_COMMITTED"sv, memgraph::storage::IsolationLevel::READ_COMMITTED}, - std::pair{"READ_UNCOMMITTED"sv, memgraph::storage::IsolationLevel::READ_UNCOMMITTED}}; - -const std::string isolation_level_help_string = - fmt::format("Default isolation level used for the transactions. Allowed values: {}", - memgraph::utils::GetAllowedEnumValuesString(isolation_level_mappings)); -} // namespace - -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_string(isolation_level, "SNAPSHOT_ISOLATION", isolation_level_help_string.c_str(), { - if (const auto result = memgraph::utils::IsValidEnumValueString(value, isolation_level_mappings); result.HasError()) { - const auto error = result.GetError(); - switch (error) { - case memgraph::utils::ValidationError::EmptyValue: { - std::cout << "Isolation level cannot be empty." << std::endl; - break; - } - case memgraph::utils::ValidationError::InvalidValue: { - std::cout << "Invalid value for isolation level. Allowed values: " - << memgraph::utils::GetAllowedEnumValuesString(isolation_level_mappings) << std::endl; - break; - } - } - return false; - } - - return true; -}); - -namespace { -memgraph::storage::IsolationLevel ParseIsolationLevel() { - const auto isolation_level = - memgraph::utils::StringToEnum(FLAGS_isolation_level, isolation_level_mappings); - MG_ASSERT(isolation_level, "Invalid isolation level"); - return *isolation_level; -} - -int64_t GetMemoryLimit() { - if (FLAGS_memory_limit == 0) { - auto maybe_total_memory = memgraph::utils::sysinfo::TotalMemory(); - MG_ASSERT(maybe_total_memory, "Failed to fetch the total physical memory"); - const auto maybe_swap_memory = memgraph::utils::sysinfo::SwapTotalMemory(); - MG_ASSERT(maybe_swap_memory, "Failed to fetch the total swap memory"); - - if (*maybe_swap_memory == 0) { - // take only 90% of the total memory - *maybe_total_memory *= 9; - *maybe_total_memory /= 10; - } - return *maybe_total_memory * 1024; - } - - // We parse the memory as MiB every time - return FLAGS_memory_limit * 1024 * 1024; -} -} // namespace - -namespace { -std::vector query_modules_directories; -} // namespace -DEFINE_VALIDATED_string(query_modules_directory, "", - "Directory where modules with custom query procedures are stored. " - "NOTE: Multiple comma-separated directories can be defined.", - { - query_modules_directories.clear(); - if (value.empty()) return true; - const auto directories = memgraph::utils::Split(value, ","); - for (const auto &dir : directories) { - if (!memgraph::utils::DirExists(dir)) { - std::cout << "Expected --" << flagname << " to point to directories." << std::endl; - std::cout << dir << " is not a directory." << std::endl; - return false; - } - } - query_modules_directories.reserve(directories.size()); - std::transform(directories.begin(), directories.end(), - std::back_inserter(query_modules_directories), - [](const auto &dir) { return dir; }); - return true; - }); - -// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(query_callable_mappings_path, "", - "The path to mappings that describes aliases to callables in cypher queries in the form of key-value " - "pairs in a json file. With this option query module procedures that do not exist in memgraph can be " - "mapped to ones that exist."); - -// Logging flags -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_HIDDEN_bool(also_log_to_stderr, false, "Log messages go to stderr in addition to logfiles"); -DEFINE_string(log_file, "", "Path to where the log should be stored."); - -namespace { -inline constexpr std::array log_level_mappings{ - std::pair{"TRACE"sv, spdlog::level::trace}, std::pair{"DEBUG"sv, spdlog::level::debug}, - std::pair{"INFO"sv, spdlog::level::info}, std::pair{"WARNING"sv, spdlog::level::warn}, - std::pair{"ERROR"sv, spdlog::level::err}, std::pair{"CRITICAL"sv, spdlog::level::critical}}; - -const std::string log_level_help_string = fmt::format("Minimum log level. Allowed values: {}", - memgraph::utils::GetAllowedEnumValuesString(log_level_mappings)); -} // namespace - -DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), { - if (const auto result = memgraph::utils::IsValidEnumValueString(value, log_level_mappings); result.HasError()) { - const auto error = result.GetError(); - switch (error) { - case memgraph::utils::ValidationError::EmptyValue: { - std::cout << "Log level cannot be empty." << std::endl; - break; - } - case memgraph::utils::ValidationError::InvalidValue: { - std::cout << "Invalid value for log level. Allowed values: " - << memgraph::utils::GetAllowedEnumValuesString(log_level_mappings) << std::endl; - break; - } - } - return false; - } - - return true; -}); - -namespace { -spdlog::level::level_enum ParseLogLevel() { - const auto log_level = memgraph::utils::StringToEnum(FLAGS_log_level, log_level_mappings); - MG_ASSERT(log_level, "Invalid log level"); - return *log_level; -} - -// 5 weeks * 7 days -inline constexpr auto log_retention_count = 35; -void CreateLoggerFromSink(const auto &sinks, const auto log_level) { - auto logger = std::make_shared("memgraph_log", sinks.begin(), sinks.end()); - logger->set_level(log_level); - logger->flush_on(spdlog::level::trace); - spdlog::set_default_logger(std::move(logger)); -} - -void InitializeLogger() { - std::vector sinks; - - if (FLAGS_also_log_to_stderr) { - sinks.emplace_back(std::make_shared()); - } - - if (!FLAGS_log_file.empty()) { - // get local time - time_t current_time{0}; - struct tm *local_time{nullptr}; - - time(¤t_time); - local_time = localtime(¤t_time); - - sinks.emplace_back(std::make_shared( - FLAGS_log_file, local_time->tm_hour, local_time->tm_min, false, log_retention_count)); - } - CreateLoggerFromSink(sinks, ParseLogLevel()); -} - -void AddLoggerSink(spdlog::sink_ptr new_sink) { - auto default_logger = spdlog::default_logger(); - auto sinks = default_logger->sinks(); - sinks.push_back(new_sink); - CreateLoggerFromSink(sinks, default_logger->level()); -} - -} // namespace - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_HIDDEN_string(license_key, "", "License key for Memgraph Enterprise."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_HIDDEN_string(organization_name, "", "Organization name."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_string(auth_user_or_role_name_regex, memgraph::glue::kDefaultUserRoleRegex.data(), - "Set to the regular expression that each user or role name must fulfill."); - void InitFromCypherlFile(memgraph::query::InterpreterContext &ctx, std::string cypherl_file_path, memgraph::audit::Log *audit_log = nullptr) { memgraph::query::Interpreter interpreter(&ctx); @@ -494,430 +65,6 @@ void InitFromCypherlFile(memgraph::query::InterpreterContext &ctx, std::string c file.close(); } -namespace memgraph::metrics { -extern const Event ActiveBoltSessions; -} // namespace memgraph::metrics - -auto ToQueryExtras(memgraph::communication::bolt::Value const &extra) -> memgraph::query::QueryExtras { - auto const &as_map = extra.ValueMap(); - - auto metadata_pv = std::map{}; - - if (auto const it = as_map.find("tx_metadata"); it != as_map.cend() && it->second.IsMap()) { - for (const auto &[key, bolt_md] : it->second.ValueMap()) { - metadata_pv.emplace(key, memgraph::glue::ToPropertyValue(bolt_md)); - } - } - - auto tx_timeout = std::optional{}; - if (auto const it = as_map.find("tx_timeout"); it != as_map.cend() && it->second.IsInt()) { - tx_timeout = it->second.ValueInt(); - } - - return memgraph::query::QueryExtras{std::move(metadata_pv), tx_timeout}; -} - -class SessionHL final : public memgraph::communication::bolt::Session { - public: - struct ContextWrapper { - explicit ContextWrapper(memgraph::dbms::SessionContext sc) - : session_context(sc), - interpreter(std::make_unique(session_context.interpreter_context.get())), - defunct_(false) { - session_context.interpreter_context->interpreters.WithLock( - [this](auto &interpreters) { interpreters.insert(interpreter.get()); }); - } - ~ContextWrapper() { Defunct(); } - - void Defunct() { - if (!defunct_) { - session_context.interpreter_context->interpreters.WithLock( - [this](auto &interpreters) { interpreters.erase(interpreter.get()); }); - defunct_ = true; - } - } - - ContextWrapper(const ContextWrapper &) = delete; - ContextWrapper &operator=(const ContextWrapper &) = delete; - - ContextWrapper(ContextWrapper &&in) noexcept - : session_context(std::move(in.session_context)), - interpreter(std::move(in.interpreter)), - defunct_(in.defunct_) { - in.defunct_ = true; - } - - ContextWrapper &operator=(ContextWrapper &&in) noexcept { - if (this != &in) { - Defunct(); - session_context = std::move(in.session_context); - interpreter = std::move(in.interpreter); - defunct_ = in.defunct_; - in.defunct_ = true; - } - return *this; - } - - memgraph::query::InterpreterContext *interpreter_context() { return session_context.interpreter_context.get(); } - memgraph::query::Interpreter *interp() { return interpreter.get(); } - memgraph::utils::Synchronized *auth() const { - return session_context.auth; - } -#ifdef MG_ENTERPRISE - memgraph::audit::Log *audit_log() const { return session_context.audit_log; } -#endif - std::string run_id() const { return session_context.run_id; } - bool defunct() const { return defunct_; } - - private: - memgraph::dbms::SessionContext session_context; - std::unique_ptr interpreter; - bool defunct_; - }; - - SessionHL( -#ifdef MG_ENTERPRISE - memgraph::dbms::SessionContextHandler &sc_handler, -#else - memgraph::dbms::SessionContext sc, -#endif - const memgraph::communication::v2::ServerEndpoint &endpoint, - memgraph::communication::v2::InputStream *input_stream, memgraph::communication::v2::OutputStream *output_stream, - const std::string &default_db = memgraph::dbms::kDefaultDB) // NOLINT - : memgraph::communication::bolt::Session(input_stream, output_stream), -#ifdef MG_ENTERPRISE - sc_handler_(sc_handler), - current_(sc_handler_.Get(default_db)), -#else - current_(sc), -#endif - interpreter_context_(current_.interpreter_context()), - interpreter_(current_.interp()), - auth_(current_.auth()), -#ifdef MG_ENTERPRISE - audit_log_(current_.audit_log()), -#endif - endpoint_(endpoint), - run_id_(current_.run_id()) { - memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveBoltSessions); - } - - ~SessionHL() override { memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveBoltSessions); } - - SessionHL(const SessionHL &) = delete; - SessionHL &operator=(const SessionHL &) = delete; - SessionHL(SessionHL &&) = delete; - SessionHL &operator=(SessionHL &&) = delete; - - void Configure(const std::map &run_time_info) override { -#ifdef MG_ENTERPRISE - std::string db; - bool update = false; - // Check if user explicitly defined the database to use - if (run_time_info.contains("db")) { - const auto &db_info = run_time_info.at("db"); - if (!db_info.IsString()) { - throw memgraph::communication::bolt::ClientError("Malformed database name."); - } - db = db_info.ValueString(); - update = db != current_.interpreter_context()->db->id(); - in_explicit_db_ = true; - // NOTE: Once in a transaction, the drivers stop explicitly sending the db and count on using it until commit - } else if (in_explicit_db_ && !interpreter_->in_explicit_transaction_) { // Just on a switch - db = GetDefaultDB(); - update = db != current_.interpreter_context()->db->id(); - in_explicit_db_ = false; - } - - // Check if the underlying database needs to be updated - if (update) { - sc_handler_.SetInPlace(db, [this](auto new_sc) mutable { - const auto &db_name = new_sc.interpreter_context->db->id(); - MultiDatabaseAuth(db_name); - try { - Update(ContextWrapper(new_sc)); - return memgraph::dbms::SetForResult::SUCCESS; - } catch (memgraph::dbms::UnknownDatabaseException &e) { - throw memgraph::communication::bolt::ClientError("No database named \"{}\" found!", db_name); - } - }); - } -#endif - } - - using TEncoder = memgraph::communication::bolt::Encoder< - memgraph::communication::bolt::ChunkedEncoderBuffer>; - - void BeginTransaction(const std::map &extra) override { - interpreter_->BeginTransaction(ToQueryExtras(extra)); - } - - void CommitTransaction() override { interpreter_->CommitTransaction(); } - - void RollbackTransaction() override { interpreter_->RollbackTransaction(); } - - std::pair, std::optional> Interpret( - const std::string &query, const std::map ¶ms, - const std::map &extra) override { - std::map params_pv; - for (const auto &[key, bolt_param] : params) { - params_pv.emplace(key, memgraph::glue::ToPropertyValue(bolt_param)); - } - const std::string *username{nullptr}; - if (user_) { - username = &user_->username(); - } - -#ifdef MG_ENTERPRISE - if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) { - audit_log_->Record(endpoint_.address().to_string(), user_ ? *username : "", query, - memgraph::storage::PropertyValue(params_pv), interpreter_context_->db->id()); - } -#endif - try { - auto result = interpreter_->Prepare(query, params_pv, username, ToQueryExtras(extra), UUID()); - const std::string db_name = result.db ? *result.db : ""; - if (user_ && !memgraph::glue::AuthChecker::IsUserAuthorized(*user_, result.privileges, db_name)) { - interpreter_->Abort(); - if (db_name.empty()) { - throw memgraph::communication::bolt::ClientError( - "You are not authorized to execute this query! Please contact your database administrator."); - } - throw memgraph::communication::bolt::ClientError( - "You are not authorized to execute this query on database \"{}\"! Please contact your database " - "administrator.", - db_name); - } - return {result.headers, result.qid}; - - } catch (const memgraph::query::QueryException &e) { - // Wrap QueryException into ClientError, because we want to allow the - // client to fix their query. - throw memgraph::communication::bolt::ClientError(e.what()); - } catch (const memgraph::query::ReplicationException &e) { - throw memgraph::communication::bolt::ClientError(e.what()); - } - } - - std::map Pull(TEncoder *encoder, std::optional n, - std::optional qid) override { - TypedValueResultStream stream(encoder, interpreter_context_); - return PullResults(stream, n, qid); - } - - std::map Discard(std::optional n, - std::optional qid) override { - memgraph::query::DiscardValueResultStream stream; - return PullResults(stream, n, qid); - } - - void Abort() override { interpreter_->Abort(); } - - // Called during Init - // During Init, the user cannot choose the landing DB (switch is done during query execution) - bool Authenticate(const std::string &username, const std::string &password) override { - auto locked_auth = auth_->Lock(); - if (!locked_auth->HasUsers()) { - return true; - } - user_ = locked_auth->Authenticate(username, password); -#ifdef MG_ENTERPRISE - if (user_.has_value()) { - const auto &db = user_->db_access().GetDefault(); - // Check if the underlying database needs to be updated - if (db != current_.interpreter_context()->db->id()) { - const auto &res = sc_handler_.SetFor(UUID(), db); - return res == memgraph::dbms::SetForResult::SUCCESS || res == memgraph::dbms::SetForResult::ALREADY_SET; - } - } -#endif - return user_.has_value(); - } - - std::optional GetServerNameForInit() override { - if (FLAGS_bolt_server_name_for_init.empty()) return std::nullopt; - return FLAGS_bolt_server_name_for_init; - } - -#ifdef MG_ENTERPRISE - memgraph::dbms::SetForResult OnChange(const std::string &db_name) override { - MultiDatabaseAuth(db_name); - if (db_name != current_.interpreter_context()->db->id()) { - UpdateAndDefunct(db_name); // Done during Pull, so we cannot just replace the current db - return memgraph::dbms::SetForResult::SUCCESS; - } - return memgraph::dbms::SetForResult::ALREADY_SET; - } - - bool OnDelete(const std::string &db_name) override { - MG_ASSERT(current_.interpreter_context()->db->id() != db_name && (!defunct_ || defunct_->defunct()), - "Trying to delete a database while still in use."); - return true; - } -#endif - - std::string GetDatabaseName() const override { return interpreter_context_->db->id(); } - - private: - template - std::map PullResults(TStream &stream, std::optional n, - std::optional qid) { - try { - const auto &summary = interpreter_->Pull(&stream, n, qid); - std::map decoded_summary; - for (const auto &kv : summary) { - auto maybe_value = - memgraph::glue::ToBoltValue(kv.second, *interpreter_context_->db, memgraph::storage::View::NEW); - if (maybe_value.HasError()) { - switch (maybe_value.GetError()) { - case memgraph::storage::Error::DELETED_OBJECT: - case memgraph::storage::Error::SERIALIZATION_ERROR: - case memgraph::storage::Error::VERTEX_HAS_EDGES: - case memgraph::storage::Error::PROPERTIES_DISABLED: - case memgraph::storage::Error::NONEXISTENT_OBJECT: - throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming summary."); - } - } - decoded_summary.emplace(kv.first, std::move(*maybe_value)); - } - // Add this memgraph instance run_id, received from telemetry - // This is sent with every query, instead of only on bolt init inside - // communication/bolt/v1/states/init.hpp because neo4jdriver does not - // read the init message. - if (auto run_id = run_id_; run_id) { - decoded_summary.emplace("run_id", *run_id); - } - - // Clean up previous session (session gets defunct when switching between databases) - if (defunct_) { - defunct_.reset(); - } - - return decoded_summary; - } catch (const memgraph::query::QueryException &e) { - // Wrap QueryException into ClientError, because we want to allow the - // client to fix their query. - throw memgraph::communication::bolt::ClientError(e.what()); - } - } - -#ifdef MG_ENTERPRISE - /** - * @brief Update setup to the new database. - * - * @param db_name name of the target database - * @throws UnknownDatabaseException if handler cannot get it - */ - void UpdateAndDefunct(const std::string &db_name) { UpdateAndDefunct(ContextWrapper(sc_handler_.Get(db_name))); } - - void UpdateAndDefunct(ContextWrapper &&cntxt) { - defunct_.emplace(std::move(current_)); - Update(std::forward(cntxt)); - defunct_->Defunct(); - } - - void Update(const std::string &db_name) { - ContextWrapper tmp(sc_handler_.Get(db_name)); - Update(std::move(tmp)); - } - - void Update(ContextWrapper &&cntxt) { - current_ = std::move(cntxt); - interpreter_ = current_.interp(); - interpreter_->in_explicit_db_ = in_explicit_db_; - interpreter_context_ = current_.interpreter_context(); - } - - /** - * @brief Authenticate user on passed database. - * - * @param db database to check against - * @throws bolt::ClientError when user is not authorized - */ - void MultiDatabaseAuth(const std::string &db) { - if (user_ && !memgraph::glue::AuthChecker::IsUserAuthorized(*user_, {}, db)) { - throw memgraph::communication::bolt::ClientError( - "You are not authorized on the database \"{}\"! Please contact your database administrator.", db); - } - } - - /** - * @brief Get the user's default database - * - * @return std::string - */ - std::string GetDefaultDB() { - if (user_.has_value()) { - return user_->db_access().GetDefault(); - } - return memgraph::dbms::kDefaultDB; - } -#endif - - /// Wrapper around TEncoder which converts TypedValue to Value - /// before forwarding the calls to original TEncoder. - class TypedValueResultStream { - public: - TypedValueResultStream(TEncoder *encoder, memgraph::query::InterpreterContext *ic) - : encoder_(encoder), interpreter_context_(ic) {} - - void Result(const std::vector &values) { - std::vector decoded_values; - decoded_values.reserve(values.size()); - for (const auto &v : values) { - auto maybe_value = memgraph::glue::ToBoltValue(v, *interpreter_context_->db, memgraph::storage::View::NEW); - if (maybe_value.HasError()) { - switch (maybe_value.GetError()) { - case memgraph::storage::Error::DELETED_OBJECT: - throw memgraph::communication::bolt::ClientError("Returning a deleted object as a result."); - case memgraph::storage::Error::NONEXISTENT_OBJECT: - throw memgraph::communication::bolt::ClientError("Returning a nonexistent object as a result."); - case memgraph::storage::Error::VERTEX_HAS_EDGES: - case memgraph::storage::Error::SERIALIZATION_ERROR: - case memgraph::storage::Error::PROPERTIES_DISABLED: - throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming results."); - } - } - decoded_values.emplace_back(std::move(*maybe_value)); - } - encoder_->MessageRecord(decoded_values); - } - - private: - TEncoder *encoder_; - // NOTE: Needed only for ToBoltValue conversions - memgraph::query::InterpreterContext *interpreter_context_; - }; - -#ifdef MG_ENTERPRISE - memgraph::dbms::SessionContextHandler &sc_handler_; -#endif - ContextWrapper current_; - std::optional defunct_; - - memgraph::query::InterpreterContext *interpreter_context_; - memgraph::query::Interpreter *interpreter_; - memgraph::utils::Synchronized *auth_; - std::optional user_; -#ifdef MG_ENTERPRISE - memgraph::audit::Log *audit_log_; - bool in_explicit_db_{false}; //!< If true, the user has defined the database to use via metadata -#endif - memgraph::communication::v2::ServerEndpoint endpoint_; - // NOTE: run_id should be const but that complicates code a lot. - std::optional run_id_; -}; - -#ifdef MG_ENTERPRISE -using ServerT = memgraph::communication::v2::Server; -#else -using ServerT = memgraph::communication::v2::Server; -#endif -using MonitoringServerT = - memgraph::communication::http::Server, - memgraph::dbms::SessionContext>; using memgraph::communication::ServerContext; // Needed to correctly handle memgraph destruction from a signal handler. @@ -925,6 +72,7 @@ using memgraph::communication::ServerContext; // when we are exiting main, inside destructors of database::GraphDb and // similar. The signal handler may then initiate another shutdown on memgraph // which is in half destructed state, causing invalid memory access and crash. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) volatile sig_atomic_t is_shutting_down = 0; void InitSignalHandlers(const std::function &shutdown_fun) { @@ -965,7 +113,7 @@ int main(int argc, char **argv) { exit(1); } - InitializeLogger(); + memgraph::flags::InitializeLogger(); // Unhandled exception handler init. std::set_terminate(&memgraph::utils::TerminateHandler); @@ -1049,7 +197,7 @@ int main(int argc, char **argv) { auto data_directory = std::filesystem::path(FLAGS_data_directory); - const auto memory_limit = GetMemoryLimit(); + const auto memory_limit = memgraph::flags::GetMemoryLimit(); // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) spdlog::info("Memory limit in config is set to {}", memgraph::utils::GetReadableSize(memory_limit)); memgraph::utils::total_memory_tracker.SetMaximumHardLimit(memory_limit); @@ -1112,7 +260,7 @@ int main(int argc, char **argv) { .items_per_batch = FLAGS_storage_items_per_batch, .recovery_thread_count = FLAGS_storage_recovery_thread_count, .allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery}, - .transaction = {.isolation_level = ParseIsolationLevel()}, + .transaction = {.isolation_level = memgraph::flags::ParseIsolationLevel()}, .disk = {.main_storage_directory = FLAGS_data_directory + "/rocksdb_main_storage", .label_index_directory = FLAGS_data_directory + "/rocksdb_label_index", .label_property_index_directory = FLAGS_data_directory + "/rocksdb_label_property_index", @@ -1192,7 +340,8 @@ int main(int argc, char **argv) { auto *auth = session_context.auth; auto &interpreter_context = *session_context.interpreter_context; // TODO remove - memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(query_modules_directories, FLAGS_data_directory); + memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(), + FLAGS_data_directory); memgraph::query::procedure::gModuleRegistry.UnloadAndLoadModulesFromDirectories(); memgraph::query::procedure::gCallableAliasMapper.LoadMapping(FLAGS_query_callable_mappings_path); @@ -1239,11 +388,11 @@ int main(int argc, char **argv) { auto server_endpoint = memgraph::communication::v2::ServerEndpoint{ boost::asio::ip::address::from_string(FLAGS_bolt_address), static_cast(FLAGS_bolt_port)}; #ifdef MG_ENTERPRISE - ServerT server(server_endpoint, &sc_handler, &context, FLAGS_bolt_session_inactivity_timeout, service_name, - FLAGS_bolt_num_workers); + memgraph::glue::ServerT server(server_endpoint, &sc_handler, &context, FLAGS_bolt_session_inactivity_timeout, + service_name, FLAGS_bolt_num_workers); #else - ServerT server(server_endpoint, &session_context, &context, FLAGS_bolt_session_inactivity_timeout, service_name, - FLAGS_bolt_num_workers); + memgraph::glue::ServerT server(server_endpoint, &session_context, &context, FLAGS_bolt_session_inactivity_timeout, + service_name, FLAGS_bolt_num_workers); #endif const auto machine_id = memgraph::utils::GetMachineId(); @@ -1283,9 +432,9 @@ int main(int argc, char **argv) { memgraph::communication::websocket::SafeAuth websocket_auth{auth}; memgraph::communication::websocket::Server websocket_server{ {FLAGS_monitoring_address, static_cast(FLAGS_monitoring_port)}, &context, websocket_auth}; - AddLoggerSink(websocket_server.GetLoggingSink()); + memgraph::flags::AddLoggerSink(websocket_server.GetLoggingSink()); - MonitoringServerT metrics_server{ + memgraph::glue::MonitoringServerT metrics_server{ {FLAGS_metrics_address, static_cast(FLAGS_metrics_port)}, &session_context, &context}; #ifdef MG_ENTERPRISE diff --git a/src/query/procedure/module.cpp b/src/query/procedure/module.cpp index 895afad79..dc6b5ac73 100644 --- a/src/query/procedure/module.cpp +++ b/src/query/procedure/module.cpp @@ -1268,7 +1268,7 @@ void ModuleRegistry::UnloadAndLoadModulesFromDirectories() { ModulePtr ModuleRegistry::GetModuleNamed(const std::string_view name) const { std::shared_lock guard(lock_); auto found_it = modules_.find(name); - if (found_it == modules_.end()) return nullptr; + if (found_it == modules_.end()) return ModulePtr{nullptr}; return ModulePtr(found_it->second.get(), std::move(guard)); } diff --git a/src/query/procedure/module.hpp b/src/query/procedure/module.hpp index ccb56b2fc..8963173e7 100644 --- a/src/query/procedure/module.hpp +++ b/src/query/procedure/module.hpp @@ -60,7 +60,7 @@ class ModulePtr final { public: ModulePtr() = default; - ModulePtr(std::nullptr_t) {} + explicit ModulePtr(std::nullptr_t) {} ModulePtr(const Module *module, std::shared_lock lock) : module_(module), lock_(std::move(lock)) {} explicit operator bool() const { return static_cast(module_); } @@ -176,6 +176,7 @@ class ModuleRegistry final { }; /// Single, global module registry. +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) extern ModuleRegistry gModuleRegistry; /// Return the ModulePtr and `mgp_proc *` of the found procedure after resolving @@ -231,7 +232,7 @@ void ConstructArguments(const std::vector &args, const TCall &callab for (size_t i = 0; i < n_args; ++i) { auto arg = args[i]; std::string_view name; - const query::procedure::CypherType *type; + const query::procedure::CypherType *type = nullptr; if (is_not_optional_arg(i)) { name = callable.args[i].first; type = callable.args[i].second; diff --git a/src/utils/exceptions.hpp b/src/utils/exceptions.hpp index 0e8379adb..f89aa037b 100644 --- a/src/utils/exceptions.hpp +++ b/src/utils/exceptions.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -43,7 +43,7 @@ class BasicException : public std::exception { * * @param message The error message. */ - explicit BasicException(const std::string_view message) noexcept : msg_(message) {} + explicit BasicException(std::string_view message) noexcept : msg_(message) {} /** * @brief Constructor with format string (C++ STL strings). diff --git a/src/utils/signals.hpp b/src/utils/signals.hpp index 6d4314f93..82791b9fc 100644 --- a/src/utils/signals.hpp +++ b/src/utils/signals.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -39,10 +39,11 @@ enum class Signal : int { * This function ignores a signal for the whole process. That means that a * signal that is ignored from any thread will be ignored in all threads. */ -bool SignalIgnore(const Signal signal); +bool SignalIgnore(Signal signal); class SignalHandler { private: + // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) static std::map> handlers_; static void Handle(int signal); diff --git a/src/utils/terminate_handler.hpp b/src/utils/terminate_handler.hpp index 574ae664c..d1487a8e1 100644 --- a/src/utils/terminate_handler.hpp +++ b/src/utils/terminate_handler.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -24,7 +24,7 @@ namespace memgraph::utils { * about the abort please take a look at * http://en.cppreference.com/w/cpp/utility/program/abort. */ -void TerminateHandler(std::ostream &stream) noexcept { +inline void TerminateHandler(std::ostream &stream) noexcept { if (auto exc = std::current_exception()) { try { std::rethrow_exception(exc); @@ -40,6 +40,6 @@ void TerminateHandler(std::ostream &stream) noexcept { std::abort(); } -void TerminateHandler() noexcept { TerminateHandler(std::cout); } +inline void TerminateHandler() noexcept { TerminateHandler(std::cout); } } // namespace memgraph::utils diff --git a/src/version.hpp.in b/src/version.hpp.in index 7ba42f003..6734b0a44 100644 --- a/src/version.hpp.in +++ b/src/version.hpp.in @@ -10,4 +10,4 @@ #pragma once -static const char *version_string = "@VERSION_STRING@"; +static const char * const version_string = "@VERSION_STRING@";