Registering a replica with timeout 0 should not be allowed (#414)

This commit is contained in:
Jeremy B 2022-06-29 10:14:23 +02:00 committed by GitHub
parent cbe15e7f44
commit 1ae6b71c5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 70 additions and 31 deletions

View File

@ -495,6 +495,10 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
} else if (timeout.IsInt()) {
maybe_timeout = static_cast<double>(timeout.ValueInt());
}
if (maybe_timeout && *maybe_timeout <= 0.0) {
throw utils::BasicException("Parameter TIMEOUT must be strictly greater than 0.");
}
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
maybe_timeout, replica_check_frequency]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,

View File

@ -44,6 +44,7 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
TryInitializeClientSync();
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
MG_ASSERT(*config.timeout > 0);
timeout_.emplace(*config.timeout);
timeout_dispatcher_.emplace();
}
@ -558,7 +559,8 @@ Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't block
HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't
// block
}
return info;

View File

@ -96,6 +96,8 @@ def load_args():
def _start_instance(name, args, log_file, queries, use_ssl, procdir):
assert name not in MEMGRAPH_INSTANCES.keys()
# If this raises, you are trying to start an instance with the same name than one already running.
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
MEMGRAPH_INSTANCES[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", log_file)
@ -108,12 +110,11 @@ def _start_instance(name, args, log_file, queries, use_ssl, procdir):
for query in queries:
mg_instance.query(query)
return mg_instance
def stop_all():
for mg_instance in MEMGRAPH_INSTANCES.values():
mg_instance.stop()
MEMGRAPH_INSTANCES.clear()
def stop_instance(context, name):
@ -121,6 +122,7 @@ def stop_instance(context, name):
if key != name:
continue
MEMGRAPH_INSTANCES[name].stop()
MEMGRAPH_INSTANCES.pop(name)
def stop(context, name):
@ -131,6 +133,14 @@ def stop(context, name):
stop_all()
def kill(context, name):
for key in context.keys():
if key != name:
continue
MEMGRAPH_INSTANCES[name].kill()
MEMGRAPH_INSTANCES.pop(name)
@atexit.register
def cleanup():
stop_all()
@ -157,22 +167,19 @@ def start_instance(context, name, procdir):
assert len(mg_instances) == 1
return mg_instances
def start_all(context, procdir=""):
mg_instances = {}
stop_all()
for key, _ in context.items():
mg_instances.update(start_instance(context, key, procdir))
return mg_instances
start_instance(context, key, procdir)
def start(context, name, procdir=""):
if name != "all":
return start_instance(context, name, procdir)
start_instance(context, name, procdir)
return
return start_all(context)
start_all(context)
def info(context):

View File

@ -11,13 +11,13 @@
import sys
import atexit
import os
import pytest
import time
from common import execute_and_fetch_all
import interactive_mg_runner
import mgclient
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -51,7 +51,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';",
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001';",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';",
"REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';",
@ -68,11 +68,7 @@ def test_show_replicas(connection):
# 3/ We kill another replica. It should become invalid in the SHOW REPLICAS command.
# 0/
atexit.register(
interactive_mg_runner.stop_all
) # Needed in case the test fails due to an assert. One still want the instances to be stoped.
mg_instances = interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7687, "main").cursor()
@ -92,7 +88,7 @@ def test_show_replicas(connection):
assert EXPECTED_COLUMN_NAMES == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 2, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
@ -103,27 +99,60 @@ def test_show_replicas(connection):
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
}
assert expected_data == actual_data
# 3/
mg_instances["replica_1"].kill()
mg_instances["replica_3"].kill()
mg_instances["replica_4"].stop()
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3")
interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4")
# We leave some time for the main to realise the replicas are down.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "invalid"),
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"),
}
assert expected_data == actual_data
def test_add_replica_invalid_timeout(connection):
# Goal of this test is to check the registration of replica with invalid timeout raises an exception
CONFIGURATION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [],
},
}
interactive_mg_runner.start_all(CONFIGURATION)
cursor = connection(7687, "main").cursor()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';")
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT -5 TO '127.0.0.1:10001';")
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
assert 0 == len(actual_data)
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10001';")
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
assert 1 == len(actual_data)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -29,7 +29,7 @@ template_cluster: &template_cluster
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]

View File

@ -50,18 +50,15 @@ def run(args):
continue
log.info("%s STARTED.", workload_name)
# Setup.
mg_instances = {}
@atexit.register
def cleanup():
for mg_instance in mg_instances.values():
mg_instance.stop()
interactive_mg_runner.stop_all()
if "cluster" in workload:
procdir = ""
if "proc" in workload:
procdir = os.path.join(BUILD_DIR, workload["proc"])
mg_instances = interactive_mg_runner.start_all(workload["cluster"], procdir)
interactive_mg_runner.start_all(workload["cluster"], procdir)
# Test.
mg_test_binary = os.path.join(BUILD_DIR, workload["binary"])
@ -70,7 +67,7 @@ def run(args):
if "cluster" in workload:
for name, config in workload["cluster"].items():
for validation in config.get("validation_queries", []):
mg_instance = mg_instances[name]
mg_instance = interactive_mg_runner.MEMGRAPH_INSTANCES[name]
data = mg_instance.query(validation["query"])[0][0]
assert data == validation["expected"]
cleanup()