From 4f4a569c727ffc0b5cbf10aea6e62fc21248cc80 Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Mon, 12 Feb 2024 16:42:57 +0100 Subject: [PATCH] Revert replication tests (#1707) --- .github/workflows/diff.yaml | 1 - src/dbms/inmemory/replication_handlers.cpp | 7 +- src/dbms/rpc.cpp | 4 + src/replication/state.cpp | 7 + .../replication_handler.cpp | 1 + tests/e2e/CMakeLists.txt | 2 +- tests/e2e/memgraph.py | 5 +- .../show_while_creating_invalid_state.py | 389 +++++++++++++----- .../replication_experimental/multitenancy.py | 119 +++++- 9 files changed, 399 insertions(+), 136 deletions(-) diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index 1113c4181..e9894efaa 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -423,7 +423,6 @@ jobs: ctest -R memgraph__unit --output-on-failure -j$THREADS - name: Run e2e tests - if: false run: | cd tests ./setup.sh /opt/toolchain-v4/activate diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index 61d8d3bbd..b7b2146f4 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -155,6 +155,12 @@ void InMemoryReplicationHandlers::HeartbeatHandler(dbms::DbmsHandler *dbms_handl return; } // TODO: this handler is agnostic of InMemory, move to be reused by on-disk + if (!db_acc.has_value()) { + spdlog::warn("No database accessor"); + storage::replication::HeartbeatRes res{false, 0, ""}; + slk::Save(res, res_builder); + return; + } auto const *storage = db_acc->get()->storage(); storage::replication::HeartbeatRes res{true, storage->repl_storage_state_.last_commit_timestamp_.load(), std::string{storage->repl_storage_state_.epoch_.id()}}; @@ -463,7 +469,6 @@ void InMemoryReplicationHandlers::TimestampHandler(dbms::DbmsHandler *dbms_handl slk::Save(res, res_builder); } -/////// AF how does this work, does it get all deltas at once or what? 
uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage *storage, storage::durability::BaseDecoder *decoder, const uint64_t version) { diff --git a/src/dbms/rpc.cpp b/src/dbms/rpc.cpp index 18a425cbf..c9508d064 100644 --- a/src/dbms/rpc.cpp +++ b/src/dbms/rpc.cpp @@ -57,6 +57,7 @@ namespace slk { // Serialize code for CreateDatabaseReq void Save(const memgraph::storage::replication::CreateDatabaseReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.main_uuid, builder); memgraph::slk::Save(self.epoch_id, builder); memgraph::slk::Save(self.expected_group_timestamp, builder); memgraph::slk::Save(self.new_group_timestamp, builder); @@ -64,6 +65,7 @@ void Save(const memgraph::storage::replication::CreateDatabaseReq &self, memgrap } void Load(memgraph::storage::replication::CreateDatabaseReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->main_uuid, reader); memgraph::slk::Load(&self->epoch_id, reader); memgraph::slk::Load(&self->expected_group_timestamp, reader); memgraph::slk::Load(&self->new_group_timestamp, reader); @@ -87,6 +89,7 @@ void Load(memgraph::storage::replication::CreateDatabaseRes *self, memgraph::slk // Serialize code for DropDatabaseReq void Save(const memgraph::storage::replication::DropDatabaseReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.main_uuid, builder); memgraph::slk::Save(self.epoch_id, builder); memgraph::slk::Save(self.expected_group_timestamp, builder); memgraph::slk::Save(self.new_group_timestamp, builder); @@ -94,6 +97,7 @@ void Save(const memgraph::storage::replication::DropDatabaseReq &self, memgraph: } void Load(memgraph::storage::replication::DropDatabaseReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->main_uuid, reader); memgraph::slk::Load(&self->epoch_id, reader); memgraph::slk::Load(&self->expected_group_timestamp, reader); memgraph::slk::Load(&self->new_group_timestamp, reader); diff --git a/src/replication/state.cpp b/src/replication/state.cpp index 6cc4ff951..6b1d128ec 100644 --- a/src/replication/state.cpp +++ b/src/replication/state.cpp @@ -57,9 +57,16 @@ ReplicationState::ReplicationState(std::optional<std::filesystem::path> durabili auto replication_data = std::move(fetched_replication_data).GetValue(); #ifdef MG_ENTERPRISE if (FLAGS_coordinator_server_port && std::holds_alternative<RoleReplicaData>(replication_data)) { + spdlog::trace("Restarted replication uuid for replica"); std::get<RoleReplicaData>(replication_data).uuid_.reset(); } #endif + if (std::holds_alternative<RoleReplicaData>(replication_data)) { + spdlog::trace("Recovered main's uuid for replica {}", + std::string(std::get<RoleReplicaData>(replication_data).uuid_.value())); + } else { + spdlog::trace("Recovered uuid for main {}", std::string(std::get<RoleMainData>(replication_data).uuid_)); + } replication_data_ = std::move(replication_data); } diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index ed0a095c8..211b04854 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -198,6 +198,7 @@ bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication:: const std::optional<utils::UUID> &main_uuid) { // We don't want to restart the server if we're already a REPLICA if (repl_state_.IsReplica()) { + spdlog::trace("Instance has already has replica role."); return false; } diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt 
index 23ec96d75..8892ae5db 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -39,7 +39,7 @@ endfunction() add_subdirectory(fine_grained_access) add_subdirectory(server) -#add_subdirectory(replication) +add_subdirectory(replication) #add_subdirectory(memory) add_subdirectory(triggers) add_subdirectory(isolation_levels) diff --git a/tests/e2e/memgraph.py b/tests/e2e/memgraph.py index 92c0a8343..f90e17c99 100755 --- a/tests/e2e/memgraph.py +++ b/tests/e2e/memgraph.py @@ -151,7 +151,10 @@ class MemgraphInstanceRunner: if not keep_directories: for folder in self.delete_on_stop or {}: - shutil.rmtree(folder) + try: + shutil.rmtree(folder) + except Exception as e: + pass # couldn't delete folder, skip def kill(self, keep_directories=False): if not self.is_running(): diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index 8da0c560a..abd5b5f48 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -260,7 +260,14 @@ def test_drop_replicas(connection): mg_sleep_and_assert(expected_data, retrieve_data) -def test_basic_recovery(connection): +@pytest.mark.parametrize( + "recover_data_on_startup", + [ + "true", + "false", + ], +) +def test_basic_recovery(recover_data_on_startup, connection): # Goal of this test is to check the recovery of main. # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. # 1/ We check that all replicas have the correct state: they should all be ready. @@ -272,9 +279,9 @@ def test_basic_recovery(connection): # 7/ We check that all replicas but one have the expected data. # 8/ We kill another replica. # 9/ We add some data to main. - # 10/ We re-add the two replicas droped/killed and check the data. + # 10/ We re-add the two replicas dropped/killed and check the data. # 11/ We kill another replica. - # 12/ Add some more data to main. It must still still occured but exception is expected since one replica is down. + # 12/ Add some more data to main. It must still occur but exception is expected since one replica is down. # 13/ Restart the replica # 14/ Check the states of replicas. # 15/ Add some data again. 
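# Note: the parametrization above threads recover_data_on_startup straight into one replica's
# startup flags (see the configuration below), so the same recovery scenario runs once with and
# once without data recovery on restart. A condensed, self-contained sketch of that mechanism
# (ports and flag values illustrative, not the full test):

import pytest


@pytest.mark.parametrize("recover_data_on_startup", ["true", "false"])
def test_flag_is_threaded_into_args(recover_data_on_startup):
    # The parametrized value becomes the value of --data-recovery-on-startup for one replica.
    args = [
        "--bolt-port", "7690",
        "--log-level=TRACE",
        "--replication-restore-state-on-startup", "true",
        "--data-recovery-on-startup", f"{recover_data_on_startup}",
    ]
    assert args[args.index("--data-recovery-on-startup") + 1] == recover_data_on_startup
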
@@ -284,22 +291,60 @@ def test_basic_recovery(connection): data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "replica_1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE", "--replication-restore-state-on-startup=true"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # Need to set it up manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/replica_1", }, "replica_2": { - "args": ["--bolt-port", "7689", "--log-level=TRACE", "--replication-restore-state-on-startup=true"], + "args": [ + "--bolt-port", + "7689", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "replica2.log", "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + "data_directory": f"{data_directory.name}/replica_2", }, "replica_3": { - "args": ["--bolt-port", "7690", "--log-level=TRACE", "--replication-restore-state-on-startup=true"], + "args": [ + "--bolt-port", + "7690", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{recover_data_on_startup}", + ], "log_file": "replica3.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"], + # We restart this replica so we set replication role manually, + # On restart we would set replication role again, we want to get it from data + "setup_queries": [], + "data_directory": f"{data_directory.name}/replica_3", }, "replica_4": { - "args": ["--bolt-port", "7691", "--log-level=TRACE", "--replication-restore-state-on-startup=true"], + "args": [ + "--bolt-port", + "7691", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "replica4.log", "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"], }, @@ -313,11 +358,18 @@ def test_basic_recovery(connection): ], "log_file": "main.log", "setup_queries": [], - "data_directory": f"{data_directory.name}", + "data_directory": f"{data_directory.name}/main", }, } interactive_mg_runner.start_all(CONFIGURATION) + + replica_1_cursor = connection(7688, "replica_1").cursor() + execute_and_fetch_all(replica_1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + replica_3_cursor = connection(7690, "replica_3").cursor() + execute_and_fetch_all(replica_3_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10003;") + cursor = connection(7687, "main").cursor() # We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration. @@ -485,26 +537,32 @@ def test_basic_recovery(connection): def test_replication_role_recovery(connection): # Goal of this test is to check the recovery of main and replica role. # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. - # 1/ We try to add a replica with reserved name which results in an exception <- Schema changed, there are no reserved names now - # 2/ We check that all replicas have the correct state: they should all be ready. - # 3/ We kill main. - # 4/ We re-start main. We check that main indeed has the role main and replicas still have the correct state. - # 5/ We kill the replica. 
- # 6/ We observed that the replica result is in invalid state. - # 7/ We start the replica again. We observe that indeed the replica has the replica state. - # 8/ We observe that main has the replica ready. - # 9/ We kill the replica again. - # 10/ We add data to main. - # 11/ We start the replica again. We observe that the replica has the same + # 1/ We check that all replicas have the correct state: they should all be ready. + # 2/ We kill main. + # 3/ We re-start main. We check that main indeed has the role main and replicas still have the correct state. + # 4/ We kill the replica. + # 5/ We observed that the replica result is in invalid state. + # 6/ We start the replica again. We observe that indeed the replica has the replica state. + # 7/ We observe that main has the replica ready. + # 8/ We kill the replica again. + # 9/ We add data to main. + # 10/ We start the replica again. We observe that the replica has the same # data as main because it synced and added lost data. # 0/ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "replica": { - "args": ["--bolt-port", "7688", "--log-level=TRACE", "--replication-restore-state-on-startup=true"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--storage-recover-on-startup", + "false", + ], "log_file": "replica.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], "data_directory": f"{data_directory.name}/replica", }, "main": { @@ -522,37 +580,16 @@ def test_replication_role_recovery(connection): } interactive_mg_runner.start_all(CONFIGURATION) + + replica_cursor = connection(7688, "replica").cursor() + execute_and_fetch_all(replica_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + cursor = connection(7687, "main").cursor() # We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration. 
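# The check_roles() helper used below verifies each instance's current role; with
# --replication-restore-state-on-startup=true a restarted replica is expected to report
# "replica" without re-running SET REPLICATION ROLE. A minimal standalone version of such a
# check (helper name and ports are illustrative, not the test's actual implementation):

import mgclient


def replication_role(bolt_port):
    # Ask the instance which replication role it currently holds.
    conn = mgclient.connect(host="127.0.0.1", port=bolt_port)
    conn.autocommit = True
    cursor = conn.cursor()
    cursor.execute("SHOW REPLICATION ROLE;")
    return cursor.fetchall()[0][0]
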
execute_and_fetch_all(cursor, "REGISTER REPLICA replica SYNC TO '127.0.0.1:10001';") - # When we restart the replica, it does not need this query anymore since it needs to remember state - CONFIGURATION = { - "replica": { - "args": ["--bolt-port", "7688", "--log-level=TRACE", "--replication-restore-state-on-startup=true"], - "log_file": "replica.log", - "setup_queries": [], - "data_directory": f"{data_directory.name}/replica", - }, - "main": { - "args": [ - "--bolt-port", - "7687", - "--log-level=TRACE", - "--storage-recover-on-startup=true", - "--replication-restore-state-on-startup=true", - ], - "log_file": "main.log", - "setup_queries": [], - "data_directory": f"{data_directory.name}/main", - }, - } - # 1/ Obsolete, schema change, no longer a reserved name - # with pytest.raises(mgclient.DatabaseError): - # execute_and_fetch_all(cursor, "REGISTER REPLICA __replication_role SYNC TO '127.0.0.1:10002';") - - # 2/ + # 1/ expected_data = { ("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"), } @@ -566,10 +603,10 @@ def test_replication_role_recovery(connection): check_roles() - # 3/ + # 2/ interactive_mg_runner.kill(CONFIGURATION, "main") - # 4/ + # 3/ interactive_mg_runner.start(CONFIGURATION, "main") cursor = connection(7687, "main").cursor() check_roles() @@ -580,10 +617,10 @@ def test_replication_role_recovery(connection): actual_data = mg_sleep_and_assert(expected_data, retrieve_data) assert actual_data == expected_data - # 5/ + # 4/ interactive_mg_runner.kill(CONFIGURATION, "replica") - # 6/ + # 5/ expected_data = { ("replica", "127.0.0.1:10001", "sync", 0, 0, "invalid"), } @@ -591,11 +628,11 @@ def test_replication_role_recovery(connection): assert actual_data == expected_data - # 7/ + # 6/ interactive_mg_runner.start(CONFIGURATION, "replica") check_roles() - # 8/ + # 7/ expected_data = { ("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"), } @@ -603,14 +640,14 @@ def test_replication_role_recovery(connection): actual_data = mg_sleep_and_assert(expected_data, retrieve_data) assert actual_data == expected_data - # 9/ + # 8/ interactive_mg_runner.kill(CONFIGURATION, "replica") - # 10/ + # 9/ with pytest.raises(mgclient.DatabaseError): execute_and_fetch_all(cursor, "CREATE (n:First)") - # 11/ + # 10/ interactive_mg_runner.start(CONFIGURATION, "replica") check_roles() @@ -948,7 +985,7 @@ def test_attempt_to_write_data_on_main_when_async_replica_is_down(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK) -def test_attempt_to_write_data_on_main_when_sync_replica_is_down(): +def test_attempt_to_write_data_on_main_when_sync_replica_is_down(connection): # Goal of this test is to check that main cannot write new data if a sync replica is down. # 0/ Start main and sync replicas. # 1/ Check status of replicas. @@ -958,30 +995,48 @@ def test_attempt_to_write_data_on_main_when_sync_replica_is_down(): # 5/ Check the status of replicas. # 6/ Restart the replica that was killed and check that it is up to date with main. 
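# The failure mode this test exercises is that a write on main raises mgclient.DatabaseError
# while a SYNC replica is unreachable; the tests assert it with pytest.raises. A minimal sketch
# of that assertion against a running main (port and query are illustrative):

import mgclient
import pytest


def assert_write_rejected_while_sync_replica_down(bolt_port=7687):
    # With a SYNC replica down, main is expected to report the write as failed.
    conn = mgclient.connect(host="127.0.0.1", port=bolt_port)
    conn.autocommit = True
    cursor = conn.cursor()
    with pytest.raises(mgclient.DatabaseError):
        cursor.execute("CREATE (:Number {value: 42});")
        cursor.fetchall()
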
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # We restart this replica so we want to set role manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { - "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "args": ["--bolt-port", "7689", "--log-level", "TRACE"], "log_file": "sync_replica2.log", "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], }, "main": { - "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup", "true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + # need to do it manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/main", }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + sync_replica1_cursor = connection(7688, "sync_replica1_cursor").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + main_cursor = connection(7687, "main").cursor() + execute_and_fetch_all(main_cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(main_cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ expected_data = { ("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "ready"), @@ -1126,7 +1181,7 @@ def test_attempt_to_create_indexes_on_main_when_async_replica_is_down(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK) -def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(): +def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(connection): # Goal of this test is to check creation of new indexes/constraints when a sync replica is down. # 0/ Start main and sync replicas. # 1/ Check status of replicas. @@ -1136,11 +1191,21 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(): # 5/ Check the status of replicas. # 6/ Restart the replica that was killed and check that it is up to date with main. 
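# Several of these tests now repeat the same manual setup: demote the replica by hand and
# register it from main instead of doing it through setup_queries (which would run again on
# every restart). A small helper capturing that flow, as a sketch only -- the multitenancy
# suite factors the same idea into do_manual_setting_up():

import mgclient


def demote_and_register(replica_bolt_port, replication_port, replica_name, main_bolt_port=7687):
    # Demote the instance to REPLICA explicitly, then register it on main as a SYNC replica.
    replica = mgclient.connect(host="127.0.0.1", port=replica_bolt_port)
    replica.autocommit = True
    replica.cursor().execute(f"SET REPLICATION ROLE TO REPLICA WITH PORT {replication_port};")

    main = mgclient.connect(host="127.0.0.1", port=main_bolt_port)
    main.autocommit = True
    main.cursor().execute(f"REGISTER REPLICA {replica_name} SYNC TO '127.0.0.1:{replication_port}';")
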
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { "args": ["--bolt-port", "7689", "--log-level=TRACE"], @@ -1150,16 +1215,24 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(): "main": { "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + # Need to do it manually + "setup_queries": [], }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + sync_replica1_cursor = connection(7688, "sync_replica1").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, as we are setting replica manually because + # of restart. Restart on replica would set role again. + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ expected_data = { ("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "ready"), @@ -1226,7 +1299,7 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) -def test_trigger_on_create_before_commit_with_offline_sync_replica(): +def test_trigger_on_create_before_commit_with_offline_sync_replica(connection): # 0/ Start all. # 1/ Create the trigger # 2/ Create a node. We expect two nodes created (our Not_Magic and the Magic created by trigger). @@ -1237,11 +1310,23 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica(): # 7/ Check that we have two nodes. # 8/ Re-start the replica and check it's online and that it has two nodes. 
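# The trigger bodies are truncated in these hunks; a representative Memgraph trigger of the
# kind these tests create (label and property values reconstructed from the assertions further
# down, not copied from the test) looks like:

QUERY_CREATE_TRIGGER = """
    CREATE TRIGGER exampleTrigger
    ON () CREATE BEFORE COMMIT EXECUTE
    CREATE (:Magic {name: "Node_created_by_trigger"});
"""
# Executed on main in the tests via
# interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER)
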
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # Need to do it manually since we kill this replica + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { "args": ["--bolt-port", "7689", "--log-level=TRACE"], @@ -1251,16 +1336,24 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica(): "main": { "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + # Need to do it manually since we kill replica + "setup_queries": [], }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + sync_replica1_cursor = connection(7688, "sync_replica1").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, as we are setting replica manually because + # of restart. Restart on replica would set role again. + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ QUERY_CREATE_TRIGGER = """ CREATE TRIGGER exampleTrigger @@ -1325,7 +1418,7 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) -def test_trigger_on_update_before_commit_with_offline_sync_replica(): +def test_trigger_on_update_before_commit_with_offline_sync_replica(connection): # 0/ Start all. # 1/ Create the trigger # 2/ Create a node. @@ -1337,11 +1430,22 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica(): # 8/ Check that we have two nodes. # 9/ Re-start the replica and check it's online and that it has two nodes. 
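# The "re-start the replica and check it's online" steps rely on mg_sleep_and_assert from the
# shared test helpers to tolerate replication lag. A simplified stand-in showing the idea (the
# real helper lives in the common test utilities and may differ):

import time


def sleep_and_assert(expected, retrieve, max_duration=30, step=0.5):
    # Poll retrieve() until it matches expected or the timeout expires; return the last value
    # so the caller can assert on it, mirroring how mg_sleep_and_assert is used in these tests.
    deadline = time.time() + max_duration
    result = retrieve()
    while result != expected and time.time() < deadline:
        time.sleep(step)
        result = retrieve()
    return result
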
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # Need to do it manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { "args": ["--bolt-port", "7689", "--log-level=TRACE"], @@ -1351,15 +1455,23 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica(): "main": { "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + "setup_queries": [], }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + + sync_replica1_cursor = connection(7688, "sync_replica1").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, as we are setting replica manually because + # of restart. Restart on replica would set role again. + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ QUERY_CREATE_TRIGGER = """ CREATE TRIGGER exampleTrigger @@ -1429,7 +1541,7 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) -def test_trigger_on_delete_before_commit_with_offline_sync_replica(): +def test_trigger_on_delete_before_commit_with_offline_sync_replica(connection): # 0/ Start all. # 1/ Create the trigger # 2/ Create a node. @@ -1441,11 +1553,22 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(): # 8/ Check that we have one node. # 9/ Re-start the replica and check it's online and that it has one node, and the correct one. 
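# The expected_data sets in these tests are compared against the rows of SHOW REPLICAS on main
# (name, socket address, sync mode, timestamps, state). A small standalone reader for those
# rows, with the column layout assumed from the tuples used in this file:

import mgclient


def show_replicas(main_bolt_port=7687):
    # Fetch SHOW REPLICAS rows as a set of tuples, the shape used by the expected_data sets.
    conn = mgclient.connect(host="127.0.0.1", port=main_bolt_port)
    conn.autocommit = True
    cursor = conn.cursor()
    cursor.execute("SHOW REPLICAS;")
    return set(cursor.fetchall())
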
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # we need to set it manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { "args": ["--bolt-port", "7689", "--log-level=TRACE"], @@ -1455,16 +1578,23 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(): "main": { "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + "setup_queries": [], }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + sync_replica1_cursor = connection(7688, "sync_replica1").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, as we are setting replica manually because + # of restart. Restart on replica would set role again. + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ QUERY_CREATE_TRIGGER = """ CREATE TRIGGER exampleTrigger @@ -1473,7 +1603,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(): """ interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER) res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;") - assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + assert len(res_from_main) == 1, f"Incorrect result: {res_from_main}" # 2/ QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic', value:0})" @@ -1486,7 +1616,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(): # 4/ QUERY_TO_CHECK = "MATCH (node) return node;" res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) - assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + assert len(res_from_main) == 1, f"Incorrect result: {res_from_main}" assert res_from_main[0][0].properties["name"] == "Node_created_by_trigger" assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) @@ -1539,7 +1669,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) -def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(): +def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(connection): # 0/ Start all. # 1/ Create the triggers # 2/ Create a node. We expect three nodes created (1 node created + the two created by triggers). @@ -1550,11 +1680,22 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(): # 7/ Check that we have three nodes. # 8/ Re-start the replica and check it's online and that it has three nodes. 
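# The final checks compare main's query results with each replica's via QUERY_TO_CHECK; a
# reduced version comparing only node counts across instances (ports as used in this file,
# helper names illustrative):

import mgclient


def node_count(bolt_port):
    conn = mgclient.connect(host="127.0.0.1", port=bolt_port)
    conn.autocommit = True
    cursor = conn.cursor()
    cursor.execute("MATCH (n) RETURN count(n);")
    return cursor.fetchall()[0][0]


def assert_replicas_converged(main_port=7687, replica_ports=(7688, 7689)):
    # After the killed SYNC replica is restarted it should hold the same data as main.
    expected = node_count(main_port)
    for port in replica_ports:
        assert node_count(port) == expected
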
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # we need to set it manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { "args": ["--bolt-port", "7689", "--log-level=TRACE"], @@ -1564,16 +1705,23 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(): "main": { "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + "setup_queries": [], }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + sync_replica1_cursor = connection(7688, "sync_replica1").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, as we are setting replica manually because + # of restart. Restart on replica would set role again. + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ QUERY_CREATE_TRIGGER_BEFORE = """ CREATE TRIGGER exampleTriggerBefore @@ -1644,7 +1792,7 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(): assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) -def test_triggers_on_create_before_commit_with_offline_sync_replica(): +def test_triggers_on_create_before_commit_with_offline_sync_replica(connection): # 0/ Start all. # 1/ Create the two triggers # 2/ Create a node. We expect three nodes. @@ -1655,11 +1803,22 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica(): # 7/ Check that we have three nodes. # 8/ Re-start the replica and check it's online and that it has two nodes. 
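# These tests now take the connection fixture and open cursors themselves via
# connection(port, name).cursor(); the fixture's definition is not part of this diff. A
# hypothetical minimal shape of such a fixture, for orientation only:

import mgclient
import pytest


@pytest.fixture
def connection():
    # Factory returning an autocommit connection to a given bolt port; the name argument is
    # presumably used by the real fixture for bookkeeping and cleanup (assumption).
    def _connect(port, name):
        conn = mgclient.connect(host="127.0.0.1", port=port)
        conn.autocommit = True
        return conn

    return _connect
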
+ data_directory = tempfile.TemporaryDirectory() CONFIGURATION = { "sync_replica1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], "log_file": "sync_replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + # we need to set it manually + "setup_queries": [], + "data_directory": f"{data_directory.name}/sync_replica1", }, "sync_replica2": { "args": ["--bolt-port", "7689", "--log-level=TRACE"], @@ -1669,16 +1828,23 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica(): "main": { "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], "log_file": "main.log", - "setup_queries": [ - "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", - "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", - ], + "setup_queries": [], }, } # 0/ interactive_mg_runner.start_all(CONFIGURATION) + sync_replica1_cursor = connection(7688, "sync_replica1").cursor() + execute_and_fetch_all(sync_replica1_cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") + + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, as we are setting replica manually because + # of restart. Restart on replica would set role again. + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';") + # 1/ QUERY_CREATE_TRIGGER_FIRST = """ CREATE TRIGGER exampleTriggerFirst @@ -1824,4 +1990,5 @@ def test_replication_not_messed_up_by_ShowIndexInfo(connection): if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-k", "test_basic_recovery"])) sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/replication_experimental/multitenancy.py b/tests/e2e/replication_experimental/multitenancy.py index 7eb699341..3d73abb64 100644 --- a/tests/e2e/replication_experimental/multitenancy.py +++ b/tests/e2e/replication_experimental/multitenancy.py @@ -16,6 +16,7 @@ import sys import tempfile import time from functools import partial +from typing import Any, Dict import interactive_mg_runner import mgclient @@ -33,26 +34,64 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv BOLT_PORTS = {"main": 7687, "replica_1": 7688, "replica_2": 7689} REPLICATION_PORTS = {"replica_1": 10001, "replica_2": 10002} -MEMGRAPH_INSTANCES_DESCRIPTION = { - "replica_1": { - "args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"], - "log_file": "replica1.log", - "setup_queries": [f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};"], - }, - "replica_2": { - "args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"], - "log_file": "replica2.log", - "setup_queries": [f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};"], - }, - "main": { - "args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"], - "log_file": "main.log", - "setup_queries": [ - f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';", - f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';", - ], - }, -} + +def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[str, Any]: + return { + "replica_1": { + "args": [ + "--bolt-port", + 
f"{BOLT_PORTS['replica_1']}", + "--log-level", + "TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], + "log_file": "replica1.log", + "data_directory": f"{data_directory}/replica_1", + }, + "replica_2": { + "args": [ + "--bolt-port", + f"{BOLT_PORTS['replica_2']}", + "--log-level=TRACE", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + "false", + ], + "log_file": "replica2.log", + "data_directory": f"{data_directory}/replica_2", + }, + "main": { + "args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"], + "log_file": "main.log", + "setup_queries": [], + }, + } + + +def do_manual_setting_up(connection): + replica_1_cursor = connection(BOLT_PORTS["replica_1"], "replica_1").cursor() + execute_and_fetch_all( + replica_1_cursor, f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};" + ) + + replica_2_cursor = connection(BOLT_PORTS["replica_2"], "replica_2").cursor() + execute_and_fetch_all( + replica_2_cursor, f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};" + ) + + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() + + execute_and_fetch_all( + main_cursor, f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';" + ) + execute_and_fetch_all( + main_cursor, f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';" + ) + TEMP_DIR = tempfile.TemporaryDirectory().name @@ -438,7 +477,12 @@ def test_automatic_databases_create_multitenancy_replication(connection): # 3/ Validate replication of changes to A have arrived at REPLICA # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + do_manual_setting_up(connection) + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 1/ @@ -586,11 +630,15 @@ def test_multitenancy_replication_restart_replica_w_fc(connection, replica_name) # 3/ Restart replica # 4/ Validate data on replica + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) # 0/ interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - main_cursor = connection(BOLT_PORTS["main"], "main").cursor() + + do_manual_setting_up(connection) # 1/ + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() execute_and_fetch_all(main_cursor, "CREATE DATABASE A;") execute_and_fetch_all(main_cursor, "CREATE DATABASE B;") @@ -647,7 +695,12 @@ def test_multitenancy_replication_restart_replica_wo_fc(connection, replica_name # 4/ Validate data on replica # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + do_manual_setting_up(connection) + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 1/ @@ -733,7 +786,11 @@ def test_multitenancy_replication_drop_replica(connection, replica_name): # 4/ Validate data on replica # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + do_manual_setting_up(connection) main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 
1/ @@ -829,7 +886,12 @@ def test_automatic_databases_drop_multitenancy_replication(connection): # 5/ Check that the drop replicated # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + do_manual_setting_up(connection) + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 1/ @@ -878,7 +940,12 @@ def test_drop_multitenancy_replication_restart_replica(connection, replica_name) # 4/ Validate data on replica # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + do_manual_setting_up(connection) + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 1/ @@ -915,7 +982,12 @@ def test_multitenancy_drop_while_replica_using(connection): # 6/ Validate that the transaction is still active and working and that the replica2 is not pointing to anything # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + do_manual_setting_up(connection) + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 1/ @@ -985,7 +1057,12 @@ def test_multitenancy_drop_and_recreate_while_replica_using(connection): # 6/ Validate that the transaction is still active and working and that the replica2 is not pointing to anything # 0/ + data_directory = tempfile.TemporaryDirectory() + MEMGRAPH_INSTANCES_DESCRIPTION = create_memgraph_instances_with_role_recovery(data_directory.name) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + do_manual_setting_up(connection) + main_cursor = connection(BOLT_PORTS["main"], "main").cursor() # 1/
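# The multitenancy tests create databases on main and expect them to appear on the replicas.
# A sketch of checking that on a replica, assuming SHOW DATABASES returns the database name in
# the first column (the column layout is not shown in this diff):

import mgclient


def database_names(bolt_port):
    conn = mgclient.connect(host="127.0.0.1", port=bolt_port)
    conn.autocommit = True
    cursor = conn.cursor()
    cursor.execute("SHOW DATABASES;")
    return {row[0] for row in cursor.fetchall()}


# e.g. after CREATE DATABASE A; on main, poll a replica until the database shows up:
# assert "A" in database_names(BOLT_PORTS["replica_1"])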