diff --git a/src/dbms/dbms_handler.cpp b/src/dbms/dbms_handler.cpp
index 7ae67dd02..16927d7e2 100644
--- a/src/dbms/dbms_handler.cpp
+++ b/src/dbms/dbms_handler.cpp
@@ -188,7 +188,8 @@ DbmsHandler::DbmsHandler(storage::Config config, replication::ReplicationState &
   if (flags::AreExperimentsEnabled(flags::Experiments::SYSTEM_REPLICATION) && !recovery_on_startup) {
     // This will result in dropping databases on SystemRecoveryHandler
     // for MT case, and for single DB case we might not even set replication as commit timestamp is checked
-    spdlog::warn("Data recovery on startup not set, this will result in dropping database in case o.");
+    spdlog::warn(
+        "Data recovery on startup not set, this will result in dropping database in case of multi-tenancy enabled.");
   }
   // TODO: Problem is if user doesn't set this up "database" name won't be recovered
 
diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp
index 336fd58a4..b81357902 100644
--- a/src/storage/v2/durability/durability.cpp
+++ b/src/storage/v2/durability/durability.cpp
@@ -414,10 +414,6 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
     auto last_loaded_timestamp = snapshot_timestamp;
     spdlog::info("Trying to load WAL files.");
 
-    // This way we skip WALs finalized only because of role change.
-    // We can also set the last timestamp to 0 if last loaded timestamp
-    // is nullopt as this can only happen if the WAL file with seq = 0
-    // does not contain any deltas and we didn't find any snapshots.
     if (last_loaded_timestamp) {
       epoch_history->emplace_back(repl_storage_state.epoch_.id(), *last_loaded_timestamp);
     }
@@ -427,18 +423,7 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
         LOG_FATAL("You are missing a WAL file with the sequence number {}!", *previous_seq_num + 1);
       }
       previous_seq_num = wal_file.seq_num;
-      /*
-      if (wal_file.epoch_id != repl_storage_state.epoch_.id()) {
-        // This way we skip WALs finalized only because of role change.
-        // We can also set the last timestamp to 0 if last loaded timestamp
-        // is nullopt as this can only happen if the WAL file with seq = 0
-        // does not contain any deltas and we didn't find any snapshots.
-        if (last_loaded_timestamp) {
-          epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp);
-        }
-        repl_storage_state.epoch_.SetEpoch(std::move(wal_file.epoch_id));
-      }
-      */
+
       try {
         auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges,
                             name_id_mapper, edge_count, config.salient.items);
@@ -452,24 +437,23 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
           last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
         }
 
-        auto no_last_element_or_different_epoch =
-            (!epoch_history->empty() && epoch_history->back().first != wal_file.epoch_id) || epoch_history->empty();
-        if (no_last_element_or_different_epoch) {
-          epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp.value_or(0));
+        bool epoch_history_empty = epoch_history->empty();
+        bool epoch_not_recorded = !epoch_history_empty && epoch_history->back().first != wal_file.epoch_id;
+        auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
+
+        if (epoch_history_empty || epoch_not_recorded) {
+          epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp_value);
         }
-        auto last_epoch_bigger_timestamp = !epoch_history->empty() &&
-                                           epoch_history->back().first == wal_file.epoch_id &&
-                                           epoch_history->back().second < last_loaded_timestamp.value_or(0);
-        if (last_epoch_bigger_timestamp) {
-          epoch_history->back().second = last_loaded_timestamp.value_or(0);
+
+        auto last_epoch_updated = !epoch_history_empty && epoch_history->back().first == wal_file.epoch_id &&
+                                  epoch_history->back().second < last_loaded_timestamp_value;
+        if (last_epoch_updated) {
+          epoch_history->back().second = last_loaded_timestamp_value;
         }
       } catch (const RecoveryFailure &e) {
         LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", wal_file.path, e.what());
       }
 
-      if (recovery_info.next_timestamp != 0) {
-        last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
-      }
     }
     // The sequence number needs to be recovered even though `LoadWal` didn't
     // load any deltas from that file.
diff --git a/tests/e2e/high_availability/single_coordinator.py b/tests/e2e/high_availability/single_coordinator.py
index e7ae2f14b..d111e0604 100644
--- a/tests/e2e/high_availability/single_coordinator.py
+++ b/tests/e2e/high_availability/single_coordinator.py
@@ -234,7 +234,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
     # 5
     interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
 
-    # 6.
+    # 6
     with pytest.raises(Exception) as e:
         execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});")
 
@@ -242,7 +242,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 
     assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
 
-    # 7. Kill main
+    # 7
     interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
 
     # 8.
@@ -259,13 +259,13 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
     ]
     mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
 
-    # 9. Create vertex on instance 2
+    # 9
    with pytest.raises(Exception) as e:
         execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch3 {prop:3});")
 
     assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
 
-    # 10. Start instance_1 ( it should have one commit on old epoch and new epoch with new commit shouldn't be replicated)
+    # 10
     interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
 
     new_expected_data_on_coord = [
@@ -276,7 +276,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
     ]
     mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
 
-    # 11. Expect data to be copied on instance_1
+    # 11
     instance_1_cursor = connect(host="localhost", port=7688).cursor()
 
     def get_vertex_count():
@@ -284,7 +284,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 
     mg_sleep_and_assert(3, get_vertex_count)
 
-    # 12. Start old MAIN (instance_3)
+    # 12
     interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
 
 
@@ -296,7 +296,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
     ]
     mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
 
-    # 13. Expect data to be copied to instance_3
+    # 13
     instance_3_cursor = connect(host="localhost", port=7687).cursor()
 
 
@@ -1462,4 +1462,5 @@ def test_disable_multiple_mains():
 
 
 if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-k", "test_replication_correct_replica_chosen_up_to_date_data"]))
     sys.exit(pytest.main([__file__, "-rA"]))
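
The epoch-history bookkeeping introduced in the durability.cpp hunk can be read in isolation: after loading each WAL file, append an (epoch id, timestamp) entry when the history is empty or the file belongs to an epoch other than the last recorded one, and otherwise only raise the recorded timestamp when the file carried newer deltas. The sketch below is a minimal standalone approximation of that rule, not the Memgraph implementation; EpochHistory and RecordEpoch are hypothetical names introduced for illustration.

// Minimal sketch (illustration only) of the epoch-history update rule from the
// durability.cpp hunk above. EpochHistory and RecordEpoch are hypothetical names.
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Pairs of (epoch id, last durable timestamp seen for that epoch).
using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

// Applied after loading one WAL file: append the epoch when the history is empty
// or the file belongs to a different epoch than the last recorded one; otherwise
// raise the stored timestamp if this file carried newer deltas.
void RecordEpoch(EpochHistory &epoch_history, const std::string &epoch_id,
                 std::optional<uint64_t> last_loaded_timestamp) {
  const bool epoch_history_empty = epoch_history.empty();
  const bool epoch_not_recorded = !epoch_history_empty && epoch_history.back().first != epoch_id;
  const uint64_t last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);

  if (epoch_history_empty || epoch_not_recorded) {
    epoch_history.emplace_back(epoch_id, last_loaded_timestamp_value);
  }

  const bool last_epoch_updated = !epoch_history_empty && epoch_history.back().first == epoch_id &&
                                  epoch_history.back().second < last_loaded_timestamp_value;
  if (last_epoch_updated) {
    epoch_history.back().second = last_loaded_timestamp_value;
  }
}

int main() {
  EpochHistory history;
  RecordEpoch(history, "epoch-1", 5);             // empty history -> ("epoch-1", 5) appended
  RecordEpoch(history, "epoch-1", 9);             // same epoch, newer deltas -> timestamp bumped to 9
  RecordEpoch(history, "epoch-2", std::nullopt);  // new epoch, no deltas loaded -> ("epoch-2", 0) appended
  assert(history.size() == 2);
  assert(history[0].first == "epoch-1" && history[0].second == 9);
  assert(history[1].first == "epoch-2" && history[1].second == 0);
  return 0;
}

As in the patch, epoch_history_empty is evaluated before the append, so the second branch only ever adjusts an entry that already existed before the current WAL file was processed.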