update durability code

This commit is contained in:
antoniofilipovic 2024-02-28 18:01:25 +01:00 committed by Antonio Filipovic
parent 86fce650d2
commit 7fa4b48770
3 changed files with 22 additions and 36 deletions

View File

@ -188,7 +188,8 @@ DbmsHandler::DbmsHandler(storage::Config config, replication::ReplicationState &
 if (flags::AreExperimentsEnabled(flags::Experiments::SYSTEM_REPLICATION) && !recovery_on_startup) {
   // This will result in dropping databases on SystemRecoveryHandler
   // for MT case, and for single DB case we might not even set replication as commit timestamp is checked
-  spdlog::warn("Data recovery on startup not set, this will result in dropping database in case o.");
+  spdlog::warn(
+      "Data recovery on startup not set, this will result in dropping database in case of multi-tenancy enabled.");
 }
 // TODO: Problem is if user doesn't set this up "database" name won't be recovered

View File

@ -414,10 +414,6 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
 auto last_loaded_timestamp = snapshot_timestamp;
 spdlog::info("Trying to load WAL files.");
-// This way we skip WALs finalized only because of role change.
-// We can also set the last timestamp to 0 if last loaded timestamp
-// is nullopt as this can only happen if the WAL file with seq = 0
-// does not contain any deltas and we didn't find any snapshots.
 if (last_loaded_timestamp) {
   epoch_history->emplace_back(repl_storage_state.epoch_.id(), *last_loaded_timestamp);
 }
@ -427,18 +423,7 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
   LOG_FATAL("You are missing a WAL file with the sequence number {}!", *previous_seq_num + 1);
 }
 previous_seq_num = wal_file.seq_num;
-/*
-if (wal_file.epoch_id != repl_storage_state.epoch_.id()) {
-  // This way we skip WALs finalized only because of role change.
-  // We can also set the last timestamp to 0 if last loaded timestamp
-  // is nullopt as this can only happen if the WAL file with seq = 0
-  // does not contain any deltas and we didn't find any snapshots.
-  if (last_loaded_timestamp) {
-    epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp);
-  }
-  repl_storage_state.epoch_.SetEpoch(std::move(wal_file.epoch_id));
-}
-*/
 try {
   auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper,
                       edge_count, config.salient.items);
@ -452,24 +437,23 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
   last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
 }
-auto no_last_element_or_different_epoch =
-    (!epoch_history->empty() && epoch_history->back().first != wal_file.epoch_id) || epoch_history->empty();
-if (no_last_element_or_different_epoch) {
-  epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp.value_or(0));
-}
-auto last_epoch_bigger_timestamp = !epoch_history->empty() &&
-                                   epoch_history->back().first == wal_file.epoch_id &&
-                                   epoch_history->back().second < last_loaded_timestamp.value_or(0);
-if (last_epoch_bigger_timestamp) {
-  epoch_history->back().second = last_loaded_timestamp.value_or(0);
-}
+bool epoch_history_empty = epoch_history->empty();
+bool epoch_not_recorded = !epoch_history_empty && epoch_history->back().first != wal_file.epoch_id;
+auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
+
+if (epoch_history_empty || epoch_not_recorded) {
+  epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp_value);
+}
+
+auto last_epoch_updated = !epoch_history_empty && epoch_history->back().first == wal_file.epoch_id &&
+                          epoch_history->back().second < last_loaded_timestamp_value;
+if (last_epoch_updated) {
+  epoch_history->back().second = last_loaded_timestamp_value;
+}
 } catch (const RecoveryFailure &e) {
   LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", wal_file.path, e.what());
 }
-if (recovery_info.next_timestamp != 0) {
-  last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
-}
 }
 // The sequence number needs to be recovered even though `LoadWal` didn't
 // load any deltas from that file.

View File

@ -234,7 +234,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 # 5
 interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")

-# 6.
+# 6
 with pytest.raises(Exception) as e:
     execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});")
@ -242,7 +242,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2

-# 7. Kill main
+# 7
 interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
 # 8.
@ -259,13 +259,13 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 ]
 mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)

-# 9. Create vertex on instance 2
+# 9
 with pytest.raises(Exception) as e:
     execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch3 {prop:3});")
 assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)

-# 10. Start instance_1 ( it should have one commit on old epoch and new epoch with new commit shouldn't be replicated)
+# 10
 interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
new_expected_data_on_coord = [ new_expected_data_on_coord = [
@ -276,7 +276,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 ]
 mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)

-# 11. Expect data to be copied on instance_1
+# 11
 instance_1_cursor = connect(host="localhost", port=7688).cursor()
def get_vertex_count(): def get_vertex_count():
@ -284,7 +284,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 mg_sleep_and_assert(3, get_vertex_count)

-# 12. Start old MAIN (instance_3)
+# 12
 interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
@ -296,7 +296,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
 ]
 mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)

-# 13. Expect data to be copied to instance_3
+# 13
 instance_3_cursor = connect(host="localhost", port=7687).cursor()
@ -1462,4 +1462,5 @@ def test_disable_multiple_mains():
 if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-k", "test_replication_correct_replica_chosen_up_to_date_data"]))
     sys.exit(pytest.main([__file__, "-rA"]))