Remove Kafka, auth and audit log from Memgraph HA

Summary: Removing Kafka, auth and audit log features from HA instance for the time being.

Reviewers: mferencevic, msantl

Reviewed By: mferencevic, msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1882
This commit is contained in:
Ivan Paljak 2019-02-26 15:13:41 +01:00
parent 4790d6458e
commit ed28ed873d
3 changed files with 23 additions and 43 deletions

View File

@ -11,8 +11,6 @@
#include "communication/server.hpp"
#include "database/single_node_ha/graph_db.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"
#include "query/exceptions.hpp"
#include "utils/flag_validation.hpp"
@ -32,57 +30,19 @@ DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
DEFINE_string(cert_file, "", "Certificate file to use.");
DEFINE_string(key_file, "", "Key file to use.");
// Audit logging flags.
DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging.");
DEFINE_VALIDATED_int32(audit_buffer_size, audit::kBufferSizeDefault,
"Maximum number of items in the audit log buffer.",
FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_VALIDATED_int32(
audit_buffer_flush_interval_ms, audit::kBufferFlushIntervalMillisDefault,
"Interval (in milliseconds) used for flushing the audit log buffer.",
FLAG_IN_RANGE(10, INT32_MAX));
using ServerT = communication::Server<BoltSession, SessionData>;
using communication::ServerContext;
void SingleNodeHAMain() {
google::SetUsageMessage("Memgraph high availability single-node database server");
google::SetUsageMessage(
"Memgraph high availability single-node database server");
auto durability_directory =
std::experimental::filesystem::path(FLAGS_durability_directory);
auth::Auth auth{durability_directory / "auth"};
audit::Log audit_log{durability_directory / "audit", FLAGS_audit_buffer_size,
FLAGS_audit_buffer_flush_interval_ms};
if (FLAGS_audit_enabled) {
audit_log.Start();
}
CHECK(utils::SignalHandler::RegisterHandler(
utils::Signal::User2, [&audit_log]() { audit_log.ReopenLog(); }))
<< "Unable to register SIGUSR2 handler!";
database::GraphDb db;
query::Interpreter interpreter;
SessionData session_data{&db, &interpreter, &auth, &audit_log};
integrations::kafka::Streams kafka_streams{
durability_directory / "streams",
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
KafkaStreamWriter(session_data, query, params);
}};
try {
// Recover possible streams.
kafka_streams.Recover();
} catch (const integrations::kafka::KafkaStreamException &e) {
LOG(ERROR) << e.what();
}
session_data.interpreter->auth_ = &auth;
session_data.interpreter->kafka_streams_ = &kafka_streams;
SessionData session_data{&db, &interpreter, nullptr, nullptr};
ServerContext context;
std::string service_name = "Bolt";

View File

@ -43,10 +43,13 @@ std::vector<std::string> BoltSession::Interpret(
std::map<std::string, PropertyValue> params_pv;
for (const auto &kv : params)
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
#ifndef MG_SINGLE_NODE_HA
audit_log_->Record(endpoint_.address(), user_ ? user_->username() : "", query,
params_pv);
#endif
try {
auto result = transaction_engine_.Interpret(query, params_pv);
#ifndef MG_SINGLE_NODE_HA
if (user_) {
const auto &permissions = user_->GetPermissions();
for (const auto &privilege : result.second) {
@ -59,6 +62,7 @@ std::vector<std::string> BoltSession::Interpret(
}
}
}
#endif
return result.first;
} catch (const query::QueryException &e) {
@ -89,9 +93,13 @@ void BoltSession::Abort() { transaction_engine_.Abort(); }
bool BoltSession::Authenticate(const std::string &username,
const std::string &password) {
#ifdef MG_SINGLE_NODE_HA
return true;
#else
if (!auth_->HasUsers()) return true;
user_ = auth_->Authenticate(username, password);
return !!user_;
#endif
}
BoltSession::TypedValueResultStream::TypedValueResultStream(TEncoder *encoder)

View File

@ -18,6 +18,7 @@
#include "query/plan/planner.hpp"
#include "query/plan/profile.hpp"
#include "query/plan/vertex_count_cache.hpp"
#include "utils/exceptions.hpp"
#include "utils/flag_validation.hpp"
#include "utils/string.hpp"
#include "utils/tsc.hpp"
@ -865,17 +866,28 @@ Interpreter::Results Interpreter::operator()(
HandleIndexQuery(index_query, invalidate_plan_cache, &db_accessor);
} else if (auto *auth_query =
utils::Downcast<AuthQuery>(parsed_query.query)) {
#ifdef MG_SINGLE_NODE_HA
throw utils::NotYetImplemented(
"Managing user privileges is not yet supported in Memgraph HA "
"instance.");
#else
if (in_explicit_transaction) {
throw UserModificationInMulticommandTxException();
}
callback = HandleAuthQuery(auth_query, auth_, parameters, &db_accessor);
#endif
} else if (auto *stream_query =
utils::Downcast<StreamQuery>(parsed_query.query)) {
#ifdef MG_SINGLE_NODE_HA
throw utils::NotYetImplemented(
"Graph streams are not yet supported in Memgraph HA instance.");
#else
if (in_explicit_transaction) {
throw StreamClauseInMulticommandTxException();
}
callback = HandleStreamQuery(stream_query, kafka_streams_, parameters,
&db_accessor);
#endif
} else if (auto *info_query =
utils::Downcast<InfoQuery>(parsed_query.query)) {
callback = HandleInfoQuery(info_query, &db_accessor);