diff --git a/src/memgraph_ha.cpp b/src/memgraph_ha.cpp index 8ee4d109e..850bea04f 100644 --- a/src/memgraph_ha.cpp +++ b/src/memgraph_ha.cpp @@ -11,8 +11,6 @@ #include "communication/server.hpp" #include "database/single_node_ha/graph_db.hpp" -#include "integrations/kafka/exceptions.hpp" -#include "integrations/kafka/streams.hpp" #include "memgraph_init.hpp" #include "query/exceptions.hpp" #include "utils/flag_validation.hpp" @@ -32,57 +30,19 @@ DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800, DEFINE_string(cert_file, "", "Certificate file to use."); DEFINE_string(key_file, "", "Key file to use."); -// Audit logging flags. -DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging."); -DEFINE_VALIDATED_int32(audit_buffer_size, audit::kBufferSizeDefault, - "Maximum number of items in the audit log buffer.", - FLAG_IN_RANGE(1, INT32_MAX)); -DEFINE_VALIDATED_int32( - audit_buffer_flush_interval_ms, audit::kBufferFlushIntervalMillisDefault, - "Interval (in milliseconds) used for flushing the audit log buffer.", - FLAG_IN_RANGE(10, INT32_MAX)); - using ServerT = communication::Server; using communication::ServerContext; void SingleNodeHAMain() { - google::SetUsageMessage("Memgraph high availability single-node database server"); + google::SetUsageMessage( + "Memgraph high availability single-node database server"); auto durability_directory = std::experimental::filesystem::path(FLAGS_durability_directory); - auth::Auth auth{durability_directory / "auth"}; - - audit::Log audit_log{durability_directory / "audit", FLAGS_audit_buffer_size, - FLAGS_audit_buffer_flush_interval_ms}; - if (FLAGS_audit_enabled) { - audit_log.Start(); - } - CHECK(utils::SignalHandler::RegisterHandler( - utils::Signal::User2, [&audit_log]() { audit_log.ReopenLog(); })) - << "Unable to register SIGUSR2 handler!"; - database::GraphDb db; query::Interpreter interpreter; - SessionData session_data{&db, &interpreter, &auth, &audit_log}; - - integrations::kafka::Streams kafka_streams{ - durability_directory / "streams", - [&session_data]( - const std::string &query, - const std::map ¶ms) { - KafkaStreamWriter(session_data, query, params); - }}; - - try { - // Recover possible streams. - kafka_streams.Recover(); - } catch (const integrations::kafka::KafkaStreamException &e) { - LOG(ERROR) << e.what(); - } - - session_data.interpreter->auth_ = &auth; - session_data.interpreter->kafka_streams_ = &kafka_streams; + SessionData session_data{&db, &interpreter, nullptr, nullptr}; ServerContext context; std::string service_name = "Bolt"; diff --git a/src/memgraph_init.cpp b/src/memgraph_init.cpp index 9a7b807af..7be1d652b 100644 --- a/src/memgraph_init.cpp +++ b/src/memgraph_init.cpp @@ -43,10 +43,13 @@ std::vector BoltSession::Interpret( std::map params_pv; for (const auto &kv : params) params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second)); +#ifndef MG_SINGLE_NODE_HA audit_log_->Record(endpoint_.address(), user_ ? user_->username() : "", query, params_pv); +#endif try { auto result = transaction_engine_.Interpret(query, params_pv); +#ifndef MG_SINGLE_NODE_HA if (user_) { const auto &permissions = user_->GetPermissions(); for (const auto &privilege : result.second) { @@ -59,6 +62,7 @@ std::vector BoltSession::Interpret( } } } +#endif return result.first; } catch (const query::QueryException &e) { @@ -89,9 +93,13 @@ void BoltSession::Abort() { transaction_engine_.Abort(); } bool BoltSession::Authenticate(const std::string &username, const std::string &password) { +#ifdef MG_SINGLE_NODE_HA + return true; +#else if (!auth_->HasUsers()) return true; user_ = auth_->Authenticate(username, password); return !!user_; +#endif } BoltSession::TypedValueResultStream::TypedValueResultStream(TEncoder *encoder) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 9308912c4..4bc1e9bcb 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -18,6 +18,7 @@ #include "query/plan/planner.hpp" #include "query/plan/profile.hpp" #include "query/plan/vertex_count_cache.hpp" +#include "utils/exceptions.hpp" #include "utils/flag_validation.hpp" #include "utils/string.hpp" #include "utils/tsc.hpp" @@ -865,17 +866,28 @@ Interpreter::Results Interpreter::operator()( HandleIndexQuery(index_query, invalidate_plan_cache, &db_accessor); } else if (auto *auth_query = utils::Downcast(parsed_query.query)) { +#ifdef MG_SINGLE_NODE_HA + throw utils::NotYetImplemented( + "Managing user privileges is not yet supported in Memgraph HA " + "instance."); +#else if (in_explicit_transaction) { throw UserModificationInMulticommandTxException(); } callback = HandleAuthQuery(auth_query, auth_, parameters, &db_accessor); +#endif } else if (auto *stream_query = utils::Downcast(parsed_query.query)) { +#ifdef MG_SINGLE_NODE_HA + throw utils::NotYetImplemented( + "Graph streams are not yet supported in Memgraph HA instance."); +#else if (in_explicit_transaction) { throw StreamClauseInMulticommandTxException(); } callback = HandleStreamQuery(stream_query, kafka_streams_, parameters, &db_accessor); +#endif } else if (auto *info_query = utils::Downcast(parsed_query.query)) { callback = HandleInfoQuery(info_query, &db_accessor);