fix tests and old PR comments

antoniofilipovic 2024-02-27 17:01:36 +01:00 committed by Antonio Filipovic
parent 1288b47a2c
commit 0af2e5cfe3
10 changed files with 126 additions and 89 deletions

View File

@ -175,9 +175,7 @@ auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
try {
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
auto res = stream.AwaitResponse();
return res.database_histories;
return stream.AwaitResponse().database_histories;
} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");

View File

@ -127,7 +127,7 @@ auto CoordinatorInstance::TryFailover() -> void {
ChooseMostUpToDateInstance(instance_database_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
most_up_to_date_instance, latest_epoch, latest_commit_timestamp);
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
@ -391,7 +391,7 @@ auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_
auto CoordinatorInstance::ChooseMostUpToDateInstance(
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
&instance_database_histories) -> NewMainRes {
NewMainRes new_main_res;
std::optional<NewMainRes> new_main_res;
std::for_each(
instance_database_histories.begin(), instance_database_histories.end(),
[&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
@ -407,7 +407,7 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
std::ranges::for_each(
instance_db_histories,
[&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
spdlog::debug("Instance {}: name {}, default db {}", instance_name, db_history.name,
memgraph::dbms::kDefaultDB);
});
@ -417,35 +417,26 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
[&instance_name = instance_name](const auto &epoch_history_it) {
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
spdlog::debug("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
});
// get latest epoch
// get latest timestamp
if (!new_main_res.latest_epoch) {
if (!new_main_res) {
const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
new_main_res = NewMainRes{
.most_up_to_date_instance = instance_name,
.latest_epoch = epoch,
.latest_commit_timestamp = timestamp,
};
spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
new_main_res = std::make_optional<NewMainRes>({instance_name, epoch, timestamp});
spdlog::debug("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
return;
}
bool found_same_point{false};
std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
std::string last_most_up_to_date_epoch{new_main_res->latest_epoch};
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
if (*new_main_res.latest_commit_timestamp < timestamp) {
new_main_res = NewMainRes{
.most_up_to_date_instance = instance_name,
.latest_epoch = epoch,
.latest_commit_timestamp = timestamp,
};
if (new_main_res->latest_commit_timestamp < timestamp) {
new_main_res = std::make_optional<NewMainRes>({instance_name, epoch, timestamp});
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
}
@ -459,11 +450,11 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
if (!found_same_point) {
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
new_main_res.most_up_to_date_instance, instance_name);
new_main_res->most_up_to_date_instance, instance_name);
}
});
return new_main_res;
return std::move(*new_main_res);
}
} // namespace memgraph::coordination
#endif
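
Condensed, the selection logic above works as follows: the first instance's newest (epoch, commit timestamp) pair seeds the result, and every other instance's default-database history is scanned from newest to oldest, replacing the current winner whenever a strictly newer commit timestamp appears and checking that the scan reaches an epoch shared with the previous winner. The following is a rough Python sketch of that idea only; the names and input shape are assumptions, not the actual C++ implementation.

# Illustrative sketch of the selection logic in ChooseMostUpToDateInstance;
# names and the input shape are assumptions, not the real C++ types.
def choose_most_up_to_date(instance_histories):
    # instance_histories: list of (instance_name, [(epoch, commit_ts), ...]),
    # each history ordered oldest -> newest (assumed).
    best = None  # (instance_name, epoch, commit_ts)
    for instance_name, history in instance_histories:
        if best is None:
            epoch, ts = history[-1]  # newest entry seeds the result
            best = (instance_name, epoch, ts)
            continue
        found_same_point = False
        last_best_epoch = best[1]
        for epoch, ts in reversed(history):  # newest -> oldest
            if ts > best[2]:  # strictly newer commit wins
                best = (instance_name, epoch, ts)
            if epoch == last_best_epoch:  # histories meet at a common epoch
                found_same_point = True
                break
        if not found_same_point:
            print(f"no common epoch between {best[0]} and {instance_name}")
    return best  # (most_up_to_date_instance, latest_epoch, latest_commit_timestamp)

# Example: instance_2 is one commit ahead on the same epoch, so it wins.
print(choose_most_up_to_date([("instance_1", [("epoch_1", 3)]), ("instance_2", [("epoch_1", 5)])]))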

View File

@ -28,8 +28,8 @@ namespace memgraph::coordination {
struct NewMainRes {
std::string most_up_to_date_instance;
std::optional<std::string> latest_epoch;
std::optional<uint64_t> latest_commit_timestamp;
std::string latest_epoch;
uint64_t latest_commit_timestamp;
};
using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;

View File

@ -163,13 +163,8 @@ class DbmsHandler {
spdlog::debug("Different UUIDs");
// This case can happen in the following scenarios:
// 1. INSTANCE was down and set --data-recover-on-startup=false, so we have DBs which are created as new
// For replication to work --recover-replication-on-startup must be true
// Instance can only make progress if not coordinator managed
// TODO: Fix this hack
if (config.name == kDefaultDB) {
// If we have a replication cluster, for REPLICAs which were down we
spdlog::debug("Last commit timestamp for DB {} is {}", kDefaultDB,
db->storage()->repl_storage_state_.last_commit_timestamp_);
// This seems correct if the database made progress
@ -184,10 +179,8 @@ class DbmsHandler {
return db;
}
// TODO AF: In case of MT we might not have any issue at all?
// we will have issues if --data-recovery-on-startup=true is set and the instance fails and comes back up but was ahead
spdlog::debug("Dropping database and recreating with the correct UUID");
spdlog::debug("Dropping database {} with UUID: {} and recreating with the correct UUID: {}", config.name,
std::string(db->uuid()), std::string(config.uuid));
// Defer drop
(void)Delete_(db->name());
// Second attempt

View File

@ -291,10 +291,6 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
storage->vertex_id_ = recovery_info.next_vertex_id;
storage->edge_id_ = recovery_info.next_edge_id;
storage->timestamp_ = std::max(storage->timestamp_, recovery_info.next_timestamp);
// Is this correct?
// storage->repl_storage_state_.last_commit_timestamp_ =
// std::max(storage->repl_storage_state_.last_commit_timestamp_.load(),
// recovered_snapshot.snapshot_info.start_timestamp);
spdlog::trace("Recovering indices and constraints from snapshot.");
memgraph::storage::durability::RecoverIndicesAndStats(recovered_snapshot.indices_constraints.indices,

View File

@ -271,9 +271,8 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glu
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
std::vector<std::pair<std::string, uint64_t>> history =
utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
repl_storage_state.history);
std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(
[](const auto &elem) { return std::make_pair(elem.first, elem.second); }, repl_storage_state.history);
history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
replication_coordination_glue::DatabaseHistory repl{

View File

@ -100,7 +100,6 @@ InMemoryStorage::InMemoryStorage(Config config)
"process!",
config_.durability.storage_directory);
}
spdlog::trace("Config durability recover on startup is {}", config_.durability.recover_on_startup);
if (config_.durability.recover_on_startup) {
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_,
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);

View File

@ -56,20 +56,22 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
// different epoch id, replica was main
// In case there is no epoch transfer, and MAIN doesn't hold all the epochs as it could have been down and missed it,
// we then just need to check the commit timestamp
spdlog::trace("Replicas epoch id {}, replicas timestamp id {}, main epoch id {}, main timestamp {}",
std::string(replica.epoch_id), replica.current_commit_timestamp,
std::string(replStorageState.epoch_.id()), replStorageState.last_commit_timestamp_);
if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
spdlog::trace(
"REPLICA: epoch UUID: {} and last_commit_timestamp: {}; MAIN: epoch UUID {} and last_commit_timestamp {}",
std::string(replica.epoch_id), replica.current_commit_timestamp, std::string(replStorageState.epoch_.id()),
replStorageState.last_commit_timestamp_);
auto const &history = replStorageState.history;
const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
return main_epoch_info.first == replica.epoch_id;
});
// main didn't have that epoch, but why is there a branching point here?
if (epoch_info_iter == history.crend()) {
spdlog::info("Couldn't find epoch, setting branching point");
spdlog::info("Couldn't find epoch {} in MAIN, setting branching point", std::string(replica.epoch_id));
branching_point = 0;
} else if (epoch_info_iter->second < replica.current_commit_timestamp) {
spdlog::info("Found epoch with commit timestamp {}", epoch_info_iter->second);
spdlog::info("Found epoch {} on MAIN with last_commit_timestamp {}, REPLICA's last_commit_timestamp {}",
std::string(epoch_info_iter->first), epoch_info_iter->second, replica.current_commit_timestamp);
branching_point = epoch_info_iter->second;
}
}
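
In short, the divergence check above does the following: when the replica reports an epoch different from MAIN's current one and has already committed something, MAIN looks the replica's epoch up in its own epoch history, newest first; an epoch MAIN never saw, or one whose recorded commit timestamp is behind the replica's, marks a branching point. A small Python sketch of that check follows; the names, sentinel value and data shapes are assumptions, not the real implementation.

# Illustrative sketch of the branching-point check in UpdateReplicaState;
# the sentinel value, names and data shapes are assumptions.
TIMESTAMP_INITIAL_ID = 0  # assumed: "replica has not committed anything yet"

def find_branching_point(replica_epoch, replica_commit_ts, main_epoch, main_history):
    # main_history: list of (epoch_id, last_commit_timestamp), oldest -> newest.
    if replica_epoch == main_epoch or replica_commit_ts == TIMESTAMP_INITIAL_ID:
        return None  # same epoch, or replica never committed: nothing to compare
    for epoch_id, last_commit_ts in reversed(main_history):
        if epoch_id == replica_epoch:
            if last_commit_ts < replica_commit_ts:
                return last_commit_ts  # MAIN knows the epoch, but the replica is ahead of it
            return None  # epoch found and MAIN is not behind: no branching
    return 0  # MAIN never saw that epoch: branch from the very beginning

# Example: the replica committed up to ts 7 on an epoch MAIN only saw up to ts 5.
print(find_branching_point("epoch_a", 7, "epoch_b", [("epoch_a", 5), ("epoch_b", 9)]))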

View File

@ -198,12 +198,27 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 3
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
@ -409,20 +424,42 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
},
}
# 1. We start all replicas, main and coordinator manually
# 1
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_4", "127.0.0.1:10004", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_4",
"127.0.0.1:10004",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
# 2. Main does commit
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 2
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});")
@ -435,7 +472,7 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 3. instance_2 down
# 3
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
@ -453,7 +490,7 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 4. Main commits more
# 4
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
@ -462,11 +499,11 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
# 5. Main down
# 5
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 6. Instance_1 new main
# 6
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
@ -477,21 +514,21 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 7. Instance 1 commits
# 7
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 8. Instance 4 gets data
# 8
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 4
# 8. Instance 1 dies
# 9
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
# 9. Instance 4 new main
# 10
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
@ -502,13 +539,13 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 10 Instance 4 commits
# 11
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_4_cursor, "CREATE (:Epoch3Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 11 Instance 2 wakes up
# 12
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
@ -521,7 +558,7 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 12 Instance 2 gets data from old epochs
# 13
instance_2_cursor = connect(host="localhost", port=7689).cursor()
@ -530,7 +567,7 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
mg_sleep_and_assert(5, get_vertex_count)
# 12. All other instances wake up
# 14
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
@ -544,7 +581,7 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 13. Everything is replicated
# 15
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
@ -705,12 +742,35 @@ def test_replication_correct_replica_chosen_up_to_date_data(data_recovery):
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_4", "127.0.0.1:10004", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_4",
"127.0.0.1:10004",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
coord_cursor = connect(host="localhost", port=7690).cursor()
@ -893,7 +953,6 @@ def test_replication_works_on_failover_simple():
]
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
# 5
with pytest.raises(Exception) as e:
execute_and_fetch_all(new_main_cursor, "CREATE ();")
@ -917,14 +976,14 @@ def test_replication_works_on_failover_simple():
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
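
The test updates in this file track the newer SHOW REPLICAS output, where each row carries the replica's system state and per-database state as dictionaries with "behind", "status" and "ts" keys, and they replace one-shot asserts with polling through mg_sleep_and_assert_collection, since replica state converges asynchronously after failover. Below is a minimal sketch of such a polling helper, assuming the real helper simply retries until the expected value comes back or a timeout expires.

# Illustrative sketch of a "sleep and assert" helper like the one used above;
# the real mg_sleep_and_assert_collection may differ in details.
import time

def sleep_and_assert(expected, retrieve, max_duration=20, tick=0.5):
    # Poll retrieve() until it returns the expected value or the timeout expires.
    deadline = time.time() + max_duration
    actual = retrieve()
    while actual != expected and time.time() < deadline:
        time.sleep(tick)
        actual = retrieve()
    assert actual == expected, f"timed out waiting for {expected}, last seen {actual}"

# Example usage with a trivial data source.
sleep_and_assert([1, 2, 3], lambda: [1, 2, 3])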

View File

@ -67,8 +67,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
ASSERT_TRUE(latest_epoch == db_histories.back().first);
ASSERT_TRUE(latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
@ -121,8 +121,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
ASSERT_TRUE(latest_epoch == db_histories.back().first);
ASSERT_TRUE(latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
@ -179,8 +179,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
ASSERT_TRUE(latest_epoch == db_histories_longest.back().first);
ASSERT_TRUE(latest_commit_timestamp == db_histories_longest.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
@ -241,6 +241,6 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
ASSERT_TRUE(latest_epoch == std::string(newest_different_epoch));
ASSERT_TRUE(latest_commit_timestamp == oldest_commit_timestamp);
}