Add force sync REPLICA with MAIN (#1777)

Antonio Filipovic 2024-03-05 17:51:14 +01:00 committed by GitHub
parent 1802dc93d1
commit d4d4660af0
16 changed files with 525 additions and 66 deletions


@ -100,8 +100,7 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
.health = "unknown"}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
auto instances_status = utils::fmap(coord_instance_to_status, raft_state_.GetAllCoordinators());
auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status);
if (raft_state_.IsLeader()) {
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {


@ -118,9 +118,14 @@ void InMemoryReplicationHandlers::Register(dbms::DbmsHandler *dbms_handler, repl
});
server.rpc_server_.Register<replication_coordination_glue::SwapMainUUIDRpc>(
[&data, dbms_handler](auto *req_reader, auto *res_builder) {
spdlog::debug("Received SwapMainUUIDHandler");
spdlog::debug("Received SwapMainUUIDRpc");
InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms_handler, data, req_reader, res_builder);
});
server.rpc_server_.Register<storage::replication::ForceResetStorageRpc>(
[&data, dbms_handler](auto *req_reader, auto *res_builder) {
spdlog::debug("Received ForceResetStorageRpc");
InMemoryReplicationHandlers::ForceResetStorageHandler(dbms_handler, data.uuid_, req_reader, res_builder);
});
}
void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_handler,
@ -329,6 +334,78 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
spdlog::debug("Replication recovery from snapshot finished!");
}
void InMemoryReplicationHandlers::ForceResetStorageHandler(dbms::DbmsHandler *dbms_handler,
const std::optional<utils::UUID> &current_main_uuid,
slk::Reader *req_reader, slk::Builder *res_builder) {
storage::replication::ForceResetStorageReq req;
slk::Load(&req, req_reader);
auto db_acc = GetDatabaseAccessor(dbms_handler, req.db_uuid);
if (!db_acc) {
storage::replication::ForceResetStorageRes res{false, 0};
slk::Save(res, res_builder);
return;
}
if (!current_main_uuid.has_value() || req.main_uuid != current_main_uuid) [[unlikely]] {
LogWrongMain(current_main_uuid, req.main_uuid, storage::replication::ForceResetStorageReq::kType.name);
storage::replication::ForceResetStorageRes res{false, 0};
slk::Save(res, res_builder);
return;
}
storage::replication::Decoder decoder(req_reader);
auto *storage = static_cast<storage::InMemoryStorage *>(db_acc->get()->storage());
auto storage_guard = std::unique_lock{storage->main_lock_};
// Clear the database
storage->vertices_.clear();
storage->edges_.clear();
storage->commit_log_.reset();
storage->commit_log_.emplace();
storage->constraints_.existence_constraints_ = std::make_unique<storage::ExistenceConstraints>();
storage->constraints_.unique_constraints_ = std::make_unique<storage::InMemoryUniqueConstraints>();
storage->indices_.label_index_ = std::make_unique<storage::InMemoryLabelIndex>();
storage->indices_.label_property_index_ = std::make_unique<storage::InMemoryLabelPropertyIndex>();
// Fine since we will force push when reading from WAL; a random epoch with timestamp 0 is the state this
// instance should be in if it had acted as MAIN before.
storage->repl_storage_state_.epoch_.SetEpoch(std::string(utils::UUID{}));
storage->repl_storage_state_.last_commit_timestamp_ = 0;
storage->repl_storage_state_.history.clear();
storage->vertex_id_ = 0;
storage->edge_id_ = 0;
storage->timestamp_ = storage::kTimestampInitialId;
storage->CollectGarbage<true>(std::move(storage_guard), false);
storage->vertices_.run_gc();
storage->edges_.run_gc();
storage::replication::ForceResetStorageRes res{true, storage->repl_storage_state_.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
spdlog::trace("Deleting old snapshot files.");
// Delete other durability files
auto snapshot_files = storage::durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_);
for (const auto &[path, uuid, _] : snapshot_files) {
spdlog::trace("Deleting snapshot file {}", path);
storage->file_retainer_.DeleteFile(path);
}
spdlog::trace("Deleting old WAL files.");
auto wal_files = storage::durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_);
if (wal_files) {
for (const auto &wal_file : *wal_files) {
spdlog::trace("Deleting WAL file {}", wal_file.path);
storage->file_retainer_.DeleteFile(wal_file.path);
}
storage->wal_file_.reset();
}
}
void InMemoryReplicationHandlers::WalFilesHandler(dbms::DbmsHandler *dbms_handler,
const std::optional<utils::UUID> &current_main_uuid,
slk::Reader *req_reader, slk::Builder *res_builder) {


@ -48,6 +48,9 @@ class InMemoryReplicationHandlers {
static void SwapMainUUIDHandler(dbms::DbmsHandler *dbms_handler, replication::RoleReplicaData &role_replica_data,
slk::Reader *req_reader, slk::Builder *res_builder);
static void ForceResetStorageHandler(dbms::DbmsHandler *dbms_handler,
const std::optional<utils::UUID> &current_main_uuid, slk::Reader *req_reader,
slk::Builder *res_builder);
static void LoadWal(storage::InMemoryStorage *storage, storage::replication::Decoder *decoder);


@ -1263,7 +1263,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
TypedValue{status.coord_socket_address}, TypedValue{status.health}, TypedValue{status.cluster_role}};
};
return utils::fmap(converter, instances);
return utils::fmap(instances, converter);
};
return callback;
}


@ -210,8 +210,13 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto client = std::make_unique<storage::ReplicationStorageClient>(*instance_client_ptr, main_uuid);
client->Start(storage, std::move(db_acc));
bool const success = std::invoke([state = client->State()]() {
// A replica that diverged from MAIN is accepted only if it will be force-synced, i.e. when the coordinator manages the cluster.
if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) {
#ifdef MG_ENTERPRISE
return FLAGS_coordinator_server_port != 0;
#else
return false;
#endif
}
return true;
});


@ -271,8 +271,7 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glu
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(
[](const auto &elem) { return std::make_pair(elem.first, elem.second); }, repl_storage_state.history);
std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(repl_storage_state.history);
history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
replication_coordination_glue::DatabaseHistory repl{


@ -358,7 +358,6 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
spdlog::warn(utils::MessageWithLink("No snapshot or WAL file found.", "https://memgr.ph/durability"));
return std::nullopt;
}
// TODO(antoniofilipovic) What is the logic here?
std::sort(wal_files.begin(), wal_files.end());
// UUID used for durability is the UUID of the last WAL file.
// Same for the epoch id.
@ -437,17 +436,13 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
}
bool epoch_history_empty = epoch_history->empty();
bool epoch_not_recorded = !epoch_history_empty && epoch_history->back().first != wal_file.epoch_id;
auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
if (epoch_history_empty || epoch_not_recorded) {
epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp_value);
}
auto last_epoch_updated = !epoch_history_empty && epoch_history->back().first == wal_file.epoch_id &&
epoch_history->back().second < last_loaded_timestamp_value;
if (last_epoch_updated) {
if (epoch_history->empty() || epoch_history->back().first != wal_file.epoch_id) {
// no history or new epoch, add it
epoch_history->emplace_back(wal_file.epoch_id, last_loaded_timestamp_value);
repl_storage_state.epoch_.SetEpoch(wal_file.epoch_id);
} else if (epoch_history->back().second < last_loaded_timestamp_value) {
// existing epoch, update with newer timestamp
epoch_history->back().second = last_loaded_timestamp_value;
}
@ -469,11 +464,11 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
spdlog::info("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
repl_storage_state.last_commit_timestamp_);
spdlog::trace("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
repl_storage_state.last_commit_timestamp_);
std::for_each(repl_storage_state.history.begin(), repl_storage_state.history.end(), [](auto &history) {
spdlog::info("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
spdlog::trace("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
});
return recovery_info;
}
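The reworked epoch-history bookkeeping above reduces to one rule: append an entry for a new epoch, otherwise only bump the last entry's timestamp if the newly loaded one is larger. A minimal standalone sketch of that rule with plain standard-library types (hypothetical UpdateEpochHistory helper; the real code additionally calls repl_storage_state.epoch_.SetEpoch in the new-epoch branch):

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

// Append an entry only for a new epoch; otherwise bump the timestamp of the last
// entry if the newly loaded one is larger.
void UpdateEpochHistory(EpochHistory &history, const std::string &epoch_id, uint64_t last_loaded_timestamp) {
  if (history.empty() || history.back().first != epoch_id) {
    history.emplace_back(epoch_id, last_loaded_timestamp);
  } else if (history.back().second < last_loaded_timestamp) {
    history.back().second = last_loaded_timestamp;
  }
}

int main() {
  EpochHistory history;
  UpdateEpochHistory(history, "epoch-a", 10);  // new epoch -> appended
  UpdateEpochHistory(history, "epoch-a", 25);  // same epoch, newer timestamp -> updated in place
  UpdateEpochHistory(history, "epoch-b", 30);  // new epoch -> appended
  return (history.size() == 2 && history[0].second == 25) ? 0 : 1;
}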


@ -109,7 +109,7 @@ InMemoryStorage::InMemoryStorage(Config config)
timestamp_ = std::max(timestamp_, info->next_timestamp);
if (info->last_commit_timestamp) {
repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
spdlog::info("Recovering last commit timestamp {}", *info->last_commit_timestamp);
spdlog::trace("Recovering last commit timestamp {}", *info->last_commit_timestamp);
}
}
} else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||


@ -76,13 +76,37 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
}
}
if (branching_point) {
spdlog::error(
"You cannot register Replica {} to this Main because at one point "
"Replica {} acted as the Main instance. Both the Main and Replica {} "
"now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.",
client_.name_, client_.name_, client_.name_);
replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::DIVERGED_FROM_MAIN; });
auto replica_state = replica_state_.Lock();
if (*replica_state == replication::ReplicaState::DIVERGED_FROM_MAIN) {
return;
}
*replica_state = replication::ReplicaState::DIVERGED_FROM_MAIN;
auto log_error = [client_name = client_.name_]() {
spdlog::error(
"You cannot register Replica {} to this Main because at one point "
"Replica {} acted as the Main instance. Both the Main and Replica {} "
"now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.",
client_name, client_name, client_name);
};
#ifdef MG_ENTERPRISE
if (!FLAGS_coordinator_server_port) {
log_error();
return;
}
client_.thread_pool_.AddTask([storage, gk = std::move(db_acc), this] {
const auto [success, timestamp] = this->ForceResetStorage(storage);
if (success) {
spdlog::info("Successfully reset storage of REPLICA {} to timestamp {}.", client_.name_, timestamp);
return;
}
spdlog::error("You cannot register REPLICA {} to this MAIN because MAIN couldn't reset REPLICA's storage.",
client_.name_);
});
#else
log_error();
#endif
return;
}
@ -265,8 +289,6 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
spdlog::debug("Starting replica recovery");
auto *mem_storage = static_cast<InMemoryStorage *>(storage);
// TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
// only snapshot
while (true) {
auto file_locker = mem_storage->file_retainer_.AddLocker();
@ -335,6 +357,21 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
}
}
std::pair<bool, uint64_t> ReplicationStorageClient::ForceResetStorage(memgraph::storage::Storage *storage) {
utils::OnScopeExit set_to_maybe_behind{
[this]() { replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::MAYBE_BEHIND; }); }};
try {
auto stream{client_.rpc_client_.Stream<replication::ForceResetStorageRpc>(main_uuid_, storage->uuid())};
const auto res = stream.AwaitResponse();
return std::pair{res.success, res.current_commit_timestamp};
} catch (const rpc::RpcFailedException &) {
spdlog::error(
utils::MessageWithLink("Couldn't ForceReset data to {}.", client_.name_, "https://memgr.ph/replication"));
}
return {false, 0};
}
////// ReplicaStream //////
ReplicaStream::ReplicaStream(Storage *storage, rpc::Client &rpc_client, const uint64_t current_seq_num,
utils::UUID main_uuid)
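ForceResetStorage above wraps the RPC in a scope-exit guard so that, success or failure, the replica state ends up as MAYBE_BEHIND and the next frequent check re-evaluates it. A minimal standalone sketch of that guard pattern (a hypothetical stand-in for utils::OnScopeExit):

#include <cstdio>
#include <utility>

// Minimal stand-in for utils::OnScopeExit: runs the stored callable when the scope ends.
template <typename F>
class OnScopeExit {
 public:
  explicit OnScopeExit(F f) : f_(std::move(f)) {}
  ~OnScopeExit() { f_(); }
  OnScopeExit(const OnScopeExit &) = delete;
  OnScopeExit &operator=(const OnScopeExit &) = delete;

 private:
  F f_;
};

int main() {
  OnScopeExit set_to_maybe_behind{[] { std::puts("replica state -> MAYBE_BEHIND"); }};
  std::puts("issue ForceResetStorageRpc (may succeed or throw)");
  return 0;  // the guard fires here, whichever way the scope is left
}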


@ -188,6 +188,13 @@ class ReplicationStorageClient {
*/
void UpdateReplicaState(Storage *storage, DatabaseAccessProtector db_acc);
/**
* @brief Forcefully reset storage to the state it has when started from scratch.
*
* @param storage pointer to the storage associated with the client
*/
std::pair<bool, uint64_t> ForceResetStorage(Storage *storage);
void LogRpcFailure();
/**


@ -59,6 +59,19 @@ void TimestampRes::Save(const TimestampRes &self, memgraph::slk::Builder *builde
memgraph::slk::Save(self, builder);
}
void TimestampRes::Load(TimestampRes *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(self, reader); }
void ForceResetStorageReq::Save(const ForceResetStorageReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void ForceResetStorageReq::Load(ForceResetStorageReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void ForceResetStorageRes::Save(const ForceResetStorageRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void ForceResetStorageRes::Load(ForceResetStorageRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace storage::replication
constexpr utils::TypeInfo storage::replication::AppendDeltasReq::kType{utils::TypeId::REP_APPEND_DELTAS_REQ,
@ -97,6 +110,12 @@ constexpr utils::TypeInfo storage::replication::TimestampReq::kType{utils::TypeI
constexpr utils::TypeInfo storage::replication::TimestampRes::kType{utils::TypeId::REP_TIMESTAMP_RES, "TimestampRes",
nullptr};
constexpr utils::TypeInfo storage::replication::ForceResetStorageReq::kType{utils::TypeId::REP_FORCE_RESET_STORAGE_REQ,
"ForceResetStorageReq", nullptr};
constexpr utils::TypeInfo storage::replication::ForceResetStorageRes::kType{utils::TypeId::REP_FORCE_RESET_STORAGE_RES,
"ForceResetStorageRes", nullptr};
// Autogenerated SLK serialization code
namespace slk {
// Serialize code for TimestampRes
@ -255,6 +274,30 @@ void Load(memgraph::storage::replication::AppendDeltasReq *self, memgraph::slk::
memgraph::slk::Load(&self->seq_num, reader);
}
// Serialize code for ForceResetStorageReq
void Save(const memgraph::storage::replication::ForceResetStorageReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.main_uuid, builder);
memgraph::slk::Save(self.db_uuid, builder);
}
void Load(memgraph::storage::replication::ForceResetStorageReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->main_uuid, reader);
memgraph::slk::Load(&self->db_uuid, reader);
}
// Serialize code for ForceResetStorageRes
void Save(const memgraph::storage::replication::ForceResetStorageRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.success, builder);
memgraph::slk::Save(self.current_commit_timestamp, builder);
}
void Load(memgraph::storage::replication::ForceResetStorageRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->success, reader);
memgraph::slk::Load(&self->current_commit_timestamp, reader);
}
// Serialize SalientConfig
void Save(const memgraph::storage::SalientConfig &self, memgraph::slk::Builder *builder) {


@ -210,6 +210,36 @@ struct TimestampRes {
using TimestampRpc = rpc::RequestResponse<TimestampReq, TimestampRes>;
struct ForceResetStorageReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(ForceResetStorageReq *self, memgraph::slk::Reader *reader);
static void Save(const ForceResetStorageReq &self, memgraph::slk::Builder *builder);
ForceResetStorageReq() = default;
explicit ForceResetStorageReq(const utils::UUID &main_uuid, const utils::UUID &db_uuid)
: main_uuid{main_uuid}, db_uuid{db_uuid} {}
utils::UUID main_uuid;
utils::UUID db_uuid;
};
struct ForceResetStorageRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(ForceResetStorageRes *self, memgraph::slk::Reader *reader);
static void Save(const ForceResetStorageRes &self, memgraph::slk::Builder *builder);
ForceResetStorageRes() = default;
ForceResetStorageRes(bool success, uint64_t current_commit_timestamp)
: success(success), current_commit_timestamp(current_commit_timestamp) {}
bool success;
uint64_t current_commit_timestamp;
};
using ForceResetStorageRpc = rpc::RequestResponse<ForceResetStorageReq, ForceResetStorageRes>;
} // namespace memgraph::storage::replication
// SLK serialization declarations
@ -267,4 +297,12 @@ void Save(const memgraph::storage::SalientConfig &self, memgraph::slk::Builder *
void Load(memgraph::storage::SalientConfig *self, memgraph::slk::Reader *reader);
void Save(const memgraph::storage::replication::ForceResetStorageReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::storage::replication::ForceResetStorageReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::storage::replication::ForceResetStorageRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::storage::replication::ForceResetStorageRes *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk
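As with the existing messages, the new request/response pair relies on Save writing fields in a fixed order and Load reading them back in exactly that order. A standalone toy sketch of that contract (hypothetical Buffer and ToyForceResetRes types, not Memgraph's slk API):

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical byte buffer standing in for the slk builder/reader pair.
struct Buffer {
  std::vector<unsigned char> bytes;
  std::size_t read_pos = 0;
};

void SaveU64(uint64_t value, Buffer *buf) {
  const auto *p = reinterpret_cast<const unsigned char *>(&value);
  buf->bytes.insert(buf->bytes.end(), p, p + sizeof(value));
}

void LoadU64(uint64_t *value, Buffer *buf) {
  std::memcpy(value, buf->bytes.data() + buf->read_pos, sizeof(*value));
  buf->read_pos += sizeof(*value);
}

// Toy stand-in for ForceResetStorageRes: fields are loaded in the same order they were saved.
struct ToyForceResetRes {
  bool success;
  uint64_t current_commit_timestamp;

  static void Save(const ToyForceResetRes &self, Buffer *buf) {
    SaveU64(self.success ? 1 : 0, buf);
    SaveU64(self.current_commit_timestamp, buf);
  }
  static void Load(ToyForceResetRes *self, Buffer *buf) {
    uint64_t success = 0;
    LoadU64(&success, buf);
    self->success = success != 0;
    LoadU64(&self->current_commit_timestamp, buf);
  }
};

int main() {
  Buffer buf;
  ToyForceResetRes::Save(ToyForceResetRes{true, 42}, &buf);
  ToyForceResetRes out{};
  ToyForceResetRes::Load(&out, &buf);
  return (out.success && out.current_commit_timestamp == 42) ? 0 : 1;
}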


@ -19,9 +19,9 @@
namespace memgraph::utils {
template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>,
typename F, typename R = std::invoke_result_t<F, T>>
typename F = std::identity, typename R = std::decay_t<std::invoke_result_t<F, T>>>
requires ranges::range<Container<T, Allocator>> &&
(!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v)
(!std::same_as<Container<T, Allocator>, std::string>)auto fmap(const Container<T, Allocator> &v, F &&f = {})
-> std::vector<R> {
return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
}
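The argument order of utils::fmap is flipped here (container first, callable second) and the callable now defaults to std::identity, which is what the functor-less call in GetDatabasesHistories relies on. A simplified standalone sketch of the new shape, assuming a plain std::vector instead of the generic container:

#include <cstdint>
#include <functional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Simplified stand-in for the new utils::fmap: container first, callable second,
// callable defaulting to std::identity (element-wise copy).
template <typename T, typename F = std::identity, typename R = std::decay_t<std::invoke_result_t<F, T>>>
std::vector<R> fmap(const std::vector<T> &v, F &&f = {}) {
  std::vector<R> out;
  out.reserve(v.size());
  for (const auto &elem : v) out.push_back(std::invoke(f, elem));
  return out;
}

int main() {
  std::vector<std::pair<std::string, uint64_t>> history{{"epoch-1", 10}, {"epoch-2", 25}};
  // With a functor: container first, callable second, matching the updated call sites.
  auto names = fmap(history, [](const auto &p) { return p.first; });
  // Without a functor: std::identity copies the elements.
  auto copied = fmap(history);
  return (names.size() == 2 && copied.size() == 2) ? 0 : 1;
}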


@ -99,6 +99,8 @@ enum class TypeId : uint64_t {
REP_DROP_AUTH_DATA_RES,
REP_TRY_SET_MAIN_UUID_REQ,
REP_TRY_SET_MAIN_UUID_RES,
REP_FORCE_RESET_STORAGE_REQ,
REP_FORCE_RESET_STORAGE_RES,
// Coordinator
COORD_FAILOVER_REQ,


@ -596,9 +596,8 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
mg_sleep_and_assert(5, get_vertex_count)
@pytest.mark.parametrize("data_recovery", ["false"])
@pytest.mark.parametrize("data_recovery", ["true"])
def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_recovery):
# TODO(antoniofilipovic) Test should pass when logic is added
# Goal of this test is to check that forced replication (force sync) works if a replica misses an epoch
# 1. We start all replicas, main and coordinator manually
# 2. We check that main has correct state
@ -606,9 +605,9 @@ def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_reco
# 4. Expect data to be copied on all replicas
# 5. Kill instance_1 ( this one will miss complete epoch)
# 6. Kill main (instance_3)
# 7. Instance_2 or instance_4 new main
# 8. New main commits
# 9. Instance_2 down (not main)
# 7. Instance_2 new main
# 8. Instance_2 commits
# 9. Instance_2 down
# 10. instance_4 down
# 11. Instance 1 up (missed epoch)
# 12 Instance 1 new main
@ -617,7 +616,272 @@ def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_reco
temp_dir = tempfile.TemporaryDirectory().name
pass
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_1.log",
"data_directory": f"{temp_dir}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_2.log",
"data_directory": f"{temp_dir}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_3.log",
"data_directory": f"{temp_dir}/instance_3",
"setup_queries": [],
},
"instance_4": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7691",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10014",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_4.log",
"data_directory": f"{temp_dir}/instance_4",
"setup_queries": [],
},
"coordinator": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
# 1
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_4",
"127.0.0.1:10004",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
("instance_4", "", "127.0.0.1:10014", "up", "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 3
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 5
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
# 6
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 7
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "up", "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 8
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch2Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
def get_vertex_count():
return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
# 9
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
# 10
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4")
# 11
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 12
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
# 13
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 12
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(2, get_vertex_count)
def get_vertex_count():
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(2, get_vertex_count)
# 13
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch3Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 14
def get_vertex_objects_func_creator(cursor):
def get_vertex_objects():
return list(
execute_and_fetch_all(
cursor, "MATCH (n) " "WITH labels(n) as labels, properties(n) as props " "RETURN labels[0], props;"
)
)
return get_vertex_objects
vertex_objects = [("Epoch1Vertex", {"prop": 1}), ("Epoch1Vertex", {"prop": 2}), ("Epoch3Vertex", {"prop": 1})]
mg_sleep_and_assert_collection(vertex_objects, get_vertex_objects_func_creator(instance_1_cursor))
mg_sleep_and_assert_collection(vertex_objects, get_vertex_objects_func_creator(instance_2_cursor))
# 15
@pytest.mark.parametrize("data_recovery", ["false", "true"])


@ -45,11 +45,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
auto db_histories = memgraph::utils::fmap(histories, [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
});
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
@ -91,11 +89,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
auto db_histories = memgraph::utils::fmap(histories, [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
});
db_histories.back().second = 51;
memgraph::replication_coordination_glue::DatabaseHistory history1{
@ -145,11 +141,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
auto db_histories = memgraph::utils::fmap(histories, [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
});
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
@ -162,11 +156,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
histories.emplace_back(memgraph::utils::UUID{}, 60);
histories.emplace_back(memgraph::utils::UUID{}, 65);
auto db_histories_longest = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
auto db_histories_longest =
memgraph::utils::fmap(histories, [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
});
memgraph::replication_coordination_glue::DatabaseHistory history_longest{
.db_uuid = db_uuid, .history = db_histories_longest, .name = default_name};
@ -200,11 +193,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
auto db_histories = memgraph::utils::fmap(histories, [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
});
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
@ -217,11 +208,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
auto oldest_commit_timestamp{5};
auto newest_different_epoch = memgraph::utils::UUID{};
histories.emplace_back(newest_different_epoch, oldest_commit_timestamp);
auto db_histories_different = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
auto db_histories_different =
memgraph::utils::fmap(histories, [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
});
memgraph::replication_coordination_glue::DatabaseHistory history_3{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};