diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp
index f4d2da838..b4f82b60c 100644
--- a/src/coordination/coordinator_client.cpp
+++ b/src/coordination/coordinator_client.cpp
@@ -175,9 +175,7 @@ auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
     -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
   try {
     auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
-    auto res = stream.AwaitResponse();
-
-    return res.database_histories;
+    return stream.AwaitResponse().database_histories;
   } catch (const rpc::RpcFailedException &) {
     spdlog::error("RPC error occured while sending GetInstance UUID RPC");
diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index ba94d9d5f..daf1d7138 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -127,7 +127,7 @@ auto CoordinatorInstance::TryFailover() -> void {
       ChooseMostUpToDateInstance(instance_database_histories);
 
   spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
-                most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
+                most_up_to_date_instance, latest_epoch, latest_commit_timestamp);
 
   auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
 
@@ -391,7 +391,7 @@ auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
 
 auto CoordinatorInstance::ChooseMostUpToDateInstance(
     const std::vector<InstanceNameDbHistories> &instance_database_histories) -> NewMainRes {
-  NewMainRes new_main_res;
+  std::optional<NewMainRes> new_main_res;
   std::for_each(
       instance_database_histories.begin(), instance_database_histories.end(),
       [&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
@@ -407,7 +407,7 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
         std::ranges::for_each(
             instance_db_histories,
            [&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
-              spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
+              spdlog::debug("Instance {}: name {}, default db {}", instance_name, db_history.name,
                             memgraph::dbms::kDefaultDB);
             });
 
@@ -417,35 +417,26 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
 
         std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
                               [&instance_name = instance_name](const auto &epoch_history_it) {
-                                spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
+                                spdlog::debug("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
                                               std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
                               });
 
         // get latest epoch
         // get latest timestamp
 
-        if (!new_main_res.latest_epoch) {
+        if (!new_main_res) {
           const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
-          new_main_res = NewMainRes{
-              .most_up_to_date_instance = instance_name,
-              .latest_epoch = epoch,
-              .latest_commit_timestamp = timestamp,
-          };
-          spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
+          new_main_res = std::make_optional<NewMainRes>({instance_name, epoch, timestamp});
+          spdlog::debug("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
                         instance_name, epoch, timestamp);
           return;
         }
 
         bool found_same_point{false};
-        std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
+        std::string last_most_up_to_date_epoch{new_main_res->latest_epoch};
         for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
-          if (*new_main_res.latest_commit_timestamp < timestamp) {
-            new_main_res = NewMainRes{
-                .most_up_to_date_instance = instance_name,
-                .latest_epoch = epoch,
-                .latest_commit_timestamp = timestamp,
-            };
-
+          if (new_main_res->latest_commit_timestamp < timestamp) {
+            new_main_res = std::make_optional<NewMainRes>({instance_name, epoch, timestamp});
             spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
                           instance_name, epoch, timestamp);
           }
@@ -459,11 +450,11 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
 
         if (!found_same_point) {
           spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
-                        new_main_res.most_up_to_date_instance, instance_name);
+                        new_main_res->most_up_to_date_instance, instance_name);
         }
       });
 
-  return new_main_res;
+  return std::move(*new_main_res);
 }
 
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp
index bed202744..ee5a6fb6e 100644
--- a/src/coordination/include/coordination/coordinator_instance.hpp
+++ b/src/coordination/include/coordination/coordinator_instance.hpp
@@ -28,8 +28,8 @@ namespace memgraph::coordination {
 
 struct NewMainRes {
   std::string most_up_to_date_instance;
-  std::optional<std::string> latest_epoch;
-  std::optional<uint64_t> latest_commit_timestamp;
+  std::string latest_epoch;
+  uint64_t latest_commit_timestamp;
 };
 
 using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;
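
The coordinator hunks above make `NewMainRes` carry plain (non-optional) fields and thread a `std::optional<NewMainRes>` accumulator through the per-instance walk. The following is a minimal, self-contained sketch of that selection rule — not the patch's code — with simplified stand-in types, no divergence check, and the assumption that every instance has a non-empty history:

#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct NewMainRes {
  std::string most_up_to_date_instance;
  std::string latest_epoch;
  uint64_t latest_commit_timestamp;
};

// Per-instance history: (epoch id, last commit timestamp in that epoch),
// ordered from oldest to newest.
using History = std::vector<std::pair<std::string, uint64_t>>;

NewMainRes ChooseMostUpToDate(const std::vector<std::pair<std::string, History>> &instances) {
  std::optional<NewMainRes> best;
  for (const auto &[name, history] : instances) {
    // The first instance seeds the result from its newest (epoch, timestamp) entry.
    if (!best) {
      const auto &[epoch, ts] = history.back();
      best = NewMainRes{name, epoch, ts};
      continue;
    }
    // Walk the history from newest to oldest; any strictly larger commit
    // timestamp makes this instance the new best candidate.
    for (auto it = history.rbegin(); it != history.rend(); ++it) {
      if (best->latest_commit_timestamp < it->second) {
        best = NewMainRes{name, it->first, it->second};
      }
    }
  }
  return std::move(*best);
}
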
diff --git a/src/dbms/dbms_handler.cpp b/src/dbms/dbms_handler.cpp
index 1c38106db..16927d7e2 100644
--- a/src/dbms/dbms_handler.cpp
+++ b/src/dbms/dbms_handler.cpp
@@ -185,6 +185,16 @@ DbmsHandler::DbmsHandler(storage::Config config, replication::ReplicationState &repl_state, bool recovery_on_startup)
   auto directories = std::set{std::string{kDefaultDB}};
 
   // Recover previous databases
+  if (flags::AreExperimentsEnabled(flags::Experiments::SYSTEM_REPLICATION) && !recovery_on_startup) {
+    // This will result in dropping databases in SystemRecoveryHandler for the MT case;
+    // for the single-DB case we might not even set up replication, as the commit timestamp is checked
+    spdlog::warn(
+        "Data recovery on startup not set; this will result in dropped databases when multi-tenancy is enabled.");
+  }
+
+  // TODO: The problem is that if the user doesn't set this up, the "database" name won't be recovered,
+  // but if storage-recover-on-startup is true the storage will still be recovered, which is an issue
+  spdlog::info("Data recovery on startup set to {}", recovery_on_startup);
   if (recovery_on_startup) {
     auto it = durability_->begin(std::string(kDBPrefix));
     auto end = durability_->end(std::string(kDBPrefix));
@@ -410,9 +420,10 @@ void DbmsHandler::UpdateDurability(const storage::Config &config, std::optional<std::filesystem::path> rel_dir) {
   if (!durability_) return;
   // Save database in a list of active databases
   const auto &key = Durability::GenKey(config.salient.name);
-  if (rel_dir == std::nullopt)
+  if (rel_dir == std::nullopt) {
     rel_dir =
         std::filesystem::relative(config.durability.storage_directory, default_config_.durability.storage_directory);
+  }
   const auto &val = Durability::GenVal(config.salient.uuid, *rel_dir);
   durability_->Put(key, val);
 }
diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp
index 87d1257a6..b0bbd5758 100644
--- a/src/dbms/dbms_handler.hpp
+++ b/src/dbms/dbms_handler.hpp
@@ -155,6 +155,8 @@ class DbmsHandler {
       spdlog::debug("Trying to create db '{}' on replica which already exists.", config.name);
 
       auto db = Get_(config.name);
+      spdlog::debug("Aligning database with name {} which has UUID {}, where config UUID is {}", config.name,
+                    std::string(db->uuid()), std::string(config.uuid));
       if (db->uuid() == config.uuid) {  // Same db
         return db;
       }
@@ -163,18 +165,22 @@ class DbmsHandler {
 
       // TODO: Fix this hack
       if (config.name == kDefaultDB) {
+        spdlog::debug("Last commit timestamp for DB {} is {}", kDefaultDB,
+                      db->storage()->repl_storage_state_.last_commit_timestamp_);
+        // This seems correct if the database made progress
         if (db->storage()->repl_storage_state_.last_commit_timestamp_ != storage::kTimestampInitialId) {
           spdlog::debug("Default storage is not clean, cannot update UUID...");
           return NewError::GENERIC;  // Update error
         }
-        spdlog::debug("Update default db's UUID");
+        spdlog::debug("Updated default db's UUID");
         // Default db cannot be deleted and remade, have to just update the UUID
         db->storage()->config_.salient.uuid = config.uuid;
         UpdateDurability(db->storage()->config_, ".");
         return db;
       }
-      spdlog::debug("Drop database and recreate with the correct UUID");
+      spdlog::debug("Dropping database {} with UUID: {} and recreating with the correct UUID: {}", config.name,
+                    std::string(db->uuid()), std::string(config.uuid));
      // Defer drop
      (void)Delete_(db->name());
      // Second attempt
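
The `dbms_handler.hpp` hunk above decides how a replica aligns an already-existing database with an incoming config. A compact restatement of that decision table, with hypothetical simplified types (`AlignResult` and the sentinel constant are stand-ins for illustration, not the real API):

#include <cstdint>
#include <string>

enum class AlignResult { kUseExisting, kUpdateUuid, kGenericError, kDropAndRecreate };

constexpr uint64_t kTimestampInitialId = 0;   // assumed "no commits yet" sentinel
const std::string kDefaultDB = "memgraph";

AlignResult AlignDatabase(const std::string &name, const std::string &existing_uuid,
                          const std::string &wanted_uuid, uint64_t last_commit_timestamp) {
  if (existing_uuid == wanted_uuid) return AlignResult::kUseExisting;  // same db
  if (name == kDefaultDB) {
    // The default db cannot be dropped; it may only adopt the new UUID
    // if it made no progress since creation.
    if (last_commit_timestamp != kTimestampInitialId) return AlignResult::kGenericError;
    return AlignResult::kUpdateUuid;
  }
  // Any other database is dropped and recreated with the correct UUID.
  return AlignResult::kDropAndRecreate;
}
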
diff --git a/src/memgraph.cpp b/src/memgraph.cpp
index 34d64f434..d896bcc4c 100644
--- a/src/memgraph.cpp
+++ b/src/memgraph.cpp
@@ -334,7 +334,8 @@ int main(int argc, char **argv) {
           .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
                             .enable_schema_metadata = FLAGS_storage_enable_schema_metadata},
           .salient.storage_mode = memgraph::flags::ParseStorageMode()};
-
+  spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
+               FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);
   memgraph::utils::Scheduler jemalloc_purge_scheduler;
   jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec),
                                [] { memgraph::memory::PurgeUnusedMemory(); });
diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp
index ea567eed0..fc3dd3da4 100644
--- a/src/replication_handler/replication_handler.cpp
+++ b/src/replication_handler/replication_handler.cpp
@@ -271,9 +271,8 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories {
   dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
     auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
 
-    std::vector<std::pair<std::string, uint64_t>> history =
-        utils::fmap([](const auto &elem) { return std::pair(elem.first, elem.second); },
-                    repl_storage_state.history);
+    std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(
+        [](const auto &elem) { return std::make_pair(elem.first, elem.second); }, repl_storage_state.history);
 
     history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
     replication_coordination_glue::DatabaseHistory repl{
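
`GetDatabasesHistories` above copies the stored `(epoch, timestamp)` pairs with `utils::fmap` and then appends the currently open epoch, which is not in the stored history yet. The same assembly expressed with plain STL, as a hedged sketch (the real code reads these values out of `repl_storage_state_`):

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

EpochHistory BuildDatabaseHistory(const EpochHistory &stored_history,
                                  const std::string &current_epoch,
                                  uint64_t last_commit_timestamp) {
  EpochHistory history{stored_history};  // copy of the finalized epochs
  // Append the open epoch with the storage's current last commit timestamp.
  history.emplace_back(current_epoch, last_commit_timestamp);
  return history;
}
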
diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp
index 92c4d11e8..b81357902 100644
--- a/src/storage/v2/durability/durability.cpp
+++ b/src/storage/v2/durability/durability.cpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -118,6 +118,8 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem::path &wal_directory, std::string_view uuid,
     if (!item.is_regular_file()) continue;
     try {
       auto info = ReadWalInfo(item.path());
+      spdlog::trace("Getting WAL file with following info: uuid: {}, epoch id: {}, from_timestamp: {}, to_timestamp: {}",
+                    info.uuid, info.epoch_id, info.from_timestamp, info.to_timestamp);
       if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) {
         wal_files.emplace_back(info.seq_num, info.from_timestamp, info.to_timestamp, std::move(info.uuid),
                                std::move(info.epoch_id), item.path());
@@ -356,6 +358,7 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
     spdlog::warn(utils::MessageWithLink("No snapshot or WAL file found.", "https://memgr.ph/durability"));
     return std::nullopt;
   }
+  // TODO(antoniofilipovic) What is the logic here?
   std::sort(wal_files.begin(), wal_files.end());
   // UUID used for durability is the UUID of the last WAL file.
   // Same for the epoch id.
@@ -410,22 +413,17 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
     std::optional<uint64_t> previous_seq_num;
     auto last_loaded_timestamp = snapshot_timestamp;
     spdlog::info("Trying to load WAL files.");
+
+    if (last_loaded_timestamp) {
+      epoch_history->emplace_back(repl_storage_state.epoch_.id(), *last_loaded_timestamp);
+    }
+
     for (auto &wal_file : wal_files) {
       if (previous_seq_num && (wal_file.seq_num - *previous_seq_num) > 1) {
         LOG_FATAL("You are missing a WAL file with the sequence number {}!", *previous_seq_num + 1);
       }
       previous_seq_num = wal_file.seq_num;
 
-      if (wal_file.epoch_id != repl_storage_state.epoch_.id()) {
-        // This way we skip WALs finalized only because of role change.
-        // We can also set the last timestamp to 0 if last loaded timestamp
-        // is nullopt as this can only happen if the WAL file with seq = 0
-        // does not contain any deltas and we didn't find any snapshots.
-        if (last_loaded_timestamp) {
-          epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp);
-        }
-        repl_storage_state.epoch_.SetEpoch(std::move(wal_file.epoch_id));
-      }
       try {
         auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper,
                             edge_count, config.salient.items);
@@ -434,13 +432,28 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
         recovery_info.next_timestamp = std::max(recovery_info.next_timestamp, info.next_timestamp);
 
         recovery_info.last_commit_timestamp = info.last_commit_timestamp;
+
+        if (recovery_info.next_timestamp != 0) {
+          last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
+        }
+
+        bool epoch_history_empty = epoch_history->empty();
+        bool epoch_not_recorded = !epoch_history_empty && epoch_history->back().first != wal_file.epoch_id;
+        auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
+
+        if (epoch_history_empty || epoch_not_recorded) {
+          epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp_value);
+        }
+
+        auto last_epoch_updated = !epoch_history_empty && epoch_history->back().first == wal_file.epoch_id &&
+                                  epoch_history->back().second < last_loaded_timestamp_value;
+        if (last_epoch_updated) {
+          epoch_history->back().second = last_loaded_timestamp_value;
+        }
+
       } catch (const RecoveryFailure &e) {
         LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", wal_file.path, e.what());
       }
-
-      if (recovery_info.next_timestamp != 0) {
-        last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
-      }
     }
     // The sequence number needs to be recovered even though `LoadWal` didn't
     // load any deltas from that file.
@@ -456,7 +469,12 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
 
   memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
                              std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
+  spdlog::info("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
+               repl_storage_state.last_commit_timestamp_);
 
+  std::for_each(repl_storage_state.history.begin(), repl_storage_state.history.end(), [](auto &history) {
+    spdlog::info("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
+  });
   return recovery_info;
 }
diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp
index fe752bfd1..5f1182c75 100644
--- a/src/storage/v2/inmemory/replication/recovery.cpp
+++ b/src/storage/v2/inmemory/replication/recovery.cpp
@@ -233,7 +233,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker,
     }
   }
 
-  // In all cases, if we have a current wal file we need to use itW
+  // In all cases, if we have a current wal file we need to use it
   if (current_wal_seq_num) {
     // NOTE: File not handled directly, so no need to lock it
     recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});
diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp
index bd8534673..1437524d6 100644
--- a/src/storage/v2/inmemory/storage.cpp
+++ b/src/storage/v2/inmemory/storage.cpp
@@ -109,6 +109,7 @@ InMemoryStorage::InMemoryStorage(Config config)
       timestamp_ = std::max(timestamp_, info->next_timestamp);
       if (info->last_commit_timestamp) {
         repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
+        spdlog::info("Recovering last commit timestamp {}", *info->last_commit_timestamp);
       }
     }
   } else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
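
The reworked recovery loop above moves the epoch-history bookkeeping to after `LoadWal`: seed the history from the snapshot, record a WAL's epoch when it first appears, and bump the recorded timestamp when the same epoch makes further progress. A self-contained restatement of those three cases under simplified assumptions (each WAL file is reduced to its epoch id plus the next free timestamp after loading it):

#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct WalSummary {
  std::string epoch_id;
  uint64_t next_timestamp;  // first unused timestamp after loading this WAL
};

using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

EpochHistory RebuildEpochHistory(const std::string &current_epoch,
                                 std::optional<uint64_t> snapshot_timestamp,
                                 const std::vector<WalSummary> &wal_files) {
  EpochHistory history;
  // Seed from the snapshot, mirroring the pre-loop emplace_back in the patch.
  if (snapshot_timestamp) history.emplace_back(current_epoch, *snapshot_timestamp);

  std::optional<uint64_t> last_loaded = snapshot_timestamp;
  for (const auto &wal : wal_files) {
    if (wal.next_timestamp != 0) last_loaded = wal.next_timestamp - 1;
    const uint64_t ts = last_loaded.value_or(0);

    if (history.empty() || history.back().first != wal.epoch_id) {
      history.emplace_back(wal.epoch_id, ts);  // new epoch encountered
    } else if (history.back().second < ts) {
      history.back().second = ts;              // same epoch made progress
    }
  }
  return history;
}
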
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index 1eb06bf10..bd2c2cc7d 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -54,15 +54,24 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAccessProtector db_acc) {
 
   std::optional<uint64_t> branching_point;
   // different epoch id, replica was main
+  // In case there is no epoch transfer and MAIN doesn't hold all the epochs (it could have been down and missed them),
+  // we then just need to check the commit timestamp
   if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
+    spdlog::trace(
+        "REPLICA: epoch UUID: {} and last_commit_timestamp: {}; MAIN: epoch UUID {} and last_commit_timestamp {}",
+        std::string(replica.epoch_id), replica.current_commit_timestamp, std::string(replStorageState.epoch_.id()),
+        replStorageState.last_commit_timestamp_);
     auto const &history = replStorageState.history;
     const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
       return main_epoch_info.first == replica.epoch_id;
     });
     // main didn't have that epoch, but why is here branching point
     if (epoch_info_iter == history.crend()) {
+      spdlog::info("Couldn't find epoch {} in MAIN, setting branching point", std::string(replica.epoch_id));
       branching_point = 0;
-    } else if (epoch_info_iter->second != replica.current_commit_timestamp) {
+    } else if (epoch_info_iter->second < replica.current_commit_timestamp) {
+      spdlog::info("Found epoch {} on MAIN with last_commit_timestamp {}, REPLICA's last_commit_timestamp {}",
+                   std::string(epoch_info_iter->first), epoch_info_iter->second, replica.current_commit_timestamp);
       branching_point = epoch_info_iter->second;
     }
   }
@@ -192,9 +201,6 @@ void ReplicationStorageClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
   }
 }
 
-//////// AF: you can't finialize transaction replication if you are not replicating
-/////// AF: if there is no stream or it is Defunct than we need to set replica in MAYBE_BEHIND -> is that even used
-/////// AF:
 bool ReplicationStorageClient::FinalizeTransactionReplication(Storage *storage, DatabaseAccessProtector db_acc) {
   // We can only check the state because it guarantees to be only
   // valid during a single transaction replication (if the assumption
@@ -259,6 +265,8 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph::storage::Storage *storage) {
   spdlog::debug("Starting replica recovery");
   auto *mem_storage = static_cast<InMemoryStorage *>(storage);
 
+  // TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
+  // only snapshot
   while (true) {
     auto file_locker = mem_storage->file_retainer_.AddLocker();
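
The `UpdateReplicaState` hunk above relaxes the divergence test from `!=` to `<`: a replica whose recorded timestamp is at or below MAIN's entry for the same epoch can simply catch up, and only a replica that is ahead marks a branching point. A sketch of that rule with plain types (`kTimestampInitialId` is an assumed sentinel here, not quoted from the patch):

#include <algorithm>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

constexpr uint64_t kTimestampInitialId = 0;  // assumed "no commits yet" sentinel

// Returns std::nullopt when the histories are compatible, otherwise the
// timestamp at which the replica's history split off from MAIN's.
std::optional<uint64_t> FindBranchingPoint(const std::string &replica_epoch, uint64_t replica_ts,
                                           const std::string &main_epoch, const EpochHistory &main_history) {
  // Same epoch, or a replica that never committed, cannot have diverged.
  if (replica_epoch == main_epoch || replica_ts == kTimestampInitialId) return std::nullopt;

  auto it = std::find_if(main_history.crbegin(), main_history.crend(),
                         [&](const auto &entry) { return entry.first == replica_epoch; });
  if (it == main_history.crend()) return 0;  // MAIN never saw that epoch

  // Diverged only if the replica is AHEAD of what MAIN recorded for the epoch;
  // an equal or smaller timestamp is recoverable (this is the `<` fix above).
  if (it->second < replica_ts) return it->second;
  return std::nullopt;
}
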
"--replication-restore-state-on-startup=true", + "--storage-recover-on-startup=false", + "--data-recovery-on-startup=false", ], "log_file": "instance_2.log", "data_directory": f"{TEMP_DIR}/instance_2", @@ -65,6 +71,9 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { "TRACE", "--coordinator-server-port", "10013", + "--replication-restore-state-on-startup=true", + "--storage-recover-on-startup=false", + "--data-recovery-on-startup=false", ], "log_file": "instance_3.log", "data_directory": f"{TEMP_DIR}/instance_3", @@ -90,14 +99,794 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { } -def test_replication_works_on_failover(): +@pytest.mark.parametrize("data_recovery", ["false", "true"]) +def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recovery): # Goal of this test is to check the replication works after failover command. - # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 1. We start all replicas, main and coordinator manually + # 2. We check that main has correct state + # 3. Create initial data on MAIN + # 4. Expect data to be copied on all replicas + # 5. Kill instance_1 (replica 1) + # 6. Create data on MAIN and expect to be copied to only one replica (instance_2) + # 7. Kill main + # 8. Instance_2 new MAIN + # 9. Create vertex on instance 2 + # 10. Start instance_1(it should have one commit on old epoch and new epoch with new commit shouldn't be replicated) + # 11. Expect data to be copied on instance_1 + # 12. Start old MAIN (instance_3) + # 13. Expect data to be copied to instance_3 + + temp_dir = tempfile.TemporaryDirectory().name + + MEMGRAPH_INNER_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_1.log", + "data_directory": f"{temp_dir}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_2.log", + "data_directory": f"{temp_dir}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_3.log", + "data_directory": f"{temp_dir}/instance_3", + "setup_queries": [], + }, + "coordinator": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator.log", + "setup_queries": [ + "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "SET INSTANCE instance_3 TO MAIN", + ], + }, + } + + # 1 + 
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION) + + # 2 + main_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + expected_data_on_main = [ + ( + "instance_1", + "127.0.0.1:10001", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ] + mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas) + + # 3 + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});") + + # 4 + + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_2_cursor = connect(host="localhost", port=7689).cursor() + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 1 + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 1 + + # 5 + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + + # 6 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + + # 7 + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + # 8. + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_instances(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 9 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch3 {prop:3});") + assert "At least one SYNC replica has not confirmed committing last transaction." 
in str(e.value) + + # 10 + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + + new_expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ] + mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances) + + # 11 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + # 12 + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + new_expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", True, "replica"), + ] + mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances) + + # 13 + + instance_3_cursor = connect(host="localhost", port=7687).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_3_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + +@pytest.mark.parametrize("data_recovery", ["false", "true"]) +def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_recovery): + # Goal of this test is to check the replication works after failover command if one + # instance missed couple of epochs but data is still available on one of the instances + + # 1. We start all replicas, main and coordinator manually + # 2. Main does commit + # 3. instance_2 down + # 4. Main commits more + # 5. Main down + # 6. Instance_1 new main + # 7. Instance 1 commits + # 8. Instance 4 gets data + # 9. Instance 1 dies + # 10. Instance 4 new main + # 11. Instance 4 commits + # 12. Instance 2 wakes up + # 13. Instance 2 gets data from old epochs + # 14. All other instances wake up + # 15. 
Everything is replicated + + temp_dir = tempfile.TemporaryDirectory().name + + MEMGRAPH_INNER_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_1.log", + "data_directory": f"{temp_dir}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_2.log", + "data_directory": f"{temp_dir}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_3.log", + "data_directory": f"{temp_dir}/instance_3", + "setup_queries": [], + }, + "instance_4": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7691", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10014", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_4.log", + "data_directory": f"{temp_dir}/instance_4", + "setup_queries": [], + }, + "coordinator": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator.log", + "setup_queries": [ + "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';", + "SET INSTANCE instance_3 TO MAIN", + ], + }, + } + + # 1 + + interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION) + + expected_data_on_main = [ + ( + "instance_1", + "127.0.0.1:10001", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_4", + "127.0.0.1:10004", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ] + + main_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas) + + # 2 + + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});") + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});") + + instance_1_cursor = connect(host="localhost", port=7688).cursor() + 
instance_2_cursor = connect(host="localhost", port=7689).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + + # 3 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2") + + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_instances(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 4 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + + # 5 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + # 6 + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 7 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + # 8 + + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 4 + + # 9 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + + # 10 + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 11 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_4_cursor, "CREATE (:Epoch3Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." 
in str(e.value) + + # 12 + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 13 + + instance_2_cursor = connect(host="localhost", port=7689).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(5, get_vertex_count) + + # 14 + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "replica"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 15 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(5, get_vertex_count) + + def get_vertex_count(): + return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(5, get_vertex_count) + + +@pytest.mark.parametrize("data_recovery", ["false"]) +def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_recovery): + # TODO(antoniofilipovic) Test should pass when logic is added + # Goal of this test is to check the replication works forcefully if replica misses epoch + # 1. We start all replicas, main and coordinator manually + # 2. We check that main has correct state + # 3. Create initial data on MAIN + # 4. Expect data to be copied on all replicas + # 5. Kill instance_1 ( this one will miss complete epoch) + # 6. Kill main (instance_3) + # 7. Instance_2 or instance_4 new main + # 8. New main commits + # 9. Instance_2 down (not main) + # 10. instance_4 down + # 11. Instance 1 up (missed epoch) + # 12 Instance 1 new main + # 13 instance 2 up + # 14 Force data from instance 1 to instance 2 + + temp_dir = tempfile.TemporaryDirectory().name + + pass + + +@pytest.mark.parametrize("data_recovery", ["false", "true"]) +def test_replication_correct_replica_chosen_up_to_date_data(data_recovery): + # Goal of this test is to check that correct replica instance as new MAIN is chosen + # 1. We start all replicas, main and coordinator manually + # 2. We check that main has correct state + # 3. Create initial data on MAIN + # 4. Expect data to be copied on all replicas + # 5. Kill instance_1 ( this one will miss complete epoch) + # 6. Kill main (instance_3) + # 7. Instance_2 new MAIN + # 8. Instance_2 commits and replicates data + # 9. Instance_4 down (not main) + # 10. 
instance_2 down (MAIN), instance 1 up (missed epoch), + # instance 4 up (In this case we should always choose instance_4 because it has up-to-date data) + # 11 Instance 4 new main + # 12 instance_1 gets up-to-date data, instance_4 has all data + + temp_dir = tempfile.TemporaryDirectory().name + + MEMGRAPH_INNER_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_1.log", + "data_directory": f"{temp_dir}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_2.log", + "data_directory": f"{temp_dir}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_3.log", + "data_directory": f"{temp_dir}/instance_3", + "setup_queries": [], + }, + "instance_4": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7691", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10014", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_4.log", + "data_directory": f"{temp_dir}/instance_4", + "setup_queries": [], + }, + "coordinator": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator.log", + "setup_queries": [ + "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';", + "SET INSTANCE instance_3 TO MAIN", + ], + }, + } + + # 1 + + interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION) + + # 2 + + main_cursor = connect(host="localhost", port=7687).cursor() + expected_data_on_main = [ + ( + "instance_1", + "127.0.0.1:10001", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_4", + "127.0.0.1:10004", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ] + + main_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + 
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas) + + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_instances(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + # TODO(antoniofilipovic) Before fixing durability, if this is removed we also have an issue. Check after fix + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 3 + + execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});") + execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});") + + # 4 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_2_cursor = connect(host="localhost", port=7689).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + + # 5 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + + # 6 + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + # 7 + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 8 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch2Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." 
in str(e.value) + + def get_vertex_count(): + return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + + # 9 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", False, "unknown"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 10 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4") + + # 11 + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 12 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + def get_vertex_count(): + return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + +def test_replication_works_on_failover_simple(): + # Goal of this test is to check the replication works after failover command. + # 1. We start all replicas, main and coordinator manually # 2. We check that main has correct state # 3. We kill main # 4. We check that coordinator and new main have correct state # 5. We insert one vertex on new main # 6. We check that vertex appears on new replica + # 7. We bring back main up + # 8. Expect data to be copied to main safe_execute(shutil.rmtree, TEMP_DIR) # 1 @@ -164,33 +953,48 @@ def test_replication_works_on_failover(): ] mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas) + # 5 + with pytest.raises(Exception) as e: + execute_and_fetch_all(new_main_cursor, "CREATE ();") + assert "At least one SYNC replica has not confirmed committing last transaction." 
diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py
index be7cd2b54..963aad7fd 100644
--- a/tests/e2e/replication/show_while_creating_invalid_state.py
+++ b/tests/e2e/replication/show_while_creating_invalid_state.py
@@ -923,7 +923,7 @@ def test_replication_role_recovery(connection):
             "--log-level=TRACE",
             "--replication-restore-state-on-startup",
             "true",
-            "--storage-recover-on-startup",
+            "--data-recovery-on-startup",
             "false",
         ],
         "log_file": "replica.log",
@@ -934,7 +934,7 @@ def test_replication_role_recovery(connection):
             "--bolt-port",
             "7687",
             "--log-level=TRACE",
-            "--storage-recover-on-startup=true",
+            "--data-recovery-on-startup=true",
             "--replication-restore-state-on-startup=true",
         ],
         "log_file": "main.log",
@@ -1105,7 +1105,7 @@ def test_basic_recovery_when_replica_is_kill_when_main_is_down():
             "--bolt-port",
             "7687",
             "--log-level=TRACE",
-            "--storage-recover-on-startup=true",
+            "--data-recovery-on-startup=true",
             "--replication-restore-state-on-startup=true",
         ],
         "log_file": "main.log",
@@ -1201,7 +1201,7 @@ def test_async_replication_when_main_is_killed():
             "data_directory": f"{data_directory_replica.name}",
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [],
             "data_directory": f"{data_directory_main.name}",
@@ -1284,7 +1284,7 @@ def test_sync_replication_when_main_is_killed():
             "data_directory": f"{data_directory_replica.name}",
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [],
             "data_directory": f"{data_directory_main.name}",
@@ -1340,7 +1340,7 @@ def test_attempt_to_write_data_on_main_when_async_replica_is_down():
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [
                 "REGISTER REPLICA async_replica1 ASYNC TO '127.0.0.1:10001';",
@@ -1443,7 +1443,7 @@ def test_attempt_to_write_data_on_main_when_sync_replica_is_down(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup", "true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup", "true"],
             "log_file": "main.log",
             # need to do it manually
             "setup_queries": [],
@@ -1572,7 +1572,7 @@ def test_attempt_to_create_indexes_on_main_when_async_replica_is_down():
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [
                 "REGISTER REPLICA async_replica1 ASYNC TO '127.0.0.1:10001';",
@@ -1673,7 +1673,7 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             # Need to do it manually
             "setup_queries": [],
@@ -1818,7 +1818,7 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             # Need to do it manually since we kill replica
             "setup_queries": [],
@@ -1937,7 +1937,7 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [],
         },
@@ -2060,7 +2060,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [],
         },
@@ -2187,7 +2187,7 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [],
         },
@@ -2310,7 +2310,7 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica(connection):
             "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
         },
         "main": {
-            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
             "log_file": "main.log",
             "setup_queries": [],
         },
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"], "log_file": "main.log", "setup_queries": [], }, @@ -2310,7 +2310,7 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica(connection): "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], }, "main": { - "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"], "log_file": "main.log", "setup_queries": [], }, diff --git a/tests/unit/coordination_utils.cpp b/tests/unit/coordination_utils.cpp index 1346dce2c..2a595c19f 100644 --- a/tests/unit/coordination_utils.cpp +++ b/tests/unit/coordination_utils.cpp @@ -67,8 +67,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) { auto [instance_name, latest_epoch, latest_commit_timestamp] = instance.ChooseMostUpToDateInstance(instance_database_histories); ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3"); - ASSERT_TRUE(*latest_epoch == db_histories.back().first); - ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second); + ASSERT_TRUE(latest_epoch == db_histories.back().first); + ASSERT_TRUE(latest_commit_timestamp == db_histories.back().second); } TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) { @@ -121,8 +121,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) { instance.ChooseMostUpToDateInstance(instance_database_histories); ASSERT_TRUE(instance_name == "instance_3"); - ASSERT_TRUE(*latest_epoch == db_histories.back().first); - ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second); + ASSERT_TRUE(latest_epoch == db_histories.back().first); + ASSERT_TRUE(latest_commit_timestamp == db_histories.back().second); } TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) { @@ -179,8 +179,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) { instance.ChooseMostUpToDateInstance(instance_database_histories); ASSERT_TRUE(instance_name == "instance_3"); - ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first); - ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second); + ASSERT_TRUE(latest_epoch == db_histories_longest.back().first); + ASSERT_TRUE(latest_commit_timestamp == db_histories_longest.back().second); } TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) { @@ -241,6 +241,6 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) { instance.ChooseMostUpToDateInstance(instance_database_histories); ASSERT_TRUE(instance_name == "instance_3"); - ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch)); - ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp); + ASSERT_TRUE(latest_epoch == std::string(newest_different_epoch)); + ASSERT_TRUE(latest_commit_timestamp == oldest_commit_timestamp); }