Revert replication tests (#1707)

This commit is contained in:
Antonio Filipovic 2024-02-12 16:42:57 +01:00 committed by GitHub
parent a511e63c7a
commit 4f4a569c72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 399 additions and 136 deletions

View File

@ -423,7 +423,6 @@ jobs:
ctest -R memgraph__unit --output-on-failure -j$THREADS
- name: Run e2e tests
if: false
run: |
cd tests
./setup.sh /opt/toolchain-v4/activate

View File

@ -155,6 +155,12 @@ void InMemoryReplicationHandlers::HeartbeatHandler(dbms::DbmsHandler *dbms_handl
return;
}
// TODO: this handler is agnostic of InMemory, move to be reused by on-disk
if (!db_acc.has_value()) {
spdlog::warn("No database accessor");
storage::replication::HeartbeatRes res{false, 0, ""};
slk::Save(res, res_builder);
return;
}
auto const *storage = db_acc->get()->storage();
storage::replication::HeartbeatRes res{true, storage->repl_storage_state_.last_commit_timestamp_.load(),
std::string{storage->repl_storage_state_.epoch_.id()}};
@ -463,7 +469,6 @@ void InMemoryReplicationHandlers::TimestampHandler(dbms::DbmsHandler *dbms_handl
slk::Save(res, res_builder);
}
/////// AF how does this work, does it get all deltas at once or what?
uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage *storage,
storage::durability::BaseDecoder *decoder,
const uint64_t version) {

View File

@ -57,6 +57,7 @@ namespace slk {
// Serialize code for CreateDatabaseReq
void Save(const memgraph::storage::replication::CreateDatabaseReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.main_uuid, builder);
memgraph::slk::Save(self.epoch_id, builder);
memgraph::slk::Save(self.expected_group_timestamp, builder);
memgraph::slk::Save(self.new_group_timestamp, builder);
@ -64,6 +65,7 @@ void Save(const memgraph::storage::replication::CreateDatabaseReq &self, memgrap
}
void Load(memgraph::storage::replication::CreateDatabaseReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->main_uuid, reader);
memgraph::slk::Load(&self->epoch_id, reader);
memgraph::slk::Load(&self->expected_group_timestamp, reader);
memgraph::slk::Load(&self->new_group_timestamp, reader);
@ -87,6 +89,7 @@ void Load(memgraph::storage::replication::CreateDatabaseRes *self, memgraph::slk
// Serialize code for DropDatabaseReq
void Save(const memgraph::storage::replication::DropDatabaseReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.main_uuid, builder);
memgraph::slk::Save(self.epoch_id, builder);
memgraph::slk::Save(self.expected_group_timestamp, builder);
memgraph::slk::Save(self.new_group_timestamp, builder);
@ -94,6 +97,7 @@ void Save(const memgraph::storage::replication::DropDatabaseReq &self, memgraph:
}
void Load(memgraph::storage::replication::DropDatabaseReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->main_uuid, reader);
memgraph::slk::Load(&self->epoch_id, reader);
memgraph::slk::Load(&self->expected_group_timestamp, reader);
memgraph::slk::Load(&self->new_group_timestamp, reader);

View File

@ -57,9 +57,16 @@ ReplicationState::ReplicationState(std::optional<std::filesystem::path> durabili
auto replication_data = std::move(fetched_replication_data).GetValue();
#ifdef MG_ENTERPRISE
if (FLAGS_coordinator_server_port && std::holds_alternative<RoleReplicaData>(replication_data)) {
spdlog::trace("Restarted replication uuid for replica");
std::get<RoleReplicaData>(replication_data).uuid_.reset();
}
#endif
if (std::holds_alternative<RoleReplicaData>(replication_data)) {
  // The replica's main uuid is optional and is reset just above when FLAGS_coordinator_server_port is set
  // (MG_ENTERPRISE builds). Calling .value() unconditionally would then throw std::bad_optional_access while
  // merely building a trace message, so only dereference it when it is present.
  auto const &recovered_uuid = std::get<RoleReplicaData>(replication_data).uuid_;
  spdlog::trace("Recovered main's uuid for replica {}",
                recovered_uuid.has_value() ? std::string(*recovered_uuid) : std::string{"<unset>"});
} else {
  // Main always carries a uuid, so it can be logged directly.
  spdlog::trace("Recovered uuid for main {}", std::string(std::get<RoleMainData>(replication_data).uuid_));
}
replication_data_ = std::move(replication_data);
}

View File

@ -198,6 +198,7 @@ bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::
const std::optional<utils::UUID> &main_uuid) {
// We don't want to restart the server if we're already a REPLICA
if (repl_state_.IsReplica()) {
spdlog::trace("Instance has already has replica role.");
return false;
}

View File

@ -39,7 +39,7 @@ endfunction()
add_subdirectory(fine_grained_access)
add_subdirectory(server)
#add_subdirectory(replication)
add_subdirectory(replication)
#add_subdirectory(memory)
add_subdirectory(triggers)
add_subdirectory(isolation_levels)

View File

@ -151,7 +151,10 @@ class MemgraphInstanceRunner:
if not keep_directories:
for folder in self.delete_on_stop or {}:
shutil.rmtree(folder)
try:
shutil.rmtree(folder)
except Exception as e:
pass # couldn't delete folder, skip
def kill(self, keep_directories=False):
if not self.is_running():

View File

@ -260,7 +260,14 @@ def test_drop_replicas(connection):
mg_sleep_and_assert(expected_data, retrieve_data)
def test_basic_recovery(connection):
@pytest.mark.parametrize(
"recover_data_on_startup",
[
"true",
"false",
],
)
def test_basic_recovery(recover_data_on_startup, connection):
# Goal of this test is to check the recovery of main.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We check that all replicas have the correct state: they should all be ready.
@ -272,9 +279,9 @@ def test_basic_recovery(connection):
# 7/ We check that all replicas but one have the expected data.
# 8/ We kill another replica.
# 9/ We add some data to main.
# 10/ We re-add the two replicas droped/killed and check the data.
# 10/ We re-add the two replicas dropped/killed and check the data.
# 11/ We kill another replica.
# 12/ Add some more data to main. It must still still occured but exception is expected since one replica is down.
# 12/ Add some more data to main. It must still occur but exception is expected since one replica is down.
# 13/ Restart the replica
# 14/ Check the states of replicas.
# 15/ Add some data again.
@ -284,22 +291,60 @@ def test_basic_recovery(connection):
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE", "--replication-restore-state-on-startup=true"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# Need to set it up manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/replica_1",
},
"replica_2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE", "--replication-restore-state-on-startup=true"],
"args": [
"--bolt-port",
"7689",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
"data_directory": f"{data_directory.name}/replica_2",
},
"replica_3": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--replication-restore-state-on-startup=true"],
"args": [
"--bolt-port",
"7690",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{recover_data_on_startup}",
],
"log_file": "replica3.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"],
# This replica is restarted during the test, so its replication role is set manually after start-up.
# A setup query would set the role again on every restart; we want the role to be restored from the stored data instead.
"setup_queries": [],
"data_directory": f"{data_directory.name}/replica_3",
},
"replica_4": {
"args": ["--bolt-port", "7691", "--log-level=TRACE", "--replication-restore-state-on-startup=true"],
"args": [
"--bolt-port",
"7691",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "replica4.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"],
},
@ -313,11 +358,18 @@ def test_basic_recovery(connection):
],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}",
"data_directory": f"{data_directory.name}/main",
},
}
interactive_mg_runner.start_all(CONFIGURATION)
replica_1_cursor = connection(7688, "replica_1").cursor()
execute_and_fetch_all(replica_1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
replica_3_cursor = connection(7690, "replica_3").cursor()
execute_and_fetch_all(replica_3_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10003;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registrations.
@ -485,26 +537,32 @@ def test_basic_recovery(connection):
def test_replication_role_recovery(connection):
# Goal of this test is to check the recovery of main and replica role.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We try to add a replica with reserved name which results in an exception <- Schema changed, there are no reserved names now
# 2/ We check that all replicas have the correct state: they should all be ready.
# 3/ We kill main.
# 4/ We re-start main. We check that main indeed has the role main and replicas still have the correct state.
# 5/ We kill the replica.
# 6/ We observed that the replica result is in invalid state.
# 7/ We start the replica again. We observe that indeed the replica has the replica state.
# 8/ We observe that main has the replica ready.
# 9/ We kill the replica again.
# 10/ We add data to main.
# 11/ We start the replica again. We observe that the replica has the same
# 1/ We check that all replicas have the correct state: they should all be ready.
# 2/ We kill main.
# 3/ We re-start main. We check that main indeed has the role main and replicas still have the correct state.
# 4/ We kill the replica.
# 5/ We observe that the replica is reported in the invalid state.
# 6/ We start the replica again. We observe that indeed the replica has the replica state.
# 7/ We observe that main has the replica ready.
# 8/ We kill the replica again.
# 9/ We add data to main.
# 10/ We start the replica again. We observe that the replica has the same
# data as main because it synced and added lost data.
# 0/
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"replica": {
"args": ["--bolt-port", "7688", "--log-level=TRACE", "--replication-restore-state-on-startup=true"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--storage-recover-on-startup",
"false",
],
"log_file": "replica.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
"data_directory": f"{data_directory.name}/replica",
},
"main": {
@ -522,37 +580,16 @@ def test_replication_role_recovery(connection):
}
interactive_mg_runner.start_all(CONFIGURATION)
replica_cursor = connection(7688, "replica").cursor()
execute_and_fetch_all(replica_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registrations.
execute_and_fetch_all(cursor, "REGISTER REPLICA replica SYNC TO '127.0.0.1:10001';")
# When we restart the replica, it does not need this query anymore since it needs to remember state
CONFIGURATION = {
"replica": {
"args": ["--bolt-port", "7688", "--log-level=TRACE", "--replication-restore-state-on-startup=true"],
"log_file": "replica.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}/replica",
},
"main": {
"args": [
"--bolt-port",
"7687",
"--log-level=TRACE",
"--storage-recover-on-startup=true",
"--replication-restore-state-on-startup=true",
],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}/main",
},
}
# 1/ Obsolete, schema change, no longer a reserved name
# with pytest.raises(mgclient.DatabaseError):
# execute_and_fetch_all(cursor, "REGISTER REPLICA __replication_role SYNC TO '127.0.0.1:10002';")
# 2/
# 1/
expected_data = {
("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
}
@ -566,10 +603,10 @@ def test_replication_role_recovery(connection):
check_roles()
# 3/
# 2/
interactive_mg_runner.kill(CONFIGURATION, "main")
# 4/
# 3/
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
@ -580,10 +617,10 @@ def test_replication_role_recovery(connection):
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 5/
# 4/
interactive_mg_runner.kill(CONFIGURATION, "replica")
# 6/
# 5/
expected_data = {
("replica", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
}
@ -591,11 +628,11 @@ def test_replication_role_recovery(connection):
assert actual_data == expected_data
# 7/
# 6/
interactive_mg_runner.start(CONFIGURATION, "replica")
check_roles()
# 8/
# 7/
expected_data = {
("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
}
@ -603,14 +640,14 @@ def test_replication_role_recovery(connection):
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 9/
# 8/
interactive_mg_runner.kill(CONFIGURATION, "replica")
# 10/
# 9/
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, "CREATE (n:First)")
# 11/
# 10/
interactive_mg_runner.start(CONFIGURATION, "replica")
check_roles()
@ -948,7 +985,7 @@ def test_attempt_to_write_data_on_main_when_async_replica_is_down():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK)
def test_attempt_to_write_data_on_main_when_sync_replica_is_down():
def test_attempt_to_write_data_on_main_when_sync_replica_is_down(connection):
# Goal of this test is to check that main cannot write new data if a sync replica is down.
# 0/ Start main and sync replicas.
# 1/ Check status of replicas.
@ -958,30 +995,48 @@ def test_attempt_to_write_data_on_main_when_sync_replica_is_down():
# 5/ Check the status of replicas.
# 6/ Restart the replica that was killed and check that it is up to date with main.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# We restart this replica so we want to set role manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"args": ["--bolt-port", "7689", "--log-level", "TRACE"],
"log_file": "sync_replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup", "true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
# need to do it manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/main",
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1_cursor").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
main_cursor = connection(7687, "main").cursor()
execute_and_fetch_all(main_cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(main_cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
expected_data = {
("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
@ -1126,7 +1181,7 @@ def test_attempt_to_create_indexes_on_main_when_async_replica_is_down():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK)
def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down():
def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(connection):
# Goal of this test is to check creation of new indexes/constraints when a sync replica is down.
# 0/ Start main and sync replicas.
# 1/ Check status of replicas.
@ -1136,11 +1191,21 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down():
# 5/ Check the status of replicas.
# 6/ Restart the replica that was killed and check that it is up to date with main.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
@ -1150,16 +1215,24 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down():
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
# Need to do it manually
"setup_queries": [],
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, as we are setting replica manually because
# of restart. Restart on replica would set role again.
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
expected_data = {
("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
@ -1226,7 +1299,7 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK)
def test_trigger_on_create_before_commit_with_offline_sync_replica():
def test_trigger_on_create_before_commit_with_offline_sync_replica(connection):
# 0/ Start all.
# 1/ Create the trigger
# 2/ Create a node. We expect two nodes created (our Not_Magic and the Magic created by trigger).
@ -1237,11 +1310,23 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica():
# 7/ Check that we have two nodes.
# 8/ Re-start the replica and check it's online and that it has two nodes.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# Need to do it manually since we kill this replica
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
@ -1251,16 +1336,24 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica():
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
# Need to do it manually since we kill replica
"setup_queries": [],
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, as we are setting replica manually because
# of restart. Restart on replica would set role again.
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
QUERY_CREATE_TRIGGER = """
CREATE TRIGGER exampleTrigger
@ -1325,7 +1418,7 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK)
def test_trigger_on_update_before_commit_with_offline_sync_replica():
def test_trigger_on_update_before_commit_with_offline_sync_replica(connection):
# 0/ Start all.
# 1/ Create the trigger
# 2/ Create a node.
@ -1337,11 +1430,22 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica():
# 8/ Check that we have two nodes.
# 9/ Re-start the replica and check it's online and that it has two nodes.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# Need to do it manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
@ -1351,15 +1455,23 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica():
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
"setup_queries": [],
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, as we are setting replica manually because
# of restart. Restart on replica would set role again.
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
QUERY_CREATE_TRIGGER = """
CREATE TRIGGER exampleTrigger
@ -1429,7 +1541,7 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK)
def test_trigger_on_delete_before_commit_with_offline_sync_replica():
def test_trigger_on_delete_before_commit_with_offline_sync_replica(connection):
# 0/ Start all.
# 1/ Create the trigger
# 2/ Create a node.
@ -1441,11 +1553,22 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica():
# 8/ Check that we have one node.
# 9/ Re-start the replica and check it's online and that it has one node, and the correct one.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# we need to set it manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
@ -1455,16 +1578,23 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica():
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
"setup_queries": [],
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, as we are setting replica manually because
# of restart. Restart on replica would set role again.
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
QUERY_CREATE_TRIGGER = """
CREATE TRIGGER exampleTrigger
@ -1473,7 +1603,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica():
"""
interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER)
res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;")
assert len(res_from_main) == 1, f"Incorect result: {res_from_main}"
assert len(res_from_main) == 1, f"Incorrect result: {res_from_main}"
# 2/
QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic', value:0})"
@ -1486,7 +1616,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica():
# 4/
QUERY_TO_CHECK = "MATCH (node) return node;"
res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK)
assert len(res_from_main) == 1, f"Incorect result: {res_from_main}"
assert len(res_from_main) == 1, f"Incorrect result: {res_from_main}"
assert res_from_main[0][0].properties["name"] == "Node_created_by_trigger"
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK)
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK)
@ -1539,7 +1669,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK)
def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica():
def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(connection):
# 0/ Start all.
# 1/ Create the triggers
# 2/ Create a node. We expect three nodes created (1 node created + the two created by triggers).
@ -1550,11 +1680,22 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica():
# 7/ Check that we have three nodes.
# 8/ Re-start the replica and check it's online and that it has three nodes.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# we need to set it manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
@ -1564,16 +1705,23 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica():
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
"setup_queries": [],
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, as we are setting replica manually because
# of restart. Restart on replica would set role again.
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
QUERY_CREATE_TRIGGER_BEFORE = """
CREATE TRIGGER exampleTriggerBefore
@ -1644,7 +1792,7 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica():
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK)
def test_triggers_on_create_before_commit_with_offline_sync_replica():
def test_triggers_on_create_before_commit_with_offline_sync_replica(connection):
# 0/ Start all.
# 1/ Create the two triggers
# 2/ Create a node. We expect three nodes.
@ -1655,11 +1803,22 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica():
# 7/ Check that we have three nodes.
# 8/ Re-start the replica and check it's online and that it has three nodes.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"sync_replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
"false",
],
"log_file": "sync_replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
# we need to set it manually
"setup_queries": [],
"data_directory": f"{data_directory.name}/sync_replica1",
},
"sync_replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
@ -1669,16 +1828,23 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica():
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';",
],
"setup_queries": [],
},
}
# 0/
interactive_mg_runner.start_all(CONFIGURATION)
sync_replica1_cursor = connection(7688, "sync_replica1").cursor()
execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, as we are setting replica manually because
# of restart. Restart on replica would set role again.
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';")
# 1/
QUERY_CREATE_TRIGGER_FIRST = """
CREATE TRIGGER exampleTriggerFirst
@ -1824,4 +1990,5 @@ def test_replication_not_messed_up_by_ShowIndexInfo(connection):
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-k", "test_basic_recovery"]))
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -16,6 +16,7 @@ import sys
import tempfile
import time
from functools import partial
from typing import Any, Dict
import interactive_mg_runner
import mgclient
@ -33,26 +34,64 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv
BOLT_PORTS = {"main": 7687, "replica_1": 7688, "replica_2": 7689}
REPLICATION_PORTS = {"replica_1": 10001, "replica_2": 10002}
MEMGRAPH_INSTANCES_DESCRIPTION = {
"replica_1": {
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": [f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};"],
},
"replica_2": {
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": [f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};"],
},
"main": {
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';",
],
},
}
def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[str, Any]:
    """Build an instance description for two replicas and one main.

    Both replicas are started with ``--replication-restore-state-on-startup true``
    and ``--data-recovery-on-startup false`` and get their own sub-directory of
    ``data_directory``. No instance carries replication setup queries; main has
    an empty ``setup_queries`` list and no ``data_directory`` entry.

    Args:
        data_directory: Base path; it is interpolated into
            ``"<data_directory>/replica_1"`` and ``"<data_directory>/replica_2"``.

    Returns:
        A dict keyed by ``"replica_1"``, ``"replica_2"`` and ``"main"`` (in that
        order), each value holding that instance's ``args``, ``log_file`` and
        further settings.
    """
    # Startup flags passed to both replicas; unpacked below so that each
    # instance ends up with its own independent args list.
    recovery_flags = (
        "--replication-restore-state-on-startup",
        "true",
        "--data-recovery-on-startup",
        "false",
    )

    # replica_1 passes the log level as two separate tokens ...
    replica_1 = {
        "args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level", "TRACE", *recovery_flags],
        "log_file": "replica1.log",
        "data_directory": f"{data_directory}/replica_1",
    }
    # ... whereas replica_2 uses the single-token "=" form.
    replica_2 = {
        "args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE", *recovery_flags],
        "log_file": "replica2.log",
        "data_directory": f"{data_directory}/replica_2",
    }
    main = {
        "args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
        "log_file": "main.log",
        "setup_queries": [],
    }

    return {"replica_1": replica_1, "replica_2": replica_2, "main": main}
def do_manual_setting_up(connection):
    """Set up replication by issuing the queries directly on running instances.

    First switches ``replica_1`` and then ``replica_2`` to the REPLICA role on
    their ports from ``REPLICATION_PORTS``. Afterwards registers ``replica_1``
    as SYNC and ``replica_2`` as ASYNC on main.

    Args:
        connection: Callable invoked as ``connection(bolt_port, name)``; the
            object it returns must provide ``cursor()``.
    """
    # Replicas are switched to the REPLICA role before main registers them.
    for replica in ("replica_1", "replica_2"):
        replica_cursor = connection(BOLT_PORTS[replica], replica).cursor()
        execute_and_fetch_all(
            replica_cursor, f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS[replica]};"
        )

    # A single main cursor is shared by both REGISTER REPLICA queries.
    main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
    for replica, mode in (("replica_1", "SYNC"), ("replica_2", "ASYNC")):
        execute_and_fetch_all(
            main_cursor, f"REGISTER REPLICA {replica} {mode} TO '127.0.0.1:{REPLICATION_PORTS[replica]}';"
        )
# Only the generated path string is kept; the TemporaryDirectory object itself is not stored.
# NOTE(review): presumably the directory is removed once that object is finalized, so this
# serves as a unique path name rather than an existing directory — confirm.
TEMP_DIR = tempfile.TemporaryDirectory().name
@ -438,7 +477,12 @@ def test_automatic_databases_create_multitenancy_replication(connection):
# 3/ Validate replication of changes to A have arrived at REPLICA
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
@ -586,11 +630,15 @@ def test_multitenancy_replication_restart_replica_w_fc(connection, replica_name)
# 3/ Restart replica
# 4/ Validate data on replica
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
# 0/
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
do_manual_setting_up(connection)
# 1/
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
@ -647,7 +695,12 @@ def test_multitenancy_replication_restart_replica_wo_fc(connection, replica_name
# 4/ Validate data on replica
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
@ -733,7 +786,11 @@ def test_multitenancy_replication_drop_replica(connection, replica_name):
# 4/ Validate data on replica
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
@ -829,7 +886,12 @@ def test_automatic_databases_drop_multitenancy_replication(connection):
# 5/ Check that the drop replicated
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
@ -878,7 +940,12 @@ def test_drop_multitenancy_replication_restart_replica(connection, replica_name)
# 4/ Validate data on replica
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
@ -915,7 +982,12 @@ def test_multitenancy_drop_while_replica_using(connection):
# 6/ Validate that the transaction is still active and working and that the replica2 is not pointing to anything
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
@ -985,7 +1057,12 @@ def test_multitenancy_drop_and_recreate_while_replica_using(connection):
# 6/ Validate that the transaction is still active and working and that the replica2 is not pointing to anything
# 0/
data_directory = tempfile.TemporaryDirectory()
MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
do_manual_setting_up(connection)
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/