diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp
index f136975bc..29f7be3cf 100644
--- a/src/query/frontend/ast/ast.hpp
+++ b/src/query/frontend/ast/ast.hpp
@@ -26,6 +26,11 @@ namespace memgraph::query {
+constexpr std::string_view kBoltServer = "bolt_server";
+constexpr std::string_view kReplicationServer = "replication_server";
+constexpr std::string_view kCoordinatorServer = "coordinator_server";
+constexpr std::string_view kManagementServer = "management_server";
+
 struct LabelIx {
   static const utils::TypeInfo kType;
   const utils::TypeInfo &GetTypeInfo() const { return kType; }
@@ -3140,24 +3145,21 @@ class CoordinatorQuery : public memgraph::query::Query {
   DEFVISITABLE(QueryVisitor);

   memgraph::query::CoordinatorQuery::Action action_;
-  std::string instance_name_;
-  memgraph::query::Expression *replication_socket_address_{nullptr};
-  memgraph::query::Expression *coordinator_socket_address_{nullptr};
-  memgraph::query::Expression *raft_socket_address_{nullptr};
-  memgraph::query::Expression *raft_server_id_{nullptr};
+  std::string instance_name_{};
+  std::unordered_map<memgraph::query::Expression *, memgraph::query::Expression *> configs_;
+  memgraph::query::Expression *coordinator_server_id_{nullptr};
   memgraph::query::CoordinatorQuery::SyncMode sync_mode_;

   CoordinatorQuery *Clone(AstStorage *storage) const override {
     auto *object = storage->Create<CoordinatorQuery>();
+    object->action_ = action_;
     object->instance_name_ = instance_name_;
-    object->replication_socket_address_ =
-        replication_socket_address_ ? replication_socket_address_->Clone(storage) : nullptr;
+    object->coordinator_server_id_ = coordinator_server_id_ ? coordinator_server_id_->Clone(storage) : nullptr;
     object->sync_mode_ = sync_mode_;
-    object->coordinator_socket_address_ =
-        coordinator_socket_address_ ? coordinator_socket_address_->Clone(storage) : nullptr;
-    object->raft_socket_address_ = raft_socket_address_ ? raft_socket_address_->Clone(storage) : nullptr;
-    object->raft_server_id_ = raft_server_id_ ? raft_server_id_->Clone(storage) : nullptr;
+    for (const auto &[key, value] : configs_) {
+      object->configs_[key->Clone(storage)] = value->Clone(storage);
+    }
     return object;
   }
diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp
index ceebe2815..6da48c97c 100644
--- a/src/query/frontend/ast/cypher_main_visitor.cpp
+++ b/src/query/frontend/ast/cypher_main_visitor.cpp
@@ -398,24 +398,17 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe
 antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator(
     MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) {
   auto *coordinator_query = storage_->Create<CoordinatorQuery>();
-  if (!ctx->replicationSocketAddress()->literal()->StringLiteral()) {
-    throw SemanticException("Replication socket address should be a string literal!");
-  }
-  if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) {
-    throw SemanticException("Coordinator socket address should be a string literal!");
-  }
   coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE;
-  coordinator_query->replication_socket_address_ =
-      std::any_cast<Expression *>(ctx->replicationSocketAddress()->accept(this));
-  coordinator_query->coordinator_socket_address_ =
-      std::any_cast<Expression *>(ctx->coordinatorSocketAddress()->accept(this));
   coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this));
-  if (ctx->ASYNC()) {
-    coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::ASYNC;
-  } else {
-    coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::SYNC;
-  }
+  coordinator_query->configs_ =
+      std::any_cast<std::unordered_map<Expression *, Expression *>>(ctx->configsMap->accept(this));
+  coordinator_query->sync_mode_ = [ctx]() {
+    if (ctx->ASYNC()) {
+      return CoordinatorQuery::SyncMode::ASYNC;
+    }
+    return CoordinatorQuery::SyncMode::SYNC;
+  }();

   return coordinator_query;
 }
@@ -431,17 +424,10 @@ antlrcpp::Any CypherMainVisitor::visitUnregisterInstanceOnCoordinator(

 antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) {
   auto *coordinator_query = storage_->Create<CoordinatorQuery>();
-  if (!ctx->raftSocketAddress()->literal()->StringLiteral()) {
-    throw SemanticException("Raft socket address should be a string literal!");
-  }
-
-  if (!ctx->raftServerId()->literal()->numberLiteral()) {
-    throw SemanticException("Raft server id should be a number literal!");
-  }
-
   coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE;
-  coordinator_query->raft_socket_address_ = std::any_cast<Expression *>(ctx->raftSocketAddress()->accept(this));
-  coordinator_query->raft_server_id_ = std::any_cast<Expression *>(ctx->raftServerId()->accept(this));
+  coordinator_query->coordinator_server_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
+  coordinator_query->configs_ =
+      std::any_cast<std::unordered_map<Expression *, Expression *>>(ctx->configsMap->accept(this));

   return coordinator_query;
 }
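Reviewer note: the visitor now only stores the raw key/value expressions from the config map and defers validation to the interpreter. The sketch below is illustrative only (the helper names are not part of the patch); it just shows the two statement shapes the new grammar accepts and which keys the interpreter expects for each, using the constants defined in ast.hpp above.

```python
# Illustrative only -- not part of the patch. Shows the query shapes accepted by
# the new grammar and the keys validated in interpreter.cpp further below.

def register_instance_query(name, bolt, management, replication, as_async=False):
    # REGISTER INSTANCE requires bolt_server, management_server and replication_server.
    sync = " AS ASYNC" if as_async else ""
    return (
        f"REGISTER INSTANCE {name}{sync} WITH CONFIG "
        f"{{'bolt_server': '{bolt}', 'management_server': '{management}', 'replication_server': '{replication}'}}"
    )

def add_coordinator_query(server_id, bolt, coordinator):
    # ADD COORDINATOR requires bolt_server and coordinator_server.
    return (
        f"ADD COORDINATOR {server_id} WITH CONFIG "
        f"{{'bolt_server': '{bolt}', 'coordinator_server': '{coordinator}'}}"
    )
```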
diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4
index 0147bba04..378310c22 100644
--- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4
+++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4
@@ -388,22 +388,22 @@ instanceName : symbolicName ;

 socketAddress : literal ;

-coordinatorSocketAddress : literal ;
-replicationSocketAddress : literal ;
-raftSocketAddress : literal ;
-
 registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC ) TO socketAddress ;

-registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
+configKeyValuePair : literal ':' literal ;
+
+configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ;
+
+registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ( AS ASYNC ) ? WITH CONFIG configsMap=configMap ;

 unregisterInstanceOnCoordinator : UNREGISTER INSTANCE instanceName ;

 setInstanceToMain : SET INSTANCE instanceName TO MAIN ;

-raftServerId : literal ;
+coordinatorServerId : literal ;

-addCoordinatorInstance : ADD COORDINATOR raftServerId ON raftSocketAddress ;
+addCoordinatorInstance : ADD COORDINATOR coordinatorServerId WITH CONFIG configsMap=configMap ;

 dropReplica : DROP REPLICA instanceName ;

@@ -457,10 +457,6 @@ commonCreateStreamConfig : TRANSFORM transformationName=procedureName

 createStream : kafkaCreateStream | pulsarCreateStream ;

-configKeyValuePair : literal ':' literal ;
-
-configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ;
-
 kafkaCreateStreamConfig : TOPICS topicNames
                         | CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
                         | BOOTSTRAP_SERVERS bootstrapServers=literal
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index ce74586d3..a5c81cc72 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -1146,6 +1146,27 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
 }

 #ifdef MG_ENTERPRISE
+
+auto ParseConfigMap(std::unordered_map<Expression *, Expression *> const &config_map,
+                    ExpressionVisitor<TypedValue> &evaluator)
+    -> std::optional<std::map<std::string, std::string, std::less<>>> {
+  if (std::ranges::any_of(config_map, [&evaluator](const auto &entry) {
+        auto key_expr = entry.first->Accept(evaluator);
+        auto value_expr = entry.second->Accept(evaluator);
+        return !key_expr.IsString() || !value_expr.IsString();
+      })) {
+    spdlog::error("Config map must contain only string keys and values!");
+    return std::nullopt;
+  }
+
+  return ranges::views::all(config_map) | ranges::views::transform([&evaluator](const auto &entry) {
+           auto key_expr = entry.first->Accept(evaluator);
+           auto value_expr = entry.second->Accept(evaluator);
+           return std::pair{key_expr.ValueString(), value_expr.ValueString()};
+         }) |
+         ranges::to<std::map<std::string, std::string, std::less<>>>;
+}
+
 Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Parameters &parameters,
                                 coordination::CoordinatorState *coordinator_state,
                                 const query::InterpreterConfig &config, std::vector<Notification> *notifications) {
@@ -1173,17 +1194,37 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
       EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
       auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};

-      auto raft_socket_address_tv = coordinator_query->raft_socket_address_->Accept(evaluator);
-      auto raft_server_id_tv = coordinator_query->raft_server_id_->Accept(evaluator);
-      callback.fn = [handler = CoordQueryHandler{*coordinator_state}, raft_socket_address_tv,
-                     raft_server_id_tv]() mutable {
-        handler.AddCoordinatorInstance(raft_server_id_tv.ValueInt(), std::string(raft_socket_address_tv.ValueString()));
+      auto config_map = ParseConfigMap(coordinator_query->configs_, evaluator);
+      if (!config_map) {
+        throw QueryRuntimeException("Failed to parse config map!");
+      }
+
+      if (config_map->size() != 2) {
+        throw QueryRuntimeException("Config map must contain exactly 2 entries: {} and {}!", kCoordinatorServer,
+                                    kBoltServer);
+      }
+
+      auto const &coordinator_server_it = config_map->find(kCoordinatorServer);
+      if (coordinator_server_it == config_map->end()) {
+        throw QueryRuntimeException("Config map must contain {} entry!", kCoordinatorServer);
+      }
+
+      auto const &bolt_server_it = config_map->find(kBoltServer);
+      if (bolt_server_it == config_map->end()) {
+        throw QueryRuntimeException("Config map must contain {} entry!", kBoltServer);
+      }
+
+      auto coord_server_id = coordinator_query->coordinator_server_id_->Accept(evaluator).ValueInt();
+
+      callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coord_server_id,
+                     coordinator_server = coordinator_server_it->second]() mutable {
+        handler.AddCoordinatorInstance(coord_server_id, coordinator_server);
         return std::vector<std::vector<TypedValue>>();
       };
       notifications->emplace_back(SeverityLevel::INFO, NotificationCode::ADD_COORDINATOR_INSTANCE,
                                   fmt::format("Coordinator has added instance {} on coordinator server {}.",
-                                              coordinator_query->instance_name_, raft_socket_address_tv.ValueString()));
+                                              coordinator_query->instance_name_, coordinator_server_it->second));
       return callback;
     }
     case CoordinatorQuery::Action::REGISTER_INSTANCE: {
@@ -1194,27 +1235,49 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
       // the argument to Callback.
       EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
       auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
+      auto config_map = ParseConfigMap(coordinator_query->configs_, evaluator);

-      auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
-      auto replication_socket_address_tv = coordinator_query->replication_socket_address_->Accept(evaluator);
-      callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coordinator_socket_address_tv,
-                     replication_socket_address_tv,
+      if (!config_map) {
+        throw QueryRuntimeException("Failed to parse config map!");
+      }
+
+      if (config_map->size() != 3) {
+        throw QueryRuntimeException("Config map must contain exactly 3 entries: {}, {} and {}!", kBoltServer,
+                                    kManagementServer, kReplicationServer);
+      }
+
+      auto const &replication_server_it = config_map->find(kReplicationServer);
+      if (replication_server_it == config_map->end()) {
+        throw QueryRuntimeException("Config map must contain {} entry!", kReplicationServer);
+      }
+
+      auto const &management_server_it = config_map->find(kManagementServer);
+      if (management_server_it == config_map->end()) {
+        throw QueryRuntimeException("Config map must contain {} entry!", kManagementServer);
+      }
+
+      auto const &bolt_server_it = config_map->find(kBoltServer);
+      if (bolt_server_it == config_map->end()) {
+        throw QueryRuntimeException("Config map must contain {} entry!", kBoltServer);
+      }
+
+      callback.fn = [handler = CoordQueryHandler{*coordinator_state},
                      instance_health_check_frequency_sec = config.instance_health_check_frequency_sec,
+                     management_server = management_server_it->second,
+                     replication_server = replication_server_it->second, bolt_server = bolt_server_it->second,
                      instance_name = coordinator_query->instance_name_,
                      instance_down_timeout_sec = config.instance_down_timeout_sec,
                      instance_get_uuid_frequency_sec = config.instance_get_uuid_frequency_sec,
                      sync_mode = coordinator_query->sync_mode_]() mutable {
-        handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()),
-                                            std::string(replication_socket_address_tv.ValueString()),
-                                            instance_health_check_frequency_sec, instance_down_timeout_sec,
-                                            instance_get_uuid_frequency_sec, instance_name, sync_mode);
+        handler.RegisterReplicationInstance(management_server, replication_server, instance_health_check_frequency_sec,
+                                            instance_down_timeout_sec, instance_get_uuid_frequency_sec, instance_name,
+                                            sync_mode);
         return std::vector<std::vector<TypedValue>>();
       };

-      notifications->emplace_back(
-          SeverityLevel::INFO, NotificationCode::REGISTER_REPLICATION_INSTANCE,
-          fmt::format("Coordinator has registered coordinator server on {} for instance {}.",
-                      coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_));
+      notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICATION_INSTANCE,
+                                  fmt::format("Coordinator has registered replication instance on {} for instance {}.",
+                                              bolt_server_it->second, coordinator_query->instance_name_));
       return callback;
     }
     case CoordinatorQuery::Action::UNREGISTER_INSTANCE:
diff --git a/tests/e2e/high_availability/coord_cluster_registration.py b/tests/e2e/high_availability/coord_cluster_registration.py
index 774c6dca1..13aaf27fe 100644
--- a/tests/e2e/high_availability/coord_cluster_registration.py
+++ b/tests/e2e/high_availability/coord_cluster_registration.py
@@ -117,17 +117,26 @@ def test_register_repl_instances_then_coordinators():
     coordinator3_cursor = connect(host="localhost", port=7692).cursor()

     execute_and_fetch_all(
-        coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
+        coordinator3_cursor,
+        "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};",
     )
     execute_and_fetch_all(
-        coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
+        coordinator3_cursor,
+        "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};",
     )
     execute_and_fetch_all(
-        coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
+        coordinator3_cursor,
+        "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};",
     )
     execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
-    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
-    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
+    assert add_coordinator(
+        coordinator3_cursor,
+        "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}",
+    )
+    assert add_coordinator(
+        coordinator3_cursor,
+        "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}",
+    )

     def check_coordinator3():
         return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
@@ -172,16 +181,25 @@ def test_register_coordinator_then_repl_instances():

     coordinator3_cursor = connect(host="localhost", port=7692).cursor()

-    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
-    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
-    execute_and_fetch_all(
-        coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
+    assert add_coordinator(
+        coordinator3_cursor,
+        "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}",
+    )
+    assert add_coordinator(
+        coordinator3_cursor,
+        "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server':
'127.0.0.1:10112'}", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + coordinator3_cursor, + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + coordinator3_cursor, + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + ) + execute_and_fetch_all( + coordinator3_cursor, + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ) execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") @@ -228,16 +246,25 @@ def test_coordinators_communication_with_restarts(): coordinator3_cursor = connect(host="localhost", port=7692).cursor() - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") - execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + ) + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + coordinator3_cursor, + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + coordinator3_cursor, + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + ) + execute_and_fetch_all( + coordinator3_cursor, + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ) execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") @@ -295,16 +322,25 @@ def test_unregister_replicas(kill_instance): coordinator2_cursor = connect(host="localhost", port=7691).cursor() coordinator3_cursor = connect(host="localhost", port=7692).cursor() - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") - execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + ) + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + 
coordinator3_cursor, + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + coordinator3_cursor, + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + ) + execute_and_fetch_all( + coordinator3_cursor, + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ) execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") @@ -429,16 +465,26 @@ def test_unregister_main(): coordinator1_cursor = connect(host="localhost", port=7690).cursor() coordinator2_cursor = connect(host="localhost", port=7691).cursor() coordinator3_cursor = connect(host="localhost", port=7692).cursor() - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") - execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + ) + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + coordinator3_cursor, + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", ) execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + coordinator3_cursor, + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + ) + execute_and_fetch_all( + coordinator3_cursor, + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ) execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") diff --git a/tests/e2e/high_availability/coordinator.py b/tests/e2e/high_availability/coordinator.py index ed55dff9e..8a6ae1a0a 100644 --- a/tests/e2e/high_availability/coordinator.py +++ b/tests/e2e/high_availability/coordinator.py @@ -79,7 +79,7 @@ def test_main_and_replicas_cannot_register_coord_server(port): with pytest.raises(Exception) as e: execute_and_fetch_all( cursor, - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10001' WITH '127.0.0.1:10011';", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", ) assert str(e.value) == "Only coordinator can register coordinator server!" 
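Reviewer note: besides the "only coordinator" guard exercised above, the interpreter changes introduce per-key validation of the config map. A hedged sketch of how that could be exercised from a client, assuming the QueryRuntimeException text propagates verbatim over Bolt and that connect/execute_and_fetch_all come from this suite's common module:

```python
import pytest

# Assumption: these helpers live in the e2e suite's common module, as used above.
from common import connect, execute_and_fetch_all


def test_register_instance_rejects_incomplete_config():
    cursor = connect(host="localhost", port=7692).cursor()
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(
            cursor,
            # Missing management_server and replication_server on purpose.
            "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687'};",
        )
    # The size check in HandleCoordinatorQuery requires exactly bolt_server,
    # management_server and replication_server.
    assert "Config map must contain exactly 3 entries" in str(e.value)
```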
diff --git a/tests/e2e/high_availability/disable_writing_on_main_after_restart.py b/tests/e2e/high_availability/disable_writing_on_main_after_restart.py index 363ce1c41..517bf346f 100644 --- a/tests/e2e/high_availability/disable_writing_on_main_after_restart.py +++ b/tests/e2e/high_availability/disable_writing_on_main_after_restart.py @@ -133,11 +133,18 @@ def test_writing_disabled_on_main_restart(): coordinator3_cursor = connect(host="localhost", port=7692).cursor() execute_and_fetch_all( - coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + coordinator3_cursor, + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ) execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") - assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + ) + assert add_coordinator( + coordinator3_cursor, + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + ) def check_coordinator3(): return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) diff --git a/tests/e2e/high_availability/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py index 7dc3ef238..59e083545 100644 --- a/tests/e2e/high_availability/distributed_coords.py +++ b/tests/e2e/high_availability/distributed_coords.py @@ -110,11 +110,11 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { ], "log_file": "coordinator3.log", "setup_queries": [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111'", - "ADD COORDINATOR 2 ON '127.0.0.1:10112'", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ], }, @@ -221,11 +221,11 @@ def test_old_main_comes_back_on_new_leader_as_replica(): interactive_mg_runner.start_all(inner_instances_description) setup_queries = [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111'", - "ADD COORDINATOR 2 ON '127.0.0.1:10112'", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': 
'127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ] coord_cursor_3 = connect(host="localhost", port=7692).cursor() @@ -416,11 +416,11 @@ def test_distributed_automatic_failover_with_leadership_change(): interactive_mg_runner.start_all(inner_instances_description) setup_queries = [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111'", - "ADD COORDINATOR 2 ON '127.0.0.1:10112'", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ] coord_cursor_3 = connect(host="localhost", port=7692).cursor() @@ -522,7 +522,10 @@ def test_no_leader_after_leader_and_follower_die(): coord_cursor_1 = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: - execute_and_fetch_all(coord_cursor_1, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.10001'") + execute_and_fetch_all( + coord_cursor_1, + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + ) assert str(e) == "Couldn't register replica instance since coordinator is not a leader!" 
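Reviewer note: the setup_queries lists above repeat the same bolt/management/replication port pattern per instance. A hypothetical helper (not part of the patch) that generates them, following the port convention used in these tests:

```python
# Hypothetical convenience only -- ports follow the convention in distributed_coords.py:
# bolt 7687+, management 1001x, replication 1000x, coordinator 1011x.

def ha_setup_queries(n_replicas=3, n_extra_coords=2):
    queries = [
        f"ADD COORDINATOR {i} WITH CONFIG "
        f"{{'bolt_server': '127.0.0.1:{7689 + i}', 'coordinator_server': '127.0.0.1:{10110 + i}'}}"
        for i in range(1, n_extra_coords + 1)
    ]
    queries += [
        f"REGISTER INSTANCE instance_{i} WITH CONFIG "
        f"{{'bolt_server': '127.0.0.1:{7686 + i}', 'management_server': '127.0.0.1:{10010 + i}', "
        f"'replication_server': '127.0.0.1:{10000 + i}'}}"
        for i in range(1, n_replicas + 1)
    ]
    queries.append("SET INSTANCE instance_3 TO MAIN")
    return queries
```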
@@ -541,11 +544,11 @@ def test_old_main_comes_back_on_new_leader_as_main(): coord_cursor_3 = connect(host="localhost", port=7692).cursor() setup_queries = [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111'", - "ADD COORDINATOR 2 ON '127.0.0.1:10112'", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ] @@ -719,12 +722,12 @@ def test_registering_4_coords(): ], "log_file": "coordinator4.log", "setup_queries": [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111';", - "ADD COORDINATOR 2 ON '127.0.0.1:10112';", - "ADD COORDINATOR 3 ON '127.0.0.1:10113';", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "ADD COORDINATOR 3 WITH CONFIG {'bolt_server': '127.0.0.1:7692', 'coordinator_server': '127.0.0.1:10113'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ], }, @@ -854,12 +857,12 @@ def test_registering_coord_log_store(): ], "log_file": "coordinator4.log", "setup_queries": [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111';", - "ADD COORDINATOR 2 ON '127.0.0.1:10112';", - "ADD COORDINATOR 3 ON '127.0.0.1:10113';", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "ADD COORDINATOR 3 WITH CONFIG {'bolt_server': '127.0.0.1:7692', 'coordinator_server': '127.0.0.1:10113'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE 
instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ], }, } @@ -896,7 +899,7 @@ def test_registering_coord_log_store(): # 3 instances_ports_added = [10011, 10012, 10013] bolt_port_id = 7700 - coord_port_id = 10014 + manag_port_id = 10014 additional_instances = [] for i in range(4, 7): @@ -908,10 +911,10 @@ def test_registering_coord_log_store(): bolt_port = f"--bolt-port={bolt_port_id}" - coord_server_port = f"--coordinator-server-port={coord_port_id}" + manag_server_port = f"--coordinator-server-port={manag_port_id}" args_desc.append(bolt_port) - args_desc.append(coord_server_port) + args_desc.append(manag_server_port) instance_description = { "args": args_desc, @@ -922,17 +925,23 @@ def test_registering_coord_log_store(): full_instance_desc = {instance_name: instance_description} interactive_mg_runner.start(full_instance_desc, instance_name) - repl_port_id = coord_port_id - 10 + repl_port_id = manag_port_id - 10 assert repl_port_id < 10011, "Wrong test setup, repl port must be smaller than smallest coord port id" + bolt_server = f"127.0.0.1:{bolt_port_id}" + management_server = f"127.0.0.1:{manag_port_id}" + repl_server = f"127.0.0.1:{repl_port_id}" + + config_str = f"{{'bolt_server': '{bolt_server}', 'management_server': '{management_server}', 'replication_server': '{repl_server}'}}" + execute_and_fetch_all( coord_cursor, - f"REGISTER INSTANCE {instance_name} ON '127.0.0.1:{coord_port_id}' WITH '127.0.0.1:{repl_port_id}'", + f"REGISTER INSTANCE {instance_name} WITH CONFIG {config_str}", ) - additional_instances.append((f"{instance_name}", "", f"127.0.0.1:{coord_port_id}", "up", "replica")) - instances_ports_added.append(coord_port_id) - coord_port_id += 1 + additional_instances.append((f"{instance_name}", "", management_server, "up", "replica")) + instances_ports_added.append(manag_port_id) + manag_port_id += 1 bolt_port_id += 1 # 4 @@ -1004,11 +1013,11 @@ def test_multiple_failovers_in_row_no_leadership_change(): coord_cursor_3 = connect(host="localhost", port=7692).cursor() setup_queries = [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111'", - "ADD COORDINATOR 2 ON '127.0.0.1:10112'", - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}", + "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ] diff --git a/tests/e2e/high_availability/not_replicate_from_old_main.py b/tests/e2e/high_availability/not_replicate_from_old_main.py index 7ffffc04a..d9729f650 100644 --- 
a/tests/e2e/high_availability/not_replicate_from_old_main.py +++ b/tests/e2e/high_availability/not_replicate_from_old_main.py @@ -185,8 +185,8 @@ def test_not_replicate_old_main_register_new_cluster(): ], "log_file": "coordinator.log", "setup_queries": [ - "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "REGISTER INSTANCE shared_instance WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", "SET INSTANCE instance_2 TO MAIN", ], }, @@ -244,10 +244,12 @@ def test_not_replicate_old_main_register_new_cluster(): interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION) second_cluster_coord_cursor = connect(host="localhost", port=7691).cursor() execute_and_fetch_all( - second_cluster_coord_cursor, "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';" + second_cluster_coord_cursor, + "REGISTER INSTANCE shared_instance WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", ) execute_and_fetch_all( - second_cluster_coord_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';" + second_cluster_coord_cursor, + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", ) execute_and_fetch_all(second_cluster_coord_cursor, "SET INSTANCE instance_3 TO MAIN") diff --git a/tests/e2e/high_availability/single_coordinator.py b/tests/e2e/high_availability/single_coordinator.py index 7335d2847..1d839b4fc 100644 --- a/tests/e2e/high_availability/single_coordinator.py +++ b/tests/e2e/high_availability/single_coordinator.py @@ -90,9 +90,9 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { ], "log_file": "coordinator.log", "setup_queries": [ - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ], }, @@ -185,9 +185,9 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov ], "log_file": "coordinator.log", "setup_queries": [ - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 
'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", "SET INSTANCE instance_3 TO MAIN", ], }, @@ -415,10 +415,10 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r ], "log_file": "coordinator.log", "setup_queries": [ - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", - "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", + "REGISTER INSTANCE instance_4 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'management_server': '127.0.0.1:10014', 'replication_server': '127.0.0.1:10004'};", "SET INSTANCE instance_3 TO MAIN", ], }, @@ -702,10 +702,10 @@ def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_reco ], "log_file": "coordinator.log", "setup_queries": [ - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", - "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};", + "REGISTER INSTANCE instance_4 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'management_server': '127.0.0.1:10014', 'replication_server': '127.0.0.1:10004'};", "SET INSTANCE instance_3 TO MAIN", ], }, @@ -989,10 +989,10 @@ def test_replication_correct_replica_chosen_up_to_date_data(data_recovery): ], "log_file": "coordinator.log", "setup_queries": [ - "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", - "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", - "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", - "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';", + "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};", + "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};", + "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': 
'127.0.0.1:10003'};",
+                "REGISTER INSTANCE instance_4 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'management_server': '127.0.0.1:10014', 'replication_server': '127.0.0.1:10004'};",
                 "SET INSTANCE instance_3 TO MAIN",
             ],
         },
@@ -1559,7 +1559,7 @@ def test_registering_replica_fails_name_exists():
     with pytest.raises(Exception) as e:
         execute_and_fetch_all(
             coord_cursor,
-            "REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';",
+            "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7693', 'management_server': '127.0.0.1:10051', 'replication_server': '127.0.0.1:10111'};",
         )
     assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"
     shutil.rmtree(TEMP_DIR)
@@ -1573,7 +1573,7 @@ def test_registering_replica_fails_endpoint_exists():
     with pytest.raises(Exception) as e:
         execute_and_fetch_all(
             coord_cursor,
-            "REGISTER INSTANCE instance_5 ON '127.0.0.1:10011' WITH '127.0.0.1:10005';",
+            "REGISTER INSTANCE instance_5 WITH CONFIG {'bolt_server': '127.0.0.1:7693', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10005'};",
         )
     assert (
         str(e.value)
diff --git a/tests/e2e/high_availability/workloads.yaml b/tests/e2e/high_availability/workloads.yaml
index 75f17b2f7..aaf76fc6b 100644
--- a/tests/e2e/high_availability/workloads.yaml
+++ b/tests/e2e/high_availability/workloads.yaml
@@ -16,9 +16,9 @@ ha_cluster: &ha_cluster
       args: ["--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"]
       log_file: "replication-e2e-coordinator.log"
       setup_queries: [
-        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
-        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
-        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
+        "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};",
+        "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};",
+        "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};",
         "SET INSTANCE instance_3 TO MAIN;"
       ]
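Reviewer note: external setup scripts such as workloads.yaml above need the same rewrite from the old ON/WITH syntax to the config-map form. A hypothetical migration aid, not part of the patch; it has to guess the bolt_server value because the old form never carried it:

```python
import re

# Hypothetical only. Rewrites "REGISTER INSTANCE x ON '<management>' WITH '<replication>'"
# into the new WITH CONFIG form; bolt_server is supplied by the caller.
OLD_REGISTER = re.compile(r"REGISTER INSTANCE (\S+) ON '([^']+)' WITH '([^']+)';?")


def migrate_register(query, bolt_server):
    m = OLD_REGISTER.fullmatch(query.strip())
    if not m:
        return query  # already in the new syntax, or not a REGISTER INSTANCE statement
    name, management, replication = m.groups()
    return (
        f"REGISTER INSTANCE {name} WITH CONFIG {{'bolt_server': '{bolt_server}', "
        f"'management_server': '{management}', 'replication_server': '{replication}'}};"
    )
```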
diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp
index bcc6767f4..33e3af2e3 100644
--- a/tests/unit/cypher_main_visitor.cpp
+++ b/tests/unit/cypher_main_visitor.cpp
@@ -2633,15 +2633,99 @@ TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
 }

 #ifdef MG_ENTERPRISE
+
+TEST_P(CypherMainVisitorTest, TestRegisterSyncInstance) {
+  auto &ast_generator = *GetParam();
+
+  std::string const sync_instance = R"(REGISTER INSTANCE instance_1 WITH CONFIG {"bolt_server": "127.0.0.1:7688",
+  "replication_server": "127.0.0.1:10001", "management_server": "127.0.0.1:10011"
+  })";
+
+  auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(sync_instance));
+
+  EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::REGISTER_INSTANCE);
+  EXPECT_EQ(parsed_query->sync_mode_, CoordinatorQuery::SyncMode::SYNC);
+
+  auto const evaluate_config_map = [&ast_generator](std::unordered_map<Expression *, Expression *> const &config_map)
+      -> std::unordered_map<std::string, std::string> {
+    auto const expr_to_str = [&ast_generator](Expression *expression) {
+      return std::string{ast_generator.GetLiteral(expression, ast_generator.context_.is_query_cached).ValueString()};
+    };
+
+    return ranges::views::transform(config_map,
+                                    [&expr_to_str](auto const &expr_pair) {
+                                      return std::pair{expr_to_str(expr_pair.first), expr_to_str(expr_pair.second)};
+                                    }) |
+           ranges::to<std::unordered_map<std::string, std::string>>;
+  };
+
+  auto const config_map = evaluate_config_map(parsed_query->configs_);
+  ASSERT_EQ(config_map.size(), 3);
+  EXPECT_EQ(config_map.at("bolt_server"), "127.0.0.1:7688");
+  EXPECT_EQ(config_map.at("management_server"), "127.0.0.1:10011");
+  EXPECT_EQ(config_map.at("replication_server"), "127.0.0.1:10001");
+}
+
+TEST_P(CypherMainVisitorTest, TestRegisterAsyncInstance) {
+  auto &ast_generator = *GetParam();
+
+  std::string const async_instance =
+      R"(REGISTER INSTANCE instance_1 AS ASYNC WITH CONFIG {"bolt_server": "127.0.0.1:7688",
+  "replication_server": "127.0.0.1:10001",
+  "management_server": "127.0.0.1:10011"})";
+
+  auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(async_instance));
+
+  EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::REGISTER_INSTANCE);
+  EXPECT_EQ(parsed_query->sync_mode_, CoordinatorQuery::SyncMode::ASYNC);
+
+  auto const evaluate_config_map = [&ast_generator](std::unordered_map<Expression *, Expression *> const &config_map)
+      -> std::map<std::string, std::string, std::less<>> {
+    auto const expr_to_str = [&ast_generator](Expression *expression) {
+      return std::string{ast_generator.GetLiteral(expression, ast_generator.context_.is_query_cached).ValueString()};
+    };
+
+    return ranges::views::transform(config_map,
+                                    [&expr_to_str](auto const &expr_pair) {
+                                      return std::pair{expr_to_str(expr_pair.first), expr_to_str(expr_pair.second)};
+                                    }) |
+           ranges::to<std::map<std::string, std::string, std::less<>>>;
+  };
+
+  auto const config_map = evaluate_config_map(parsed_query->configs_);
+  ASSERT_EQ(config_map.size(), 3);
+  EXPECT_EQ(config_map.find(memgraph::query::kBoltServer)->second, "127.0.0.1:7688");
+  EXPECT_EQ(config_map.find(memgraph::query::kManagementServer)->second, "127.0.0.1:10011");
+  EXPECT_EQ(config_map.find(memgraph::query::kReplicationServer)->second, "127.0.0.1:10001");
+}
+
 TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) {
   auto &ast_generator = *GetParam();

-  std::string const correct_query = R"(ADD COORDINATOR 1 ON "127.0.0.1:10111")";
+  std::string const correct_query =
+      R"(ADD COORDINATOR 1 WITH CONFIG {"bolt_server": "127.0.0.1:7688", "coordinator_server": "127.0.0.1:10111"})";
   auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(correct_query));

   EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE);
-  ast_generator.CheckLiteral(parsed_query->raft_socket_address_, TypedValue("127.0.0.1:10111"));
-  ast_generator.CheckLiteral(parsed_query->raft_server_id_, TypedValue(1));
+  ast_generator.CheckLiteral(parsed_query->coordinator_server_id_, TypedValue(1));
+
+  auto const evaluate_config_map = [&ast_generator](std::unordered_map<Expression *, Expression *> const &config_map)
+      -> std::map<std::string, std::string, std::less<>> {
+    auto const expr_to_str = [&ast_generator](Expression *expression) {
+      return std::string{ast_generator.GetLiteral(expression, ast_generator.context_.is_query_cached).ValueString()};
+    };
+
+    return ranges::views::transform(config_map,
+                                    [&expr_to_str](auto const &expr_pair) {
+                                      return std::pair{expr_to_str(expr_pair.first), expr_to_str(expr_pair.second)};
+                                    }) |
+           ranges::to<std::map<std::string, std::string, std::less<>>>;
+  };
+
+  auto const config_map = evaluate_config_map(parsed_query->configs_);
+  ASSERT_EQ(config_map.size(), 2);
+  EXPECT_EQ(config_map.find(kBoltServer)->second, "127.0.0.1:7688");
+  EXPECT_EQ(config_map.find(kCoordinatorServer)->second, "127.0.0.1:10111");
 }

 #endif
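Reviewer note: an end-to-end illustration of the new syntax, mirroring the e2e tests above. A minimal sketch only, assuming a coordinator listening on bolt port 7692 and that connect, execute_and_fetch_all, and add_coordinator are the helpers from this suite's common module:

```python
# Illustrative usage of the new WITH CONFIG syntax; not part of the patch.
from common import add_coordinator, connect, execute_and_fetch_all

cursor = connect(host="localhost", port=7692).cursor()

assert add_coordinator(
    cursor,
    "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}",
)
execute_and_fetch_all(
    cursor,
    "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', "
    "'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};",
)
execute_and_fetch_all(cursor, "SET INSTANCE instance_1 TO MAIN")
print(sorted(execute_and_fetch_all(cursor, "SHOW INSTANCES")))
```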