split between try register and register replica
This commit is contained in:
parent
f07fd21a9e
commit
ca20b78479
@ -327,7 +327,7 @@ class ReplQueryHandler {
|
||||
.port = static_cast<uint16_t>(*port),
|
||||
};
|
||||
|
||||
if (!handler_->SetReplicationRoleReplica(config, std::nullopt)) {
|
||||
if (!handler_->TrySetReplicationRoleReplica(config, std::nullopt)) {
|
||||
throw QueryRuntimeException("Couldn't set role to replica!");
|
||||
}
|
||||
}
|
||||
|
@ -49,6 +49,9 @@ struct ReplicationQueryHandler {
|
||||
virtual bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) = 0;
|
||||
|
||||
virtual bool TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) = 0;
|
||||
|
||||
// as MAIN, define and connect to REPLICAs
|
||||
virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
-> utils::BasicResult<RegisterReplicaError> = 0;
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include "auth/auth.hpp"
|
||||
#include "dbms/dbms_handler.hpp"
|
||||
#include "replication/include/replication/state.hpp"
|
||||
#include "replication_handler/system_replication.hpp"
|
||||
#include "replication_handler/system_rpc.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
@ -108,10 +109,65 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
// as REPLICA, become MAIN
|
||||
bool SetReplicationRoleMain() override;
|
||||
|
||||
// as MAIN, become REPLICA
|
||||
// as MAIN, become REPLICA, can be called on MAIN and REPLICA
|
||||
bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) override;
|
||||
|
||||
// as MAIN, become REPLICA, can be called only on MAIN
|
||||
bool TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) override;
|
||||
|
||||
// as MAIN, become REPLICA
|
||||
template <bool HandleIdempotency>
|
||||
bool SetReplicationRoleReplica_(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) {
|
||||
if (!HandleIdempotency && repl_state_.IsReplica()) {
|
||||
return false;
|
||||
}
|
||||
if (HandleIdempotency && repl_state_.IsReplica()) {
|
||||
// We don't want to restart the server if we're already a REPLICA with correct config
|
||||
auto &replica_data = std::get<memgraph::replication::RoleReplicaData>(repl_state_.ReplicationData());
|
||||
if (replica_data.config == config) {
|
||||
return true;
|
||||
}
|
||||
repl_state_.SetReplicationRoleReplica(config, main_uuid);
|
||||
#ifdef MG_ENTERPRISE
|
||||
return StartRpcServer(dbms_handler_, replica_data, auth_);
|
||||
#else
|
||||
return StartRpcServer(dbms_handler_, replica_data);
|
||||
#endif
|
||||
}
|
||||
|
||||
// TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are
|
||||
// deleting the replica.
|
||||
// Remove database specific clients
|
||||
dbms_handler_.ForEach([&](memgraph::dbms::DatabaseAccess db_acc) {
|
||||
auto *storage = db_acc->storage();
|
||||
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
|
||||
});
|
||||
// Remove instance level clients
|
||||
std::get<memgraph::replication::RoleMainData>(repl_state_.ReplicationData()).registered_replicas_.clear();
|
||||
|
||||
// Creates the server
|
||||
repl_state_.SetReplicationRoleReplica(config, main_uuid);
|
||||
|
||||
// Start
|
||||
const auto success = std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) {
|
||||
// ASSERT
|
||||
return false;
|
||||
},
|
||||
[this](memgraph::replication::RoleReplicaData &data) {
|
||||
#ifdef MG_ENTERPRISE
|
||||
return StartRpcServer(dbms_handler_, data, auth_);
|
||||
#else
|
||||
return StartRpcServer(dbms_handler_, data);
|
||||
#endif
|
||||
}},
|
||||
repl_state_.ReplicationData());
|
||||
// TODO Handle error (restore to main?)
|
||||
return success;
|
||||
}
|
||||
|
||||
// as MAIN, define and connect to REPLICAs
|
||||
auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> override;
|
||||
|
@ -196,44 +196,12 @@ bool ReplicationHandler::SetReplicationRoleMain() {
|
||||
|
||||
bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) {
|
||||
if (repl_state_.IsReplica()) {
|
||||
// We don't want to restart the server if we're already a REPLICA with correct config
|
||||
auto &replica_data = std::get<memgraph::replication::RoleReplicaData>(repl_state_.ReplicationData());
|
||||
if (replica_data.config == config) {
|
||||
return true;
|
||||
}
|
||||
repl_state_.SetReplicationRoleReplica(config, main_uuid);
|
||||
return StartRpcServer(dbms_handler_, replica_data, auth_);
|
||||
}
|
||||
return SetReplicationRoleReplica_<true>(config, main_uuid);
|
||||
}
|
||||
|
||||
// TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are
|
||||
// deleting the replica.
|
||||
// Remove database specific clients
|
||||
dbms_handler_.ForEach([&](memgraph::dbms::DatabaseAccess db_acc) {
|
||||
auto *storage = db_acc->storage();
|
||||
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
|
||||
});
|
||||
// Remove instance level clients
|
||||
std::get<memgraph::replication::RoleMainData>(repl_state_.ReplicationData()).registered_replicas_.clear();
|
||||
|
||||
// Creates the server
|
||||
repl_state_.SetReplicationRoleReplica(config, main_uuid);
|
||||
|
||||
// Start
|
||||
const auto success = std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) {
|
||||
// ASSERT
|
||||
return false;
|
||||
},
|
||||
[this](memgraph::replication::RoleReplicaData &data) {
|
||||
#ifdef MG_ENTERPRISE
|
||||
return StartRpcServer(dbms_handler_, data, auth_);
|
||||
#else
|
||||
return StartRpcServer(dbms_handler_, data);
|
||||
#endif
|
||||
}},
|
||||
repl_state_.ReplicationData());
|
||||
// TODO Handle error (restore to main?)
|
||||
return success;
|
||||
bool ReplicationHandler::TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) {
|
||||
return SetReplicationRoleReplica_<false>(config, main_uuid);
|
||||
}
|
||||
|
||||
bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) {
|
||||
|
@ -142,7 +142,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
|
||||
MinMemgraph replica(repl_conf);
|
||||
|
||||
auto replica_store_handler = replica.repl_handler;
|
||||
replica_store_handler.SetReplicationRoleReplica(
|
||||
replica_store_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
@ -439,13 +439,13 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
|
||||
MinMemgraph replica1(repl_conf);
|
||||
MinMemgraph replica2(repl2_conf);
|
||||
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
std::nullopt);
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
@ -597,7 +597,7 @@ TEST_F(ReplicationTest, RecoveryProcess) {
|
||||
MinMemgraph replica(repl_conf);
|
||||
auto replica_store_handler = replica.repl_handler;
|
||||
|
||||
replica_store_handler.SetReplicationRoleReplica(
|
||||
replica_store_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
@ -676,7 +676,7 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
|
||||
MinMemgraph replica_async(repl_conf);
|
||||
|
||||
auto replica_store_handler = replica_async.repl_handler;
|
||||
replica_store_handler.SetReplicationRoleReplica(
|
||||
replica_store_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
@ -726,7 +726,7 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
MinMemgraph main(main_conf);
|
||||
MinMemgraph replica1(repl_conf);
|
||||
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
@ -734,7 +734,7 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
std::nullopt);
|
||||
|
||||
MinMemgraph replica2(repl2_conf);
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = 10001,
|
||||
@ -819,7 +819,7 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
ASSERT_FALSE(acc->Commit().HasError());
|
||||
}
|
||||
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
@ -858,7 +858,7 @@ TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
MinMemgraph replica1(repl_conf);
|
||||
|
||||
uint16_t replica1_port = 10001;
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = replica1_port,
|
||||
@ -867,7 +867,7 @@ TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
|
||||
uint16_t replica2_port = 10002;
|
||||
MinMemgraph replica2(repl2_conf);
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = replica2_port,
|
||||
@ -923,7 +923,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
|
||||
MinMemgraph replica1(repl_conf);
|
||||
|
||||
uint16_t replica1_port = 10001;
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = replica1_port,
|
||||
@ -932,7 +932,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
|
||||
|
||||
uint16_t replica2_port = 10002;
|
||||
MinMemgraph replica2(repl2_conf);
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = replica2_port,
|
||||
@ -966,7 +966,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
|
||||
|
||||
MinMemgraph main(main_conf);
|
||||
MinMemgraph replica1(repl_conf);
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = common_port,
|
||||
@ -974,7 +974,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
|
||||
std::nullopt);
|
||||
|
||||
MinMemgraph replica2(repl2_conf);
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = common_port,
|
||||
@ -1023,7 +1023,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
|
||||
std::optional<MinMemgraph> main(main_config);
|
||||
MinMemgraph replica1(replica1_config);
|
||||
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
@ -1031,7 +1031,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
|
||||
std::nullopt);
|
||||
|
||||
MinMemgraph replica2(replica2_config);
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
@ -1088,7 +1088,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
|
||||
std::optional<MinMemgraph> main(main_config);
|
||||
MinMemgraph replica1(repl_conf);
|
||||
|
||||
replica1.repl_handler.SetReplicationRoleReplica(
|
||||
replica1.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
@ -1097,7 +1097,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
|
||||
|
||||
MinMemgraph replica2(repl2_conf);
|
||||
|
||||
replica2.repl_handler.SetReplicationRoleReplica(
|
||||
replica2.repl_handler.TrySetReplicationRoleReplica(
|
||||
ReplicationServerConfig{
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
|
Loading…
Reference in New Issue
Block a user