diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index a8bb42dc9..c1d206a72 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -495,6 +495,10 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
       } else if (timeout.IsInt()) {
         maybe_timeout = static_cast<double>(timeout.ValueInt());
       }
+      if (maybe_timeout && *maybe_timeout <= 0.0) {
+        throw utils::BasicException("Parameter TIMEOUT must be strictly greater than 0.");
+      }
+
       callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
                      maybe_timeout, replica_check_frequency]() mutable {
         handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index 9aec05a50..19fdf4d9b 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -44,6 +44,7 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
   TryInitializeClientSync();
 
   if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
+    MG_ASSERT(*config.timeout > 0);
     timeout_.emplace(*config.timeout);
     timeout_dispatcher_.emplace();
   }
@@ -558,7 +559,8 @@ Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
       std::unique_lock client_guard(client_lock_);
       replica_state_.store(replication::ReplicaState::INVALID);
     }
-    HandleRpcFailure();  // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't block
+    HandleRpcFailure();  // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't
+                         // block
   }
 
   return info;
diff --git a/tests/e2e/interactive_mg_runner.py b/tests/e2e/interactive_mg_runner.py
index 29f496713..0c0c23644 100644
--- a/tests/e2e/interactive_mg_runner.py
+++ b/tests/e2e/interactive_mg_runner.py
@@ -96,6 +96,8 @@ def load_args():
 
 
 def _start_instance(name, args, log_file, queries, use_ssl, procdir):
+    assert name not in MEMGRAPH_INSTANCES.keys()
+    # If this raises, you are trying to start an instance with the same name as one already running.
     mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
     MEMGRAPH_INSTANCES[name] = mg_instance
     log_file_path = os.path.join(BUILD_DIR, "logs", log_file)
@@ -108,12 +110,11 @@ def _start_instance(name, args, log_file, queries, use_ssl, procdir):
     for query in queries:
         mg_instance.query(query)
 
-    return mg_instance
-
 
 def stop_all():
     for mg_instance in MEMGRAPH_INSTANCES.values():
         mg_instance.stop()
+    MEMGRAPH_INSTANCES.clear()
 
 
 def stop_instance(context, name):
@@ -121,6 +122,7 @@ def stop_instance(context, name):
         if key != name:
             continue
         MEMGRAPH_INSTANCES[name].stop()
+        MEMGRAPH_INSTANCES.pop(name)
 
 
 def stop(context, name):
@@ -131,6 +133,14 @@ def stop(context, name):
     stop_all()
 
 
+def kill(context, name):
+    for key in context.keys():
+        if key != name:
+            continue
+        MEMGRAPH_INSTANCES[name].kill()
+        MEMGRAPH_INSTANCES.pop(name)
+
+
 @atexit.register
 def cleanup():
     stop_all()
@@ -157,22 +167,19 @@ def start_instance(context, name, procdir):
 
     assert len(mg_instances) == 1
 
-    return mg_instances
-
 
 def start_all(context, procdir=""):
-    mg_instances = {}
+    stop_all()
     for key, _ in context.items():
-        mg_instances.update(start_instance(context, key, procdir))
-
-    return mg_instances
+        start_instance(context, key, procdir)
 
 
 def start(context, name, procdir=""):
     if name != "all":
-        return start_instance(context, name, procdir)
+        start_instance(context, name, procdir)
+        return
 
-    return start_all(context)
+    start_all(context)
 
 
 def info(context):
diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py
index d302a8956..151119be8 100644
--- a/tests/e2e/replication/show_while_creating_invalid_state.py
+++ b/tests/e2e/replication/show_while_creating_invalid_state.py
@@ -11,13 +11,13 @@
 import sys
 
-import atexit
 import os
 import pytest
 import time
 
 from common import execute_and_fetch_all
 
 import interactive_mg_runner
+import mgclient
 
 interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@@ -51,7 +51,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
         "args": ["--bolt-port", "7687", "--log-level=TRACE"],
         "log_file": "main.log",
         "setup_queries": [
-            "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';",
+            "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001';",
             "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';",
             "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';",
             "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';",
@@ -68,11 +68,7 @@ def test_show_replicas(connection):
     # 3/ We kill another replica. It should become invalid in the SHOW REPLICAS command.
 
     # 0/
-
-    atexit.register(
-        interactive_mg_runner.stop_all
-    )  # Needed in case the test fails due to an assert. One still want the instances to be stoped.
-    mg_instances = interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
+    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
 
     cursor = connection(7687, "main").cursor()
 
@@ -92,7 +88,7 @@ def test_show_replicas(connection):
     assert EXPECTED_COLUMN_NAMES == actual_column_names
 
     expected_data = {
-        ("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
+        ("replica_1", "127.0.0.1:10001", "sync", 2, 0, 0, "ready"),
         ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
         ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
         ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
@@ -103,27 +99,60 @@ def test_show_replicas(connection):
     execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
     actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
     expected_data = {
-        ("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
+        ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
         ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
         ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
     }
     assert expected_data == actual_data
 
     # 3/
-    mg_instances["replica_1"].kill()
-    mg_instances["replica_3"].kill()
-    mg_instances["replica_4"].stop()
+    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1")
+    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3")
+    interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4")
 
     # We leave some time for the main to realise the replicas are down.
     time.sleep(2)
     actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
     expected_data = {
-        ("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "invalid"),
+        ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "invalid"),
         ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"),
         ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"),
     }
     assert expected_data == actual_data
 
 
+def test_add_replica_invalid_timeout(connection):
+    # Goal of this test is to check that registering a replica with an invalid timeout raises an exception.
+    CONFIGURATION = {
+        "replica_1": {
+            "args": ["--bolt-port", "7688", "--log-level=TRACE"],
+            "log_file": "replica1.log",
+            "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
+        },
+        "main": {
+            "args": ["--bolt-port", "7687", "--log-level=TRACE"],
+            "log_file": "main.log",
+            "setup_queries": [],
+        },
+    }
+
+    interactive_mg_runner.start_all(CONFIGURATION)
+
+    cursor = connection(7687, "main").cursor()
+
+    with pytest.raises(mgclient.DatabaseError):
+        execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';")
+
+    with pytest.raises(mgclient.DatabaseError):
+        execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT -5 TO '127.0.0.1:10001';")
+
+    actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
+    assert 0 == len(actual_data)
+
+    execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10001';")
+    actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
+    assert 1 == len(actual_data)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/replication/workloads.yaml b/tests/e2e/replication/workloads.yaml
index a05fc6aca..c7e11d12e 100644
--- a/tests/e2e/replication/workloads.yaml
+++ b/tests/e2e/replication/workloads.yaml
@@ -29,7 +29,7 @@ template_cluster: &template_cluster
       args: ["--bolt-port", "7687", "--log-level=TRACE"]
       log_file: "replication-e2e-main.log"
       setup_queries: [
-        "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
+        "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'",
         "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
         "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
       ]
diff --git a/tests/e2e/runner.py b/tests/e2e/runner.py
index 3c57f918e..356fb91a7 100755
--- a/tests/e2e/runner.py
+++ b/tests/e2e/runner.py
@@ -50,18 +50,15 @@ def run(args):
             continue
         log.info("%s STARTED.", workload_name)
         # Setup.
-        mg_instances = {}
-
         @atexit.register
         def cleanup():
-            for mg_instance in mg_instances.values():
-                mg_instance.stop()
+            interactive_mg_runner.stop_all()
 
         if "cluster" in workload:
            procdir = ""
            if "proc" in workload:
                procdir = os.path.join(BUILD_DIR, workload["proc"])
-           mg_instances = interactive_mg_runner.start_all(workload["cluster"], procdir)
+           interactive_mg_runner.start_all(workload["cluster"], procdir)
 
         # Test.
         mg_test_binary = os.path.join(BUILD_DIR, workload["binary"])
@@ -70,7 +67,7 @@ def run(args):
         if "cluster" in workload:
             for name, config in workload["cluster"].items():
                 for validation in config.get("validation_queries", []):
-                    mg_instance = mg_instances[name]
+                    mg_instance = interactive_mg_runner.MEMGRAPH_INSTANCES[name]
                     data = mg_instance.query(validation["query"])[0][0]
                     assert data == validation["expected"]
         cleanup()
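
Note (illustrative, not part of the patch): a minimal sketch of how a test is now expected to drive the reworked runner module after this change, assuming the interactive_mg_runner layout shown above; the one-instance CONFIG dict and the example query are hypothetical.

    import interactive_mg_runner

    # Hypothetical one-instance cluster description, mirroring the shape of
    # MEMGRAPH_INSTANCES_DESCRIPTION in the test above.
    CONFIG = {
        "main": {
            "args": ["--bolt-port", "7687", "--log-level=TRACE"],
            "log_file": "main.log",
            "setup_queries": [],
        },
    }

    # start_all() no longer returns the instances; it first stops anything still
    # running and then registers the new instances in the module-level
    # MEMGRAPH_INSTANCES dict, which tests now read directly.
    interactive_mg_runner.start_all(CONFIG)
    instance = interactive_mg_runner.MEMGRAPH_INSTANCES["main"]
    print(instance.query("SHOW REPLICATION ROLE;"))

    # kill() and stop() both remove the entry from MEMGRAPH_INSTANCES, so starting
    # an instance with the same name later does not trip the assert in _start_instance().
    interactive_mg_runner.kill(CONFIG, "main")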