Allow non-HA instance in MG_ENTERPRISE

This commit is contained in:
Andi Skrgat 2024-03-21 08:52:58 +01:00
parent 16b3df3b77
commit 8688c7361a
8 changed files with 39 additions and 36 deletions

View File

@ -636,11 +636,7 @@ auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const
return raft_state_.HasReplicaState(instance_name); return raft_state_.HasReplicaState(instance_name);
} }
// TODO: (andi) Remove accepting param auto CoordinatorInstance::GetRoutingTable() const -> RoutingTable { return raft_state_.GetRoutingTable(); }
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const & /*routing*/) const
-> RoutingTable {
return raft_state_.GetRoutingTable();
}
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif

View File

@ -99,10 +99,10 @@ auto CoordinatorState::AddCoordinatorInstance(coordination::CoordinatorToCoordin
return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(config); return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(config);
} }
auto CoordinatorState::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable { auto CoordinatorState::GetRoutingTable() -> RoutingTable {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_), MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot get routing table since variant holds wrong alternative"); "Coordinator cannot get routing table since variant holds wrong alternative");
return std::get<CoordinatorInstance>(data_).GetRoutingTable(routing); return std::get<CoordinatorInstance>(data_).GetRoutingTable();
} }
} // namespace memgraph::coordination } // namespace memgraph::coordination

View File

@ -56,7 +56,7 @@ class CoordinatorInstance {
auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void; auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void;
auto GetRoutingTable(std::map<std::string, std::string> const &routing) const -> RoutingTable; auto GetRoutingTable() const -> RoutingTable;
static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes; static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;

View File

@ -48,7 +48,7 @@ class CoordinatorState {
// NOTE: The client code must check that the server exists before calling this method. // NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &; auto GetCoordinatorServer() const -> CoordinatorServer &;
auto GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable; auto GetRoutingTable() -> RoutingTable;
private: private:
struct CoordinatorMainReplicaData { struct CoordinatorMainReplicaData {

View File

@ -410,17 +410,21 @@ int main(int argc, char **argv) {
using memgraph::coordination::CoordinatorInstanceInitConfig; using memgraph::coordination::CoordinatorInstanceInitConfig;
using memgraph::coordination::CoordinatorState; using memgraph::coordination::CoordinatorState;
using memgraph::coordination::ReplicationInstanceInitConfig; using memgraph::coordination::ReplicationInstanceInitConfig;
auto coordinator_state = [&]() -> CoordinatorState { std::optional<CoordinatorState> coordinator_state{std::nullopt};
if (FLAGS_coordinator_id && FLAGS_coordinator_port) { if (FLAGS_management_port && (FLAGS_coordinator_id || FLAGS_coordinator_port)) {
return CoordinatorState{CoordinatorInstanceInitConfig{.coordinator_id = FLAGS_coordinator_id, throw std::runtime_error(
"Coordinator cannot be started with both coordinator_id/port and management_port. Specify coordinator_id and "
"port for coordinator instance and management port for replication instance.");
}
if (FLAGS_coordinator_id && FLAGS_coordinator_port) {
coordinator_state.emplace(CoordinatorInstanceInitConfig{.coordinator_id = FLAGS_coordinator_id,
.coordinator_port = FLAGS_coordinator_port, .coordinator_port = FLAGS_coordinator_port,
.bolt_port = FLAGS_bolt_port}}; .bolt_port = FLAGS_bolt_port});
} }
if (FLAGS_management_port) { if (FLAGS_management_port) {
return CoordinatorState{ReplicationInstanceInitConfig{.management_port = FLAGS_management_port}}; coordinator_state.emplace(ReplicationInstanceInitConfig{.management_port = FLAGS_management_port});
} }
throw std::runtime_error("Coordination or replication instances must be started with valid flags!");
}();
#endif #endif
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state
@ -443,19 +447,20 @@ int main(int argc, char **argv) {
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
// MAIN or REPLICA instance // MAIN or REPLICA instance
if (FLAGS_management_port) { if (FLAGS_management_port) {
memgraph::dbms::CoordinatorHandlers::Register(coordinator_state.GetCoordinatorServer(), replication_handler); memgraph::dbms::CoordinatorHandlers::Register(coordinator_state->GetCoordinatorServer(), replication_handler);
MG_ASSERT(coordinator_state.GetCoordinatorServer().Start(), "Failed to start coordinator server!"); MG_ASSERT(coordinator_state->GetCoordinatorServer().Start(), "Failed to start coordinator server!");
} }
#endif #endif
auto db_acc = dbms_handler.Get(); auto db_acc = dbms_handler.Get();
memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, system, memgraph::query::InterpreterContext interpreter_context_(
interp_config, &dbms_handler, &repl_state, system,
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
&coordinator_state, coordinator_state ? std::optional<std::reference_wrapper<CoordinatorState>>{std::ref(*coordinator_state)}
: std::nullopt,
#endif #endif
auth_handler.get(), auth_checker.get(), auth_handler.get(), auth_checker.get(), &replication_handler);
&replication_handler);
MG_ASSERT(db_acc, "Failed to access the main database"); MG_ASSERT(db_acc, "Failed to access the main database");
memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(), memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(),

View File

@ -4299,8 +4299,10 @@ void Interpreter::RollbackTransaction() {
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> RouteResult { auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> RouteResult {
// TODO: (andi) Test if (!interpreter_context_->coordinator_state_) {
if (!FLAGS_coordinator_id) { throw QueryException("You cannot fetch routing table from an instance which is not part of a cluster.");
}
if (FLAGS_management_port) {
auto const &address = routing.find("address"); auto const &address = routing.find("address");
if (address == routing.end()) { if (address == routing.end()) {
throw QueryException("Routing table must contain address field."); throw QueryException("Routing table must contain address field.");
@ -4315,7 +4317,7 @@ auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> Ro
return result; return result;
} }
return RouteResult{.servers = interpreter_context_->coordinator_state_->GetRoutingTable(routing)}; return RouteResult{.servers = interpreter_context_->coordinator_state_->get().GetRoutingTable()};
} }
#endif #endif

View File

@ -15,18 +15,18 @@
#include "system/include/system/system.hpp" #include "system/include/system/system.hpp"
namespace memgraph::query { namespace memgraph::query {
InterpreterContext::InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler, InterpreterContext::InterpreterContext(
replication::ReplicationState *rs, memgraph::system::System &system, InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler, replication::ReplicationState *rs,
memgraph::system::System &system,
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state, std::optional<std::reference_wrapper<memgraph::coordination::CoordinatorState>> const &coordinator_state,
#endif #endif
AuthQueryHandler *ah, AuthChecker *ac, AuthQueryHandler *ah, AuthChecker *ac, ReplicationQueryHandler *replication_handler)
ReplicationQueryHandler *replication_handler)
: dbms_handler(dbms_handler), : dbms_handler(dbms_handler),
config(interpreter_config), config(interpreter_config),
repl_state(rs), repl_state(rs),
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
coordinator_state_{coordinator_state}, coordinator_state_(coordinator_state),
#endif #endif
auth(ah), auth(ah),
auth_checker(ac), auth_checker(ac),

View File

@ -57,7 +57,7 @@ struct InterpreterContext {
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler, InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system, replication::ReplicationState *rs, memgraph::system::System &system,
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state, std::optional<std::reference_wrapper<coordination::CoordinatorState>> const &coordinator_state,
#endif #endif
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr, AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
ReplicationQueryHandler *replication_handler = nullptr); ReplicationQueryHandler *replication_handler = nullptr);
@ -72,7 +72,7 @@ struct InterpreterContext {
// GLOBAL // GLOBAL
memgraph::replication::ReplicationState *repl_state; memgraph::replication::ReplicationState *repl_state;
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state_; std::optional<std::reference_wrapper<coordination::CoordinatorState>> coordinator_state_;
#endif #endif
AuthQueryHandler *auth; AuthQueryHandler *auth;