Introduce multi-tenancy to SHOW REPLICAS (#1735)

---------

Co-authored-by: Gareth Lloyd <gareth.lloyd@memgraph.io>
This commit is contained in:
andrejtonev 2024-02-26 20:05:49 +01:00 committed by GitHub
parent c2e9df309a
commit f4d9a3695d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1728 additions and 644 deletions

View File

@ -3035,8 +3035,6 @@ class ReplicationQuery : public memgraph::query::Query {
enum class SyncMode { SYNC, ASYNC };
enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, DIVERGED_FROM_MAIN };
ReplicationQuery() = default;
DEFVISITABLE(QueryVisitor<void>);

View File

@ -297,16 +297,6 @@ inline auto convertToReplicationMode(const ReplicationQuery::SyncMode &sync_mode
class ReplQueryHandler {
public:
struct ReplicaInfo {
std::string name;
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
ReplicationQuery::ReplicaState state;
};
explicit ReplQueryHandler(query::ReplicationQueryHandler &replication_query_handler)
: handler_{&replication_query_handler} {}
@ -397,58 +387,16 @@ class ReplQueryHandler {
}
}
std::vector<ReplicaInfo> ShowReplicas(const dbms::Database &db) const {
if (handler_->IsReplica()) {
// replica can't show registered replicas (it shouldn't have any)
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
std::vector<ReplicasInfo> ShowReplicas() const {
auto info = handler_->ShowReplicas();
if (info.HasError()) {
switch (info.GetError()) {
case ShowReplicaError::NOT_MAIN:
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
}
}
// TODO: Combine results? Have a single place with clients???
// Also authentication checks (replica + database visibility)
const auto repl_infos = db.storage()->ReplicasInfo();
std::vector<ReplicaInfo> replicas;
replicas.reserve(repl_infos.size());
const auto from_info = [](const auto &repl_info) -> ReplicaInfo {
ReplicaInfo replica;
replica.name = repl_info.name;
replica.socket_address = repl_info.endpoint.SocketAddress();
switch (repl_info.mode) {
case replication_coordination_glue::ReplicationMode::SYNC:
replica.sync_mode = ReplicationQuery::SyncMode::SYNC;
break;
case replication_coordination_glue::ReplicationMode::ASYNC:
replica.sync_mode = ReplicationQuery::SyncMode::ASYNC;
break;
}
replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica;
replica.current_number_of_timestamp_behind_master =
repl_info.timestamp_info.current_number_of_timestamp_behind_master;
switch (repl_info.state) {
case storage::replication::ReplicaState::READY:
replica.state = ReplicationQuery::ReplicaState::READY;
break;
case storage::replication::ReplicaState::REPLICATING:
replica.state = ReplicationQuery::ReplicaState::REPLICATING;
break;
case storage::replication::ReplicaState::RECOVERY:
replica.state = ReplicationQuery::ReplicaState::RECOVERY;
break;
case storage::replication::ReplicaState::MAYBE_BEHIND:
replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND;
break;
case storage::replication::ReplicaState::DIVERGED_FROM_MAIN:
replica.state = ReplicationQuery::ReplicaState::DIVERGED_FROM_MAIN;
break;
}
return replica;
};
std::transform(repl_infos.begin(), repl_infos.end(), std::back_inserter(replicas), from_info);
return replicas;
return info.GetValue().entries_;
}
private:
@ -1092,50 +1040,98 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
#endif
callback.header = {
"name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master",
"state"};
bool full_info = false;
#ifdef MG_ENTERPRISE
full_info = license::global_license_checker.IsEnterpriseValidFast();
#endif
callback.header = {"name", "socket_address", "sync_mode", "system_info", "data_info"};
callback.fn = [handler = ReplQueryHandler{replication_query_handler}, replica_nfields = callback.header.size(),
db_acc = current_db.db_acc_] {
const auto &replicas = handler.ShowReplicas(*db_acc->get());
full_info] {
auto const sync_mode_to_tv = [](memgraph::replication_coordination_glue::ReplicationMode sync_mode) {
using namespace std::string_view_literals;
switch (sync_mode) {
using enum memgraph::replication_coordination_glue::ReplicationMode;
case SYNC:
return TypedValue{"sync"sv};
case ASYNC:
return TypedValue{"async"sv};
}
};
auto const replica_sys_state_to_tv = [](memgraph::replication::ReplicationClient::State state) {
using namespace std::string_view_literals;
switch (state) {
using enum memgraph::replication::ReplicationClient::State;
case BEHIND:
return TypedValue{"invalid"sv};
case READY:
return TypedValue{"ready"sv};
case RECOVERY:
return TypedValue{"recovery"sv};
}
};
auto const sys_info_to_tv = [&](ReplicaSystemInfoState orig) {
auto info = std::map<std::string, TypedValue>{};
info.emplace("ts", TypedValue{static_cast<int64_t>(orig.ts_)});
// TODO: behind not implemented
info.emplace("behind", TypedValue{/* static_cast<int64_t>(orig.behind_) */});
info.emplace("status", replica_sys_state_to_tv(orig.state_));
return TypedValue{std::move(info)};
};
auto const replica_state_to_tv = [](memgraph::storage::replication::ReplicaState state) {
using namespace std::string_view_literals;
switch (state) {
using enum memgraph::storage::replication::ReplicaState;
case READY:
return TypedValue{"ready"sv};
case REPLICATING:
return TypedValue{"replicating"sv};
case RECOVERY:
return TypedValue{"recovery"sv};
case MAYBE_BEHIND:
return TypedValue{"invalid"sv};
case DIVERGED_FROM_MAIN:
return TypedValue{"diverged"sv};
}
};
auto const info_to_tv = [&](ReplicaInfoState orig) {
auto info = std::map<std::string, TypedValue>{};
info.emplace("ts", TypedValue{static_cast<int64_t>(orig.ts_)});
info.emplace("behind", TypedValue{static_cast<int64_t>(orig.behind_)});
info.emplace("status", replica_state_to_tv(orig.state_));
return TypedValue{std::move(info)};
};
auto const data_info_to_tv = [&](std::map<std::string, ReplicaInfoState> orig) {
auto data_info = std::map<std::string, TypedValue>{};
for (auto &[name, info] : orig) {
data_info.emplace(name, info_to_tv(info));
}
return TypedValue{std::move(data_info)};
};
auto replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
typed_replicas.reserve(replicas.size());
for (const auto &replica : replicas) {
for (auto &replica : replicas) {
std::vector<TypedValue> typed_replica;
typed_replica.reserve(replica_nfields);
typed_replica.emplace_back(replica.name);
typed_replica.emplace_back(replica.socket_address);
switch (replica.sync_mode) {
case ReplicationQuery::SyncMode::SYNC:
typed_replica.emplace_back("sync");
break;
case ReplicationQuery::SyncMode::ASYNC:
typed_replica.emplace_back("async");
break;
}
typed_replica.emplace_back(static_cast<int64_t>(replica.current_timestamp_of_replica));
typed_replica.emplace_back(static_cast<int64_t>(replica.current_number_of_timestamp_behind_master));
switch (replica.state) {
case ReplicationQuery::ReplicaState::READY:
typed_replica.emplace_back("ready");
break;
case ReplicationQuery::ReplicaState::REPLICATING:
typed_replica.emplace_back("replicating");
break;
case ReplicationQuery::ReplicaState::RECOVERY:
typed_replica.emplace_back("recovery");
break;
case ReplicationQuery::ReplicaState::MAYBE_BEHIND:
typed_replica.emplace_back("invalid");
break;
case ReplicationQuery::ReplicaState::DIVERGED_FROM_MAIN:
typed_replica.emplace_back("diverged");
break;
typed_replica.emplace_back(replica.name_);
typed_replica.emplace_back(replica.socket_address_);
typed_replica.emplace_back(sync_mode_to_tv(replica.sync_mode_));
if (full_info) {
typed_replica.emplace_back(sys_info_to_tv(replica.system_info_));
} else {
// Set to NULL
typed_replica.emplace_back(TypedValue{});
}
typed_replica.emplace_back(data_info_to_tv(replica.data_info_));
typed_replicas.emplace_back(std::move(typed_replica));
}

View File

@ -84,16 +84,6 @@ class CoordinatorQueryHandler {
CoordinatorQueryHandler(CoordinatorQueryHandler &&) = default;
CoordinatorQueryHandler &operator=(CoordinatorQueryHandler &&) = default;
struct Replica {
std::string name;
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
ReplicationQuery::ReplicaState state;
};
struct MainReplicaStatus {
std::string_view name;
std::string_view socket_address;

View File

@ -11,6 +11,8 @@
#pragma once
#include "replication/replication_client.hpp"
#include "replication_coordination_glue/mode.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/result.hpp"
#include "utils/uuid.hpp"
@ -31,6 +33,7 @@ enum class RegisterReplicaError : uint8_t {
COULD_NOT_BE_PERSISTED,
ERROR_ACCEPTING_MAIN
};
enum class UnregisterReplicaResult : uint8_t {
NOT_MAIN,
COULD_NOT_BE_PERSISTED,
@ -38,6 +41,47 @@ enum class UnregisterReplicaResult : uint8_t {
SUCCESS,
};
enum class ShowReplicaError : uint8_t {
NOT_MAIN,
};
struct ReplicaSystemInfoState {
uint64_t ts_;
uint64_t behind_;
replication::ReplicationClient::State state_;
};
struct ReplicaInfoState {
ReplicaInfoState(uint64_t ts, uint64_t behind, storage::replication::ReplicaState state)
: ts_(ts), behind_(behind), state_(state) {}
uint64_t ts_;
uint64_t behind_;
storage::replication::ReplicaState state_;
};
struct ReplicasInfo {
ReplicasInfo(std::string name, std::string socket_address, replication_coordination_glue::ReplicationMode sync_mode,
ReplicaSystemInfoState system_info, std::map<std::string, ReplicaInfoState> data_info)
: name_(std::move(name)),
socket_address_(std::move(socket_address)),
sync_mode_(sync_mode),
system_info_(std::move(system_info)),
data_info_(std::move(data_info)) {}
std::string name_;
std::string socket_address_;
memgraph::replication_coordination_glue::ReplicationMode sync_mode_;
ReplicaSystemInfoState system_info_;
std::map<std::string, ReplicaInfoState> data_info_;
};
struct ReplicasInfos {
explicit ReplicasInfos(std::vector<ReplicasInfo> entries) : entries_(std::move(entries)) {}
std::vector<ReplicasInfo> entries_;
};
/// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage
struct ReplicationQueryHandler {
virtual ~ReplicationQueryHandler() = default;
@ -66,6 +110,8 @@ struct ReplicationQueryHandler {
virtual auto GetRole() const -> memgraph::replication_coordination_glue::ReplicationRole = 0;
virtual bool IsMain() const = 0;
virtual bool IsReplica() const = 0;
virtual auto ShowReplicas() const -> utils::BasicResult<ShowReplicaError, ReplicasInfos> = 0;
};
} // namespace memgraph::query

View File

@ -14,7 +14,9 @@
#include "replication/config.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "rpc/client.hpp"
#include "utils/rw_lock.hpp"
#include "utils/scheduler.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"
#include "utils/thread_pool.hpp"
@ -114,8 +116,9 @@ struct ReplicationClient {
enum class State {
BEHIND,
READY,
RECOVERY,
};
utils::Synchronized<State> state_{State::BEHIND};
utils::Synchronized<State, utils::WritePrioritizedRWLock> state_{State::BEHIND};
replication_coordination_glue::ReplicationMode mode_{replication_coordination_glue::ReplicationMode::SYNC};
// This thread pool is used for background tasks so we don't

View File

@ -39,10 +39,12 @@ void SystemRestore(replication::ReplicationClient &client, system::System &syste
const utils::UUID &main_uuid, auth::SynchedAuth &auth) {
// Check if system is up to date
if (client.state_.WithLock(
[](auto &state) { return state == memgraph::replication::ReplicationClient::State::READY; }))
[](auto &state) { return state != memgraph::replication::ReplicationClient::State::BEHIND; }))
return;
// Try to recover...
client.state_.WithLock(
[](auto &state) { return state != memgraph::replication::ReplicationClient::State::RECOVERY; });
{
using enum memgraph::flags::Experiments;
bool full_system_replication =
@ -139,6 +141,9 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
bool IsMain() const override;
bool IsReplica() const override;
auto ShowReplicas() const
-> utils::BasicResult<memgraph::query::ShowReplicaError, memgraph::query::ReplicasInfos> override;
auto GetReplState() const -> const memgraph::replication::ReplicationState &;
auto GetReplState() -> memgraph::replication::ReplicationState &;

View File

@ -10,26 +10,28 @@
// licenses/APL.txt.
#include "replication_handler/replication_handler.hpp"
#include "dbms/constants.hpp"
#include "dbms/dbms_handler.hpp"
#include "replication/replication_client.hpp"
#include "replication_handler/system_replication.hpp"
namespace memgraph::replication {
namespace {
#ifdef MG_ENTERPRISE
void RecoverReplication(memgraph::replication::ReplicationState &repl_state, memgraph::system::System &system,
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::auth::SynchedAuth &auth) {
void RecoverReplication(replication::ReplicationState &repl_state, system::System &system,
dbms::DbmsHandler &dbms_handler, auth::SynchedAuth &auth) {
/*
* REPLICATION RECOVERY AND STARTUP
*/
// Startup replication state (if recovered at startup)
auto replica = [&dbms_handler, &auth, &system](memgraph::replication::RoleReplicaData &data) {
return memgraph::replication::StartRpcServer(dbms_handler, data, auth, system);
auto replica = [&dbms_handler, &auth, &system](replication::RoleReplicaData &data) {
return replication::StartRpcServer(dbms_handler, data, auth, system);
};
// Replication recovery and frequent check start
auto main = [&system, &dbms_handler, &auth](memgraph::replication::RoleMainData &mainData) {
auto main = [&system, &dbms_handler, &auth](replication::RoleMainData &mainData) {
for (auto &client : mainData.registered_replicas_) {
if (client.try_set_uuid &&
replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, mainData.uuid_)) {
@ -38,7 +40,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
SystemRestore(client, system, dbms_handler, mainData.uuid_, auth);
}
// DBMS here
dbms_handler.ForEach([&mainData](memgraph::dbms::DatabaseAccess db_acc) {
dbms_handler.ForEach([&mainData](dbms::DatabaseAccess db_acc) {
dbms::DbmsHandler::RecoverStorageReplication(std::move(db_acc), mainData);
});
@ -48,7 +50,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
// Warning
if (dbms_handler.default_config().durability.snapshot_wal_mode ==
memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED) {
storage::Config::Durability::SnapshotWalMode::DISABLED) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please "
"consider "
@ -59,19 +61,18 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
return true;
};
auto result = std::visit(memgraph::utils::Overloaded{replica, main}, repl_state.ReplicationData());
auto result = std::visit(utils::Overloaded{replica, main}, repl_state.ReplicationData());
MG_ASSERT(result, "Replica recovery failure!");
}
#else
void RecoverReplication(memgraph::replication::ReplicationState &repl_state,
memgraph::dbms::DbmsHandler &dbms_handler) {
void RecoverReplication(replication::ReplicationState &repl_state, dbms::DbmsHandler &dbms_handler) {
// Startup replication state (if recovered at startup)
auto replica = [&dbms_handler](memgraph::replication::RoleReplicaData &data) {
return memgraph::replication::StartRpcServer(dbms_handler, data);
auto replica = [&dbms_handler](replication::RoleReplicaData &data) {
return replication::StartRpcServer(dbms_handler, data);
};
// Replication recovery and frequent check start
auto main = [&dbms_handler](memgraph::replication::RoleMainData &mainData) {
auto main = [&dbms_handler](replication::RoleMainData &mainData) {
dbms::DbmsHandler::RecoverStorageReplication(dbms_handler.Get(), mainData);
for (auto &client : mainData.registered_replicas_) {
@ -79,12 +80,12 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state,
replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, mainData.uuid_)) {
client.try_set_uuid = false;
}
memgraph::replication::StartReplicaClient(client, dbms_handler, mainData.uuid_);
replication::StartReplicaClient(client, dbms_handler, mainData.uuid_);
}
// Warning
if (dbms_handler.default_config().durability.snapshot_wal_mode ==
memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED) {
storage::Config::Durability::SnapshotWalMode::DISABLED) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please "
"consider "
@ -95,7 +96,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state,
return true;
};
auto result = std::visit(memgraph::utils::Overloaded{replica, main}, repl_state.ReplicationData());
auto result = std::visit(utils::Overloaded{replica, main}, repl_state.ReplicationData());
MG_ASSERT(result, "Replica recovery failure!");
}
#endif
@ -133,20 +134,19 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port);
client.StartFrequentCheck([&, license = license::global_license_checker.IsEnterpriseValidFast(), main_uuid](
bool reconnect, replication::ReplicationClient &client) mutable {
if (client.try_set_uuid &&
memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, main_uuid)) {
if (client.try_set_uuid && replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, main_uuid)) {
client.try_set_uuid = false;
}
// Working connection
// Check if system needs restoration
if (reconnect) {
client.state_.WithLock([](auto &state) { state = memgraph::replication::ReplicationClient::State::BEHIND; });
client.state_.WithLock([](auto &state) { state = replication::ReplicationClient::State::BEHIND; });
}
// Check if license has changed
const auto new_license = license::global_license_checker.IsEnterpriseValidFast();
if (new_license != license) {
license = new_license;
client.state_.WithLock([](auto &state) { state = memgraph::replication::ReplicationClient::State::BEHIND; });
client.state_.WithLock([](auto &state) { state = replication::ReplicationClient::State::BEHIND; });
}
#ifdef MG_ENTERPRISE
SystemRestore<true>(client, system, dbms_handler, main_uuid, auth);
@ -154,10 +154,10 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
// Check if any database has been left behind
dbms_handler.ForEach([&name = client.name_, reconnect](dbms::DatabaseAccess db_acc) {
// Specific database <-> replica client
db_acc->storage()->repl_storage_state_.WithClient(name, [&](storage::ReplicationStorageClient *client) {
if (reconnect || client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) {
db_acc->storage()->repl_storage_state_.WithClient(name, [&](storage::ReplicationStorageClient &client) {
if (reconnect || client.State() == storage::replication::ReplicaState::MAYBE_BEHIND) {
// Database <-> replica might be behind, check and recover
client->TryCheckReplicaStateAsync(db_acc->storage(), db_acc);
client.TryCheckReplicaStateAsync(db_acc->storage(), db_acc);
}
});
});
@ -165,9 +165,8 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
}
#ifdef MG_ENTERPRISE
ReplicationHandler::ReplicationHandler(memgraph::replication::ReplicationState &repl_state,
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::system::System &system,
memgraph::auth::SynchedAuth &auth)
ReplicationHandler::ReplicationHandler(replication::ReplicationState &repl_state, dbms::DbmsHandler &dbms_handler,
system::System &system, auth::SynchedAuth &auth)
: repl_state_{repl_state}, dbms_handler_{dbms_handler}, system_{system}, auth_{auth} {
RecoverReplication(repl_state_, system_, dbms_handler_, auth_);
}
@ -179,20 +178,20 @@ ReplicationHandler::ReplicationHandler(replication::ReplicationState &repl_state
#endif
bool ReplicationHandler::SetReplicationRoleMain() {
auto const main_handler = [](memgraph::replication::RoleMainData &) {
auto const main_handler = [](replication::RoleMainData &) {
// If we are already MAIN, we don't want to change anything
return false;
};
auto const replica_handler = [this](memgraph::replication::RoleReplicaData const &) {
auto const replica_handler = [this](replication::RoleReplicaData const &) {
return DoReplicaToMainPromotion(utils::UUID{});
};
// TODO: under lock
return std::visit(memgraph::utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
}
bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
bool ReplicationHandler::SetReplicationRoleReplica(const replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) {
return SetReplicationRoleReplica_<true>(config, main_uuid);
}
@ -238,18 +237,16 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio
return RegisterReplica_<false>(config);
}
auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::query::UnregisterReplicaResult {
auto const replica_handler =
[](memgraph::replication::RoleReplicaData const &) -> memgraph::query::UnregisterReplicaResult {
return memgraph::query::UnregisterReplicaResult::NOT_MAIN;
auto ReplicationHandler::UnregisterReplica(std::string_view name) -> query::UnregisterReplicaResult {
auto const replica_handler = [](replication::RoleReplicaData const &) -> query::UnregisterReplicaResult {
return query::UnregisterReplicaResult::NOT_MAIN;
};
auto const main_handler =
[this, name](memgraph::replication::RoleMainData &mainData) -> memgraph::query::UnregisterReplicaResult {
auto const main_handler = [this, name](replication::RoleMainData &mainData) -> query::UnregisterReplicaResult {
if (!repl_state_.TryPersistUnregisterReplica(name)) {
return memgraph::query::UnregisterReplicaResult::COULD_NOT_BE_PERSISTED;
return query::UnregisterReplicaResult::COULD_NOT_BE_PERSISTED;
}
// Remove database specific clients
dbms_handler_.ForEach([name](memgraph::dbms::DatabaseAccess db_acc) {
dbms_handler_.ForEach([name](dbms::DatabaseAccess db_acc) {
db_acc->storage()->repl_storage_state_.replication_clients_.WithLock([&name](auto &clients) {
std::erase_if(clients, [name](const auto &client) { return client->Name() == name; });
});
@ -257,14 +254,14 @@ auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::q
// Remove instance level clients
auto const n_unregistered =
std::erase_if(mainData.registered_replicas_, [name](auto const &client) { return client.name_ == name; });
return n_unregistered != 0 ? memgraph::query::UnregisterReplicaResult::SUCCESS
: memgraph::query::UnregisterReplicaResult::CAN_NOT_UNREGISTER;
return n_unregistered != 0 ? query::UnregisterReplicaResult::SUCCESS
: query::UnregisterReplicaResult::CAN_NOT_UNREGISTER;
};
return std::visit(memgraph::utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
}
auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_glue::ReplicationRole {
auto ReplicationHandler::GetRole() const -> replication_coordination_glue::ReplicationRole {
return repl_state_.GetRole();
}
@ -275,10 +272,57 @@ auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
auto ReplicationHandler::GetReplState() const -> const memgraph::replication::ReplicationState & { return repl_state_; }
auto ReplicationHandler::GetReplState() -> memgraph::replication::ReplicationState & { return repl_state_; }
auto ReplicationHandler::GetReplState() -> replication::ReplicationState & { return repl_state_; }
bool ReplicationHandler::IsMain() const { return repl_state_.IsMain(); }
bool ReplicationHandler::IsReplica() const { return repl_state_.IsReplica(); }
auto ReplicationHandler::ShowReplicas() const -> utils::BasicResult<query::ShowReplicaError, query::ReplicasInfos> {
using res_t = utils::BasicResult<query::ShowReplicaError, query::ReplicasInfos>;
auto main = [this](RoleMainData const &main) -> res_t {
auto entries = std::vector<query::ReplicasInfo>{};
entries.reserve(main.registered_replicas_.size());
const bool full_info = license::global_license_checker.IsEnterpriseValidFast();
for (auto const &replica : main.registered_replicas_) {
// STEP 1: data_info
auto data_info = std::map<std::string, query::ReplicaInfoState>{};
this->dbms_handler_.ForEach([&](dbms::DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
// ATM we only support IN_MEMORY_TRANSACTIONAL
if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return;
if (!full_info && storage->name() == dbms::kDefaultDB) return;
auto ok =
storage->repl_storage_state_.WithClient(replica.name_, [&](storage::ReplicationStorageClient &client) {
auto ts_info = client.GetTimestampInfo(storage);
auto state = client.State();
data_info.emplace(storage->name(),
query::ReplicaInfoState{ts_info.current_timestamp_of_replica,
ts_info.current_number_of_timestamp_behind_main, state});
});
DMG_ASSERT(ok);
});
// STEP 2: system_info
#ifdef MG_ENTERPRISE
// Already locked on system transaction via the interpreter
const auto ts = system_.LastCommittedSystemTimestamp();
// NOTE: no system behind at the moment
query::ReplicaSystemInfoState system_info{ts, 0 /* behind ts not implemented */, *replica.state_.ReadLock()};
#else
query::ReplicaSystemInfoState system_info{};
#endif
// STEP 3: add entry
entries.emplace_back(replica.name_, replica.rpc_client_.Endpoint().SocketAddress(), replica.mode_, system_info,
std::move(data_info));
}
return query::ReplicasInfos{std::move(entries)};
};
auto replica = [](RoleReplicaData const &) -> res_t { return query::ShowReplicaError::NOT_MAIN; };
return std::visit(utils::Overloaded{main, replica}, repl_state_.ReplicationData());
}
} // namespace memgraph::replication

View File

@ -26,7 +26,7 @@ namespace memgraph::storage {
struct TimestampInfo {
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
uint64_t current_number_of_timestamp_behind_main;
};
struct ReplicaInfo {

View File

@ -95,7 +95,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
TimestampInfo ReplicationStorageClient::GetTimestampInfo(Storage const *storage) {
TimestampInfo info;
info.current_timestamp_of_replica = 0;
info.current_number_of_timestamp_behind_master = 0;
info.current_number_of_timestamp_behind_main = 0;
try {
auto stream{client_.rpc_client_.Stream<replication::TimestampRpc>(main_uuid_, storage->uuid())};
@ -104,9 +104,9 @@ TimestampInfo ReplicationStorageClient::GetTimestampInfo(Storage const *storage)
auto main_time_stamp = storage->repl_storage_state_.last_commit_timestamp_.load();
info.current_timestamp_of_replica = response.current_commit_timestamp;
info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp;
info.current_number_of_timestamp_behind_main = response.current_commit_timestamp - main_time_stamp;
if (!is_success || info.current_number_of_timestamp_behind_master != 0) {
if (!is_success || info.current_number_of_timestamp_behind_main != 0) {
replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; });
LogRpcFailure();
}

View File

@ -63,7 +63,7 @@ struct ReplicationStorageState {
return replication_clients_.WithLock([replica_name, cb = std::forward<F>(callback)](auto &clients) {
for (const auto &client : clients) {
if (client->Name() == replica_name) {
cb(client.get());
cb(*client);
return true;
}
}

View File

@ -34,7 +34,7 @@ struct System {
if (!system_unique.try_lock_for(try_time)) {
return std::nullopt;
}
return Transaction{state_, std::move(system_unique), timestamp_++};
return Transaction{state_, std::move(system_unique), ++timestamp_};
}
// TODO: this and LastCommittedSystemTimestamp maybe not needed
@ -46,7 +46,7 @@ struct System {
private:
State state_;
std::timed_mutex mtx_{};
std::uint64_t timestamp_{};
std::uint64_t timestamp_{0};
};
} // namespace memgraph::system

View File

@ -312,8 +312,20 @@ def test_unregister_replicas(kill_instance):
]
expected_replicas = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
@ -330,7 +342,13 @@ def test_unregister_replicas(kill_instance):
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
@ -406,7 +424,13 @@ def test_unregister_main():
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
main_cursor = connect(host="localhost", port=7687).cursor()

View File

@ -17,7 +17,7 @@ import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -123,11 +123,23 @@ def test_distributed_automatic_failover():
main_cursor = connect(host="localhost", port=7689).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
assert actual_data_on_main == sorted(expected_data_on_main)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -152,18 +164,42 @@ def test_distributed_automatic_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
if __name__ == "__main__":

View File

@ -67,7 +67,13 @@ def test_replication_works_on_failover():
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("shared_replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
(
"shared_replica",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
@ -82,8 +88,20 @@ def test_replication_works_on_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("replica", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("shared_replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
(
"replica",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"shared_replica",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)

View File

@ -16,7 +16,7 @@ import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -106,8 +106,20 @@ def test_replication_works_on_failover():
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
@ -135,17 +147,41 @@ def test_replication_works_on_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
# 5
execute_and_fetch_all(new_main_cursor, "CREATE ();")
@ -173,8 +209,20 @@ def test_replication_works_on_replica_instance_restart():
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
@ -193,16 +241,28 @@ def test_replication_works_on_replica_instance_restart():
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
mg_sleep_and_assert_collection(expected_data_on_coord, retrieve_data_show_repl_cluster)
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
@ -217,10 +277,22 @@ def test_replication_works_on_replica_instance_restart():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 5.
@ -241,10 +313,22 @@ def test_replication_works_on_replica_instance_restart():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 2, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 6.
instance_2_cursor = connect(port=7689, host="localhost").cursor()
@ -313,11 +397,23 @@ def test_simple_automatic_failover():
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
assert actual_data_on_main == sorted(expected_data_on_main)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -340,18 +436,42 @@ def test_simple_automatic_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_registering_replica_fails_name_exists():

View File

@ -15,3 +15,21 @@ def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=
result = function_to_retrieve_data()
return result
def mg_sleep_and_assert_collection(
expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2
):
result = function_to_retrieve_data()
start_time = time.time()
while len(result) != len(expected_value) or any((x not in result for x in expected_value)):
duration = time.time() - start_time
if duration > max_duration:
assert (
False
), f" mg_sleep_and_assert has tried for too long and did not get the expected result! Last result was: {result}"
time.sleep(time_between_attempt)
result = function_to_retrieve_data()
return result

View File

@ -14,7 +14,7 @@ import time
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert_collection
# BUGFIX: for issue https://github.com/memgraph/memgraph/issues/1515
@ -28,28 +28,52 @@ def test_replication_handles_delete_when_multiple_edges_of_same_type(connection)
conn = connection(7687, "main")
conn.autocommit = True
cursor = conn.cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
assert all([x in actual_data for x in expected_data])
# 1/
execute_and_fetch_all(cursor, "CREATE (a)-[r:X]->(b) CREATE (a)-[:X]->(b) DELETE r;")
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("replica_2", "127.0.0.1:10002", "async", 2, 0, "ready"),
}
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
]
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
return execute_and_fetch_all(cursor, "SHOW REPLICAS;")
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
actual_data = mg_sleep_and_assert_collection(expected_data, retrieve_data)
assert all([x in actual_data for x in expected_data])
if __name__ == "__main__":

View File

@ -10,12 +10,11 @@
# licenses/APL.txt.
import sys
import pytest
import time
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert_collection
@pytest.mark.parametrize(
@ -31,25 +30,42 @@ def test_show_replication_role(port, role, connection):
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
expected_column_names = {
"name",
"socket_address",
"sync_mode",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
"system_info",
"data_info",
}
actual_column_names = {x.name for x in cursor.description}
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
assert all([x in actual_data for x in expected_data])
def test_show_replicas_while_inserting_data(connection):
@ -62,49 +78,108 @@ def test_show_replicas_while_inserting_data(connection):
# 0/
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
expected_column_names = {
"name",
"socket_address",
"sync_mode",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
"system_info",
"data_info",
}
actual_column_names = {x.name for x in cursor.description}
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
assert all([x in actual_data for x in expected_data])
# 1/
execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 4, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 4, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"),
}
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
]
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
return execute_and_fetch_all(cursor, "SHOW REPLICAS;")
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
actual_data = mg_sleep_and_assert_collection(expected_data, retrieve_data)
assert all([x in actual_data for x in expected_data])
# 3/
res = execute_and_fetch_all(cursor, "MATCH (node) return node;")
assert len(res) == 1
# 4/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
]
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
assert all([x in actual_data for x in expected_data])
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,7 @@ import interactive_mg_runner
import mgclient
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -35,6 +35,10 @@ BOLT_PORTS = {"main": 7687, "replica_1": 7688, "replica_2": 7689}
REPLICATION_PORTS = {"replica_1": 10001, "replica_2": 10002}
def set_eq(actual, expected):
return len(actual) == len(expected) and all([x in actual for x in expected])
def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[str, Any]:
return {
"replica_1": {
@ -174,10 +178,9 @@ def setup_main(main_cursor):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'B'});")
def show_replicas_func(cursor, db_name):
def show_replicas_func(cursor):
def func():
execute_and_fetch_all(cursor, f"USE DATABASE {db_name};")
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
return execute_and_fetch_all(cursor, "SHOW REPLICAS;")
return func
@ -271,17 +274,31 @@ def test_manual_databases_create_multitenancy_replication(connection):
execute_and_fetch_all(cursor, "CREATE ()-[:EDGE]->();")
# 2/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 2, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 1, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 2, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 1, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 1
@ -523,11 +540,23 @@ def test_manual_databases_create_multitenancy_replication_main_behind(connection
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
# 2/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
databases_on_main = show_databases_func(main_cursor)()
@ -567,17 +596,31 @@ def test_automatic_databases_create_multitenancy_replication(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
@ -640,19 +683,26 @@ def test_automatic_databases_multitenancy_replication_predefined(connection):
execute_and_fetch_all(cursor, "CREATE ()-[:EDGE]->();")
# 2/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 2, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 1, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 1
assert get_number_of_edges_func(cursor_replica, "A")() == 0
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
assert get_number_of_edges_func(cursor_replica, "B")() == 1
def test_automatic_databases_create_multitenancy_replication_dirty_main(connection):
@ -698,10 +748,16 @@ def test_automatic_databases_create_multitenancy_replication_dirty_main(connecti
cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 1, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
execute_and_fetch_all(cursor_replica, "USE DATABASE A;")
@ -740,31 +796,85 @@ def test_multitenancy_replication_restart_replica_w_fc(connection, replica_name)
time.sleep(3) # In order for the frequent check to run
# Check that the FC did invalidate
expected_data = {
"replica_1": {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "invalid"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
},
"replica_2": {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "invalid"),
},
"replica_1": [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 0, "behind": 0, "status": "invalid"},
"B": {"ts": 0, "behind": 0, "status": "invalid"},
"memgraph": {"ts": 0, "behind": 0, "status": "invalid"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
],
"replica_2": [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 0, "behind": 0, "status": "invalid"},
"B": {"ts": 0, "behind": 0, "status": "invalid"},
"memgraph": {"ts": 0, "behind": 0, "status": "invalid"},
},
),
],
}
assert expected_data[replica_name] == show_replicas_func(main_cursor, "A")()
assert set_eq(expected_data[replica_name], show_replicas_func(main_cursor)())
# Restart
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
# 4/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
@ -805,19 +915,33 @@ def test_multitenancy_replication_restart_replica_wo_fc(connection, replica_name
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
# 4/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
cursor_replica = connection(BOLT_PORTS[replica_name], replica_name).cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
assert get_number_of_edges_func(cursor_replica, "A")() == 3
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
@ -899,17 +1023,28 @@ def test_multitenancy_replication_drop_replica(connection, replica_name):
)
# 4/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
@ -993,17 +1128,31 @@ def test_automatic_databases_drop_multitenancy_replication(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor))
# 4/
execute_and_fetch_all(main_cursor, "USE DATABASE memgraph;")
@ -1088,11 +1237,23 @@ def test_multitenancy_drop_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
# 4/
replica1_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
@ -1110,11 +1271,23 @@ def test_multitenancy_drop_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
execute_and_fetch_all(main_cursor, "USE DATABASE B;")
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 8, "behind": None, "status": "ready"},
{"B": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 8, "behind": None, "status": "ready"},
{"B": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor))
# 6/
assert execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN count(*);")[0][0] == 1
@ -1163,11 +1336,23 @@ def test_multitenancy_drop_and_recreate_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
# 4/
replica1_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
@ -1184,11 +1369,23 @@ def test_multitenancy_drop_and_recreate_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 8, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 8, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor))
# 6/
assert execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN count(*);")[0][0] == 1