Ensure replication works on HA cluster in different scenarios (#1743)

Antonio Filipovic 2024-03-01 12:32:56 +01:00 committed by GitHub
parent f316f7db87
commit 33caa27161
14 changed files with 924 additions and 87 deletions

View File

@@ -175,9 +175,7 @@ auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
    -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
  try {
    auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
-   auto res = stream.AwaitResponse();
-   return res.database_histories;
+   return stream.AwaitResponse().database_histories;
  } catch (const rpc::RpcFailedException &) {
    spdlog::error("RPC error occured while sending GetInstance UUID RPC");

View File

@@ -127,7 +127,7 @@ auto CoordinatorInstance::TryFailover() -> void {
      ChooseMostUpToDateInstance(instance_database_histories);
  spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
-               most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
+               most_up_to_date_instance, latest_epoch, latest_commit_timestamp);
  auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
@@ -391,7 +391,7 @@ auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_
auto CoordinatorInstance::ChooseMostUpToDateInstance(
    const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
        &instance_database_histories) -> NewMainRes {
- NewMainRes new_main_res;
+ std::optional<NewMainRes> new_main_res;
  std::for_each(
      instance_database_histories.begin(), instance_database_histories.end(),
      [&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
@@ -407,7 +407,7 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
        std::ranges::for_each(
            instance_db_histories,
            [&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
-             spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
+             spdlog::debug("Instance {}: name {}, default db {}", instance_name, db_history.name,
                            memgraph::dbms::kDefaultDB);
            });
@@ -417,35 +417,26 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
        std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
                              [&instance_name = instance_name](const auto &epoch_history_it) {
-                               spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
+                               spdlog::debug("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
                                              std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
                              });
        // get latest epoch
        // get latest timestamp
-       if (!new_main_res.latest_epoch) {
+       if (!new_main_res) {
          const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
-         new_main_res = NewMainRes{
-             .most_up_to_date_instance = instance_name,
-             .latest_epoch = epoch,
-             .latest_commit_timestamp = timestamp,
-         };
-         spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
+         new_main_res = std::make_optional<NewMainRes>({instance_name, epoch, timestamp});
+         spdlog::debug("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
                        instance_name, epoch, timestamp);
          return;
        }
        bool found_same_point{false};
-       std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
+       std::string last_most_up_to_date_epoch{new_main_res->latest_epoch};
        for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
-         if (*new_main_res.latest_commit_timestamp < timestamp) {
+         if (new_main_res->latest_commit_timestamp < timestamp) {
-           new_main_res = NewMainRes{
-               .most_up_to_date_instance = instance_name,
-               .latest_epoch = epoch,
-               .latest_commit_timestamp = timestamp,
-           };
+           new_main_res = std::make_optional<NewMainRes>({instance_name, epoch, timestamp});
            spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
                          instance_name, epoch, timestamp);
          }
@@ -459,11 +450,11 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
        if (!found_same_point) {
          spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
-                       new_main_res.most_up_to_date_instance, instance_name);
+                       new_main_res->most_up_to_date_instance, instance_name);
        }
      });
- return new_main_res;
+ return std::move(*new_main_res);
}
} // namespace memgraph::coordination
#endif
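For readers following the failover change above, here is a minimal Python sketch (not Memgraph's actual implementation) of the selection rule ChooseMostUpToDateInstance encodes: every alive instance reports its default-db history as (epoch_id, last_commit_timestamp) pairs, and the candidate whose history contains the highest commit timestamp becomes the new MAIN. All names and values below are illustrative.

def choose_most_up_to_date(instance_histories):
    """instance_histories: dict of instance_name -> list[(epoch_id, last_commit_ts)],
    ordered from oldest to newest epoch."""
    best = None  # (instance_name, epoch_id, last_commit_ts)
    for name, history in instance_histories.items():
        if not history:
            continue
        if best is None:
            # First instance seen seeds the result with its newest epoch entry.
            epoch, ts = history[-1]
            best = (name, epoch, ts)
            continue
        # Walk this instance's history from newest to oldest; a larger commit
        # timestamp anywhere in it means this instance is further ahead.
        for epoch, ts in reversed(history):
            if ts > best[2]:
                best = (name, epoch, ts)
    return best


if __name__ == "__main__":
    histories = {
        "instance_1": [("epoch_1", 10)],
        "instance_2": [("epoch_1", 10), ("epoch_2", 14)],
        "instance_3": [("epoch_1", 10), ("epoch_2", 14), ("epoch_3", 21)],
    }
    # instance_3 has seen the newest epoch and the largest commit timestamp.
    print(choose_most_up_to_date(histories))  # ('instance_3', 'epoch_3', 21)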

View File

@@ -28,8 +28,8 @@ namespace memgraph::coordination {
struct NewMainRes {
  std::string most_up_to_date_instance;
- std::optional<std::string> latest_epoch;
- std::optional<uint64_t> latest_commit_timestamp;
+ std::string latest_epoch;
+ uint64_t latest_commit_timestamp;
};
using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;

View File

@@ -185,6 +185,16 @@ DbmsHandler::DbmsHandler(storage::Config config, replication::ReplicationState &
  auto directories = std::set{std::string{kDefaultDB}};
  // Recover previous databases
+ if (flags::AreExperimentsEnabled(flags::Experiments::SYSTEM_REPLICATION) && !recovery_on_startup) {
+   // This will result in dropping databases on SystemRecoveryHandler
+   // for MT case, and for single DB case we might not even set replication as commit timestamp is checked
+   spdlog::warn(
+       "Data recovery on startup not set, this will result in dropping database in case of multi-tenancy enabled.");
+ }
+ // TODO: Problem is if user doesn't set this up "database" name won't be recovered
+ // but if storage-recover-on-startup is true storage will be recovered which is an issue
+ spdlog::info("Data recovery on startup set to {}", recovery_on_startup);
  if (recovery_on_startup) {
    auto it = durability_->begin(std::string(kDBPrefix));
    auto end = durability_->end(std::string(kDBPrefix));
@@ -410,9 +420,10 @@ void DbmsHandler::UpdateDurability(const storage::Config &config, std::optional<
  if (!durability_) return;
  // Save database in a list of active databases
  const auto &key = Durability::GenKey(config.salient.name);
- if (rel_dir == std::nullopt)
+ if (rel_dir == std::nullopt) {
    rel_dir =
        std::filesystem::relative(config.durability.storage_directory, default_config_.durability.storage_directory);
+ }
  const auto &val = Durability::GenVal(config.salient.uuid, *rel_dir);
  durability_->Put(key, val);
}

View File

@@ -155,6 +155,8 @@ class DbmsHandler {
      spdlog::debug("Trying to create db '{}' on replica which already exists.", config.name);
      auto db = Get_(config.name);
+     spdlog::debug("Aligning database with name {} which has UUID {}, where config UUID is {}", config.name,
+                   std::string(db->uuid()), std::string(config.uuid));
      if (db->uuid() == config.uuid) {  // Same db
        return db;
      }
@@ -163,18 +165,22 @@
      // TODO: Fix this hack
      if (config.name == kDefaultDB) {
+       spdlog::debug("Last commit timestamp for DB {} is {}", kDefaultDB,
+                     db->storage()->repl_storage_state_.last_commit_timestamp_);
+       // This seems correct, if database made progress
        if (db->storage()->repl_storage_state_.last_commit_timestamp_ != storage::kTimestampInitialId) {
          spdlog::debug("Default storage is not clean, cannot update UUID...");
          return NewError::GENERIC;  // Update error
        }
-       spdlog::debug("Update default db's UUID");
+       spdlog::debug("Updated default db's UUID");
        // Default db cannot be deleted and remade, have to just update the UUID
        db->storage()->config_.salient.uuid = config.uuid;
        UpdateDurability(db->storage()->config_, ".");
        return db;
      }
-     spdlog::debug("Drop database and recreate with the correct UUID");
+     spdlog::debug("Dropping database {} with UUID: {} and recreating with the correct UUID: {}", config.name,
+                   std::string(db->uuid()), std::string(config.uuid));
      // Defer drop
      (void)Delete_(db->name());
      // Second attempt
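As a reading aid for the hunk above, this is a hypothetical sketch of the alignment decision a replica makes when a database with the requested name already exists: reuse it if the UUID matches, update the default database's UUID in place only while its storage is still clean, and otherwise drop and recreate it. Constant names and field names below are assumptions for illustration, not the real API.

K_DEFAULT_DB = "memgraph"
K_TIMESTAMP_INITIAL_ID = 0  # assumed sentinel for "no committed transactions yet"


def align_existing_db(existing, requested_name, requested_uuid):
    """existing: dict with keys 'uuid' and 'last_commit_timestamp'."""
    if existing["uuid"] == requested_uuid:
        return "reuse"                      # same database, nothing to do
    if requested_name == K_DEFAULT_DB:
        if existing["last_commit_timestamp"] != K_TIMESTAMP_INITIAL_ID:
            return "error: default storage not clean, cannot update UUID"
        existing["uuid"] = requested_uuid   # default db can't be dropped, so rewrite its UUID
        return "uuid-updated"
    return "drop-and-recreate"              # any other db is recreated with the correct UUID


if __name__ == "__main__":
    db = {"uuid": "old-uuid", "last_commit_timestamp": 0}
    print(align_existing_db(db, "memgraph", "new-uuid"))  # uuid-updated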

View File

@@ -334,7 +334,8 @@ int main(int argc, char **argv) {
      .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
                        .enable_schema_metadata = FLAGS_storage_enable_schema_metadata},
      .salient.storage_mode = memgraph::flags::ParseStorageMode()};
+ spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
+              FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);
  memgraph::utils::Scheduler jemalloc_purge_scheduler;
  jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec),
                               [] { memgraph::memory::PurgeUnusedMemory(); });

View File

@@ -271,9 +271,8 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glu
  dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
    auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
-   std::vector<std::pair<std::string, uint64_t>> history =
-       utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
-                   repl_storage_state.history);
+   std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(
+       [](const auto &elem) { return std::make_pair(elem.first, elem.second); }, repl_storage_state.history);
    history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
    replication_coordination_glue::DatabaseHistory repl{
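A small, hypothetical illustration of the history payload GetDatabasesHistories assembles above: the already-closed epochs from repl_storage_state.history are copied and the currently active epoch is appended with the storage's last commit timestamp, so the coordinator always receives the newest (epoch, timestamp) pair last. Function and field names are illustrative only.

def build_database_history(past_epochs, current_epoch_id, last_commit_timestamp):
    """past_epochs: list of (epoch_id, last_commit_ts) for epochs that are already finished."""
    history = [(epoch_id, ts) for epoch_id, ts in past_epochs]  # copy of closed epochs
    history.append((current_epoch_id, last_commit_timestamp))   # active epoch goes last
    return history


if __name__ == "__main__":
    print(build_database_history([("epoch_1", 10)], "epoch_2", 14))
    # [('epoch_1', 10), ('epoch_2', 14)]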

View File

@@ -1,4 +1,4 @@
- // Copyright 2023 Memgraph Ltd.
+ // Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -118,6 +118,8 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
    if (!item.is_regular_file()) continue;
    try {
      auto info = ReadWalInfo(item.path());
+     spdlog::trace("Getting wal file with following info: uuid: {}, epoch id: {}, from timestamp {}, to_timestamp {} ",
+                   info.uuid, info.epoch_id, info.from_timestamp, info.to_timestamp);
      if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) {
        wal_files.emplace_back(info.seq_num, info.from_timestamp, info.to_timestamp, std::move(info.uuid),
                               std::move(info.epoch_id), item.path());
@@ -356,6 +358,7 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
    spdlog::warn(utils::MessageWithLink("No snapshot or WAL file found.", "https://memgr.ph/durability"));
    return std::nullopt;
  }
+ // TODO(antoniofilipovic) What is the logic here?
  std::sort(wal_files.begin(), wal_files.end());
  // UUID used for durability is the UUID of the last WAL file.
  // Same for the epoch id.
@@ -410,22 +413,17 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
  std::optional<uint64_t> previous_seq_num;
  auto last_loaded_timestamp = snapshot_timestamp;
  spdlog::info("Trying to load WAL files.");
+ if (last_loaded_timestamp) {
+   epoch_history->emplace_back(repl_storage_state.epoch_.id(), *last_loaded_timestamp);
+ }
  for (auto &wal_file : wal_files) {
    if (previous_seq_num && (wal_file.seq_num - *previous_seq_num) > 1) {
      LOG_FATAL("You are missing a WAL file with the sequence number {}!", *previous_seq_num + 1);
    }
    previous_seq_num = wal_file.seq_num;
-   if (wal_file.epoch_id != repl_storage_state.epoch_.id()) {
-     // This way we skip WALs finalized only because of role change.
-     // We can also set the last timestamp to 0 if last loaded timestamp
-     // is nullopt as this can only happen if the WAL file with seq = 0
-     // does not contain any deltas and we didn't find any snapshots.
-     if (last_loaded_timestamp) {
-       epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp);
-     }
-     repl_storage_state.epoch_.SetEpoch(std::move(wal_file.epoch_id));
-   }
    try {
      auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper,
                          edge_count, config.salient.items);
@@ -434,13 +432,28 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
      recovery_info.next_timestamp = std::max(recovery_info.next_timestamp, info.next_timestamp);
      recovery_info.last_commit_timestamp = info.last_commit_timestamp;
+     if (recovery_info.next_timestamp != 0) {
+       last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
+     }
+     bool epoch_history_empty = epoch_history->empty();
+     bool epoch_not_recorded = !epoch_history_empty && epoch_history->back().first != wal_file.epoch_id;
+     auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
+     if (epoch_history_empty || epoch_not_recorded) {
+       epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp_value);
+     }
+     auto last_epoch_updated = !epoch_history_empty && epoch_history->back().first == wal_file.epoch_id &&
+                               epoch_history->back().second < last_loaded_timestamp_value;
+     if (last_epoch_updated) {
+       epoch_history->back().second = last_loaded_timestamp_value;
+     }
    } catch (const RecoveryFailure &e) {
      LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", wal_file.path, e.what());
    }
-   if (recovery_info.next_timestamp != 0) {
-     last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
-   }
  }
  // The sequence number needs to be recovered even though `LoadWal` didn't
  // load any deltas from that file.
@@ -456,7 +469,12 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
  memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
                             std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
+ spdlog::info("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
+              repl_storage_state.last_commit_timestamp_);
+ std::for_each(repl_storage_state.history.begin(), repl_storage_state.history.end(), [](auto &history) {
+   spdlog::info("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
+ });
  return recovery_info;
}
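To make the epoch-history bookkeeping introduced in the RecoverData hunk above easier to follow, here is a simplified, hypothetical model of the rule: after each WAL file is replayed, the recovered (epoch_id, timestamp) pair either starts a new history entry or bumps the timestamp of the entry for the epoch currently at the back. This is only a sketch of the idea, not the storage engine's actual recovery code.

def replay_wal_files(wal_files, initial_epoch=None, snapshot_timestamp=None):
    """wal_files: list of dicts {'epoch_id': str, 'next_timestamp': int}."""
    epoch_history = []  # list of [epoch_id, last_commit_timestamp]
    last_loaded_timestamp = snapshot_timestamp
    if last_loaded_timestamp is not None and initial_epoch is not None:
        # Seed the history with the epoch recovered from the snapshot.
        epoch_history.append([initial_epoch, last_loaded_timestamp])

    for wal in wal_files:
        if wal["next_timestamp"] != 0:
            last_loaded_timestamp = wal["next_timestamp"] - 1
        ts = last_loaded_timestamp or 0

        if not epoch_history or epoch_history[-1][0] != wal["epoch_id"]:
            # First time this epoch shows up during recovery: record it.
            epoch_history.append([wal["epoch_id"], ts])
        elif epoch_history[-1][1] < ts:
            # Same epoch, but this WAL advanced the commit timestamp.
            epoch_history[-1][1] = ts
    return epoch_history


if __name__ == "__main__":
    wals = [
        {"epoch_id": "epoch_1", "next_timestamp": 5},
        {"epoch_id": "epoch_1", "next_timestamp": 9},
        {"epoch_id": "epoch_2", "next_timestamp": 15},
    ]
    print(replay_wal_files(wals, initial_epoch="epoch_1", snapshot_timestamp=2))
    # [['epoch_1', 8], ['epoch_2', 14]]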

View File

@@ -233,7 +233,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
    }
  }
- // In all cases, if we have a current wal file we need to use itW
+ // In all cases, if we have a current wal file we need to use it
  if (current_wal_seq_num) {
    // NOTE: File not handled directly, so no need to lock it
    recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});

View File

@@ -109,6 +109,7 @@ InMemoryStorage::InMemoryStorage(Config config)
      timestamp_ = std::max(timestamp_, info->next_timestamp);
      if (info->last_commit_timestamp) {
        repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
+       spdlog::info("Recovering last commit timestamp {}", *info->last_commit_timestamp);
      }
    }
  } else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||

View File

@@ -54,15 +54,24 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
  std::optional<uint64_t> branching_point;
  // different epoch id, replica was main
+ // In case there is no epoch transfer, and MAIN doesn't hold all the epochs as it could have been down and miss it
+ // we need then just to check commit timestamp
  if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
+   spdlog::trace(
+       "REPLICA: epoch UUID: {} and last_commit_timestamp: {}; MAIN: epoch UUID {} and last_commit_timestamp {}",
+       std::string(replica.epoch_id), replica.current_commit_timestamp, std::string(replStorageState.epoch_.id()),
+       replStorageState.last_commit_timestamp_);
    auto const &history = replStorageState.history;
    const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
      return main_epoch_info.first == replica.epoch_id;
    });
+   // main didn't have that epoch, but why is here branching point
    if (epoch_info_iter == history.crend()) {
+     spdlog::info("Couldn't find epoch {} in MAIN, setting branching point", std::string(replica.epoch_id));
      branching_point = 0;
-   } else if (epoch_info_iter->second != replica.current_commit_timestamp) {
+   } else if (epoch_info_iter->second < replica.current_commit_timestamp) {
+     spdlog::info("Found epoch {} on MAIN with last_commit_timestamp {}, REPLICA's last_commit_timestamp {}",
+                  std::string(epoch_info_iter->first), epoch_info_iter->second, replica.current_commit_timestamp);
      branching_point = epoch_info_iter->second;
    }
  }
@@ -192,9 +201,6 @@ void ReplicationStorageClient::StartTransactionReplication(const uint64_t curren
  }
}
- //////// AF: you can't finialize transaction replication if you are not replicating
- /////// AF: if there is no stream or it is Defunct than we need to set replica in MAYBE_BEHIND -> is that even used
- /////// AF:
bool ReplicationStorageClient::FinalizeTransactionReplication(Storage *storage, DatabaseAccessProtector db_acc) {
  // We can only check the state because it guarantees to be only
  // valid during a single transaction replication (if the assumption
@@ -259,6 +265,8 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
  spdlog::debug("Starting replica recovery");
  auto *mem_storage = static_cast<InMemoryStorage *>(storage);
+ // TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
+ // only snapshot
  while (true) {
    auto file_locker = mem_storage->file_retainer_.AddLocker();
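Here is a hypothetical sketch of the branching-point check that the UpdateReplicaState hunk above relaxes: when a replica reports a different epoch than MAIN's current one, MAIN looks the replica's epoch up in its own history. An epoch MAIN never saw, or a replica commit timestamp ahead of what MAIN recorded for that epoch, signals diverged histories. Function and parameter names are illustrative, not the real API.

def find_branching_point(main_history, main_epoch, replica_epoch, replica_commit_ts,
                         timestamp_initial_id=0):
    """main_history: list of (epoch_id, last_commit_ts) pairs, oldest first."""
    if replica_epoch == main_epoch or replica_commit_ts == timestamp_initial_id:
        return None  # same epoch, or replica never committed anything: no divergence
    for epoch_id, last_commit_ts in reversed(main_history):
        if epoch_id == replica_epoch:
            if last_commit_ts < replica_commit_ts:
                return last_commit_ts  # replica got further on that epoch than MAIN knows
            return None                # MAIN is at least as far along: recoverable
    return 0  # MAIN never saw the replica's epoch at all


if __name__ == "__main__":
    history = [("epoch_1", 10), ("epoch_2", 14)]
    print(find_branching_point(history, "epoch_3", "epoch_2", 20))  # 14 -> diverged
    print(find_branching_point(history, "epoch_3", "epoch_2", 12))  # None -> recoverable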

View File

@@ -37,6 +37,9 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
            "TRACE",
            "--coordinator-server-port",
            "10011",
+           "--replication-restore-state-on-startup=true",
+           "--storage-recover-on-startup=false",
+           "--data-recovery-on-startup=false",
        ],
        "log_file": "instance_1.log",
        "data_directory": f"{TEMP_DIR}/instance_1",
@@ -51,6 +54,9 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
            "TRACE",
            "--coordinator-server-port",
            "10012",
+           "--replication-restore-state-on-startup=true",
+           "--storage-recover-on-startup=false",
+           "--data-recovery-on-startup=false",
        ],
        "log_file": "instance_2.log",
        "data_directory": f"{TEMP_DIR}/instance_2",
@@ -65,6 +71,9 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
            "TRACE",
            "--coordinator-server-port",
            "10013",
+           "--replication-restore-state-on-startup=true",
+           "--storage-recover-on-startup=false",
+           "--data-recovery-on-startup=false",
        ],
        "log_file": "instance_3.log",
        "data_directory": f"{TEMP_DIR}/instance_3",
@@ -90,14 +99,794 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}
- def test_replication_works_on_failover():
+ @pytest.mark.parametrize("data_recovery", ["false", "true"])
+ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recovery):
# Goal of this test is to check the replication works after failover command.
- # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
+ # 1. We start all replicas, main and coordinator manually
# 2. We check that main has correct state
# 3. Create initial data on MAIN
# 4. Expect data to be copied on all replicas
# 5. Kill instance_1 (replica 1)
# 6. Create data on MAIN and expect to be copied to only one replica (instance_2)
# 7. Kill main
# 8. Instance_2 new MAIN
# 9. Create vertex on instance 2
# 10. Start instance_1(it should have one commit on old epoch and new epoch with new commit shouldn't be replicated)
# 11. Expect data to be copied on instance_1
# 12. Start old MAIN (instance_3)
# 13. Expect data to be copied to instance_3
temp_dir = tempfile.TemporaryDirectory().name
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_1.log",
"data_directory": f"{temp_dir}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_2.log",
"data_directory": f"{temp_dir}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_3.log",
"data_directory": f"{temp_dir}/instance_3",
"setup_queries": [],
},
"coordinator": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
# 1
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 3
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 1
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 1
# 5
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
# 6
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 7
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 8.
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 9
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch3 {prop:3});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 10
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
new_expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
# 11
instance_1_cursor = connect(host="localhost", port=7688).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
# 12
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
new_expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
]
mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
# 13
instance_3_cursor = connect(host="localhost", port=7687).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_3_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
@pytest.mark.parametrize("data_recovery", ["false", "true"])
def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_recovery):
# Goal of this test is to check the replication works after failover command if one
# instance missed couple of epochs but data is still available on one of the instances
# 1. We start all replicas, main and coordinator manually
# 2. Main does commit
# 3. instance_2 down
# 4. Main commits more
# 5. Main down
# 6. Instance_1 new main
# 7. Instance 1 commits
# 8. Instance 4 gets data
# 9. Instance 1 dies
# 10. Instance 4 new main
# 11. Instance 4 commits
# 12. Instance 2 wakes up
# 13. Instance 2 gets data from old epochs
# 14. All other instances wake up
# 15. Everything is replicated
temp_dir = tempfile.TemporaryDirectory().name
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_1.log",
"data_directory": f"{temp_dir}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_2.log",
"data_directory": f"{temp_dir}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_3.log",
"data_directory": f"{temp_dir}/instance_3",
"setup_queries": [],
},
"instance_4": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7691",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10014",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_4.log",
"data_directory": f"{temp_dir}/instance_4",
"setup_queries": [],
},
"coordinator": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
# 1
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
expected_data_on_main = [
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_4",
"127.0.0.1:10004",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 2
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});")
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 3
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 4
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
# 5
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 6
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 7
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 8
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 4
# 9
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
# 10
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 11
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_4_cursor, "CREATE (:Epoch3Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 12
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 13
instance_2_cursor = connect(host="localhost", port=7689).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(5, get_vertex_count)
# 14
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 15
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(5, get_vertex_count)
def get_vertex_count():
return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(5, get_vertex_count)
@pytest.mark.parametrize("data_recovery", ["false"])
def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_recovery):
# TODO(antoniofilipovic) Test should pass when logic is added
# Goal of this test is to check the replication works forcefully if replica misses epoch
# 1. We start all replicas, main and coordinator manually
# 2. We check that main has correct state
# 3. Create initial data on MAIN
# 4. Expect data to be copied on all replicas
# 5. Kill instance_1 ( this one will miss complete epoch)
# 6. Kill main (instance_3)
# 7. Instance_2 or instance_4 new main
# 8. New main commits
# 9. Instance_2 down (not main)
# 10. instance_4 down
# 11. Instance 1 up (missed epoch)
# 12 Instance 1 new main
# 13 instance 2 up
# 14 Force data from instance 1 to instance 2
temp_dir = tempfile.TemporaryDirectory().name
pass
@pytest.mark.parametrize("data_recovery", ["false", "true"])
def test_replication_correct_replica_chosen_up_to_date_data(data_recovery):
# Goal of this test is to check that correct replica instance as new MAIN is chosen
# 1. We start all replicas, main and coordinator manually
# 2. We check that main has correct state
# 3. Create initial data on MAIN
# 4. Expect data to be copied on all replicas
# 5. Kill instance_1 ( this one will miss complete epoch)
# 6. Kill main (instance_3)
# 7. Instance_2 new MAIN
# 8. Instance_2 commits and replicates data
# 9. Instance_4 down (not main)
# 10. instance_2 down (MAIN), instance 1 up (missed epoch),
# instance 4 up (In this case we should always choose instance_4 because it has up-to-date data)
# 11 Instance 4 new main
# 12 instance_1 gets up-to-date data, instance_4 has all data
temp_dir = tempfile.TemporaryDirectory().name
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_1.log",
"data_directory": f"{temp_dir}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_2.log",
"data_directory": f"{temp_dir}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_3.log",
"data_directory": f"{temp_dir}/instance_3",
"setup_queries": [],
},
"instance_4": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7691",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10014",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_4.log",
"data_directory": f"{temp_dir}/instance_4",
"setup_queries": [],
},
"coordinator": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
# 1
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_4",
"127.0.0.1:10004",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
# TODO(antoniofilipovic) Before fixing durability, if this is removed we also have an issue. Check after fix
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 3
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 5
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
# 6
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 7
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 8
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch2Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
def get_vertex_count():
return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
# 9
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 10
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4")
# 11
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 12
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
def get_vertex_count():
return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
+ def test_replication_works_on_failover_simple():
+ # Goal of this test is to check the replication works after failover command.
+ # 1. We start all replicas, main and coordinator manually
# 2. We check that main has correct state
# 3. We kill main
# 4. We check that coordinator and new main have correct state
# 5. We insert one vertex on new main
# 6. We check that vertex appears on new replica
+ # 7. We bring back main up
+ # 8. Expect data to be copied to main
safe_execute(shutil.rmtree, TEMP_DIR)
# 1
@@ -164,33 +953,48 @@ def test_replication_works_on_failover():
]
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
+ # 5
+ with pytest.raises(Exception) as e:
+ execute_and_fetch_all(new_main_cursor, "CREATE ();")
+ assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
+ # 6
+ alive_replica_cursor = connect(host="localhost", port=7689).cursor()
+ res = execute_and_fetch_all(alive_replica_cursor, "MATCH (n) RETURN count(n) as count;")[0][0]
+ assert res == 1, "Vertex should be replicated"
+ # 7
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
+ def retrieve_data_show_replicas():
+ return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
+ new_main_cursor = connect(host="localhost", port=7688).cursor()
expected_data_on_new_main = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
- {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
+ {"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
- {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
+ {"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
]
- mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
+ mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
- # 5
- execute_and_fetch_all(new_main_cursor, "CREATE ();")
- # 6
- alive_replica_cursror = connect(host="localhost", port=7689).cursor()
- res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0]
- assert res == 1, "Vertex should be replicated"
+ # 8
+ alive_main = connect(host="localhost", port=7687).cursor()
+ def retrieve_vertices_count():
+ return execute_and_fetch_all(alive_main, "MATCH (n) RETURN count(n) as count;")[0][0]
+ mg_sleep_and_assert(1, retrieve_vertices_count)
+ interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
def test_replication_works_on_replica_instance_restart():

View File

@@ -923,7 +923,7 @@ def test_replication_role_recovery(connection):
            "--log-level=TRACE",
            "--replication-restore-state-on-startup",
            "true",
-           "--storage-recover-on-startup",
+           "--data-recovery-on-startup",
            "false",
        ],
        "log_file": "replica.log",
@@ -934,7 +934,7 @@
            "--bolt-port",
            "7687",
            "--log-level=TRACE",
-           "--storage-recover-on-startup=true",
+           "--data-recovery-on-startup=true",
            "--replication-restore-state-on-startup=true",
        ],
        "log_file": "main.log",
@@ -1105,7 +1105,7 @@ def test_basic_recovery_when_replica_is_kill_when_main_is_down():
            "--bolt-port",
            "7687",
            "--log-level=TRACE",
-           "--storage-recover-on-startup=true",
+           "--data-recovery-on-startup=true",
            "--replication-restore-state-on-startup=true",
        ],
        "log_file": "main.log",
@@ -1201,7 +1201,7 @@ def test_async_replication_when_main_is_killed():
        "data_directory": f"{data_directory_replica.name}",
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [],
        "data_directory": f"{data_directory_main.name}",
@@ -1284,7 +1284,7 @@ def test_sync_replication_when_main_is_killed():
        "data_directory": f"{data_directory_replica.name}",
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [],
        "data_directory": f"{data_directory_main.name}",
@@ -1340,7 +1340,7 @@ def test_attempt_to_write_data_on_main_when_async_replica_is_down():
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [
            "REGISTER REPLICA async_replica1 ASYNC TO '127.0.0.1:10001';",
@@ -1443,7 +1443,7 @@ def test_attempt_to_write_data_on_main_when_sync_replica_is_down(connection):
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup", "true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup", "true"],
        "log_file": "main.log",
        # need to do it manually
        "setup_queries": [],
@@ -1572,7 +1572,7 @@ def test_attempt_to_create_indexes_on_main_when_async_replica_is_down():
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [
            "REGISTER REPLICA async_replica1 ASYNC TO '127.0.0.1:10001';",
@@ -1673,7 +1673,7 @@ def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(connection)
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        # Need to do it manually
        "setup_queries": [],
@@ -1818,7 +1818,7 @@ def test_trigger_on_create_before_commit_with_offline_sync_replica(connection):
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        # Need to do it manually since we kill replica
        "setup_queries": [],
@@ -1937,7 +1937,7 @@ def test_trigger_on_update_before_commit_with_offline_sync_replica(connection):
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [],
    },
@@ -2060,7 +2060,7 @@ def test_trigger_on_delete_before_commit_with_offline_sync_replica(connection):
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [],
    },
@@ -2187,7 +2187,7 @@ def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(con
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [],
    },
@@ -2310,7 +2310,7 @@ def test_triggers_on_create_before_commit_with_offline_sync_replica(connection):
        "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
    },
    "main": {
-       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+       "args": ["--bolt-port", "7687", "--log-level=TRACE", "--data-recovery-on-startup=true"],
        "log_file": "main.log",
        "setup_queries": [],
    },

View File

@@ -67,8 +67,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
  auto [instance_name, latest_epoch, latest_commit_timestamp] =
      instance.ChooseMostUpToDateInstance(instance_database_histories);
  ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
- ASSERT_TRUE(*latest_epoch == db_histories.back().first);
- ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
+ ASSERT_TRUE(latest_epoch == db_histories.back().first);
+ ASSERT_TRUE(latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
@@ -121,8 +121,8 @@
      instance.ChooseMostUpToDateInstance(instance_database_histories);
  ASSERT_TRUE(instance_name == "instance_3");
- ASSERT_TRUE(*latest_epoch == db_histories.back().first);
- ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
+ ASSERT_TRUE(latest_epoch == db_histories.back().first);
+ ASSERT_TRUE(latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
@@ -179,8 +179,8 @@
      instance.ChooseMostUpToDateInstance(instance_database_histories);
  ASSERT_TRUE(instance_name == "instance_3");
- ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
- ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
+ ASSERT_TRUE(latest_epoch == db_histories_longest.back().first);
+ ASSERT_TRUE(latest_commit_timestamp == db_histories_longest.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
@@ -241,6 +241,6 @@
      instance.ChooseMostUpToDateInstance(instance_database_histories);
  ASSERT_TRUE(instance_name == "instance_3");
- ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
- ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
+ ASSERT_TRUE(latest_epoch == std::string(newest_different_epoch));
+ ASSERT_TRUE(latest_commit_timestamp == oldest_commit_timestamp);
}