Fix rebase
This commit is contained in:
parent
b826a0a518
commit
4159c9f6a5
@ -29,14 +29,6 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
|
|||||||
j.at("uuid").get_to(instance_state.instance_uuid);
|
j.at("uuid").get_to(instance_state.instance_uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
|
||||||
std::vector<CoordinatorInstanceState> coordinators,
|
|
||||||
utils::UUID const ¤t_main_uuid, bool is_lock_opened)
|
|
||||||
: repl_instances_{std::move(instances)},
|
|
||||||
coordinators_{std::move(coordinators)},
|
|
||||||
current_main_uuid_(current_main_uuid),
|
|
||||||
is_lock_opened_(is_lock_opened) {}
|
|
||||||
|
|
||||||
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state) {
|
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state) {
|
||||||
j = nlohmann::json{{"config", instance_state.config}};
|
j = nlohmann::json{{"config", instance_state.config}};
|
||||||
}
|
}
|
||||||
@ -45,6 +37,14 @@ void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state
|
|||||||
j.at("config").get_to(instance_state.config);
|
j.at("config").get_to(instance_state.config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
||||||
|
std::vector<CoordinatorInstanceState> coordinators,
|
||||||
|
utils::UUID const ¤t_main_uuid, bool is_lock_opened)
|
||||||
|
: repl_instances_{std::move(instances)},
|
||||||
|
coordinators_{std::move(coordinators)},
|
||||||
|
current_main_uuid_(current_main_uuid),
|
||||||
|
is_lock_opened_(is_lock_opened) {}
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) {
|
CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) {
|
||||||
auto c2c_config = CoordinatorToCoordinatorConfig{
|
auto c2c_config = CoordinatorToCoordinatorConfig{
|
||||||
.coordinator_id = config.coordinator_id,
|
.coordinator_id = config.coordinator_id,
|
||||||
@ -53,10 +53,6 @@ CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig c
|
|||||||
coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)});
|
coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)});
|
||||||
}
|
}
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
|
||||||
utils::UUID const ¤t_main_uuid, bool is_lock_opened)
|
|
||||||
: repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}
|
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
|
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
|
||||||
: repl_instances_{other.repl_instances_},
|
: repl_instances_{other.repl_instances_},
|
||||||
current_main_uuid_(other.current_main_uuid_),
|
current_main_uuid_(other.current_main_uuid_),
|
||||||
@ -178,7 +174,7 @@ auto CoordinatorClusterState::DoAction(TRaftLog const &log_entry, RaftLogAction
|
|||||||
case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
|
case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
|
||||||
auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
|
auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
|
||||||
coordinators_.emplace_back(CoordinatorInstanceState{config});
|
coordinators_.emplace_back(CoordinatorInstanceState{config});
|
||||||
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
|
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
|
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
|
||||||
|
@ -28,9 +28,10 @@ namespace memgraph::coordination {
|
|||||||
|
|
||||||
using nuraft::ptr;
|
using nuraft::ptr;
|
||||||
|
|
||||||
CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &config)
|
CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config)
|
||||||
: thread_pool_{1},
|
: thread_pool_{1},
|
||||||
raft_state_(RaftState::MakeRaftState(
|
raft_state_(RaftState::MakeRaftState(
|
||||||
|
config,
|
||||||
[this]() {
|
[this]() {
|
||||||
spdlog::info("Leader changed, starting all replication instances!");
|
spdlog::info("Leader changed, starting all replication instances!");
|
||||||
auto const instances = raft_state_.GetReplicationInstances();
|
auto const instances = raft_state_.GetReplicationInstances();
|
||||||
@ -40,8 +41,9 @@ CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &c
|
|||||||
|
|
||||||
std::ranges::for_each(replicas, [this](auto &replica) {
|
std::ranges::for_each(replicas, [this](auto &replica) {
|
||||||
spdlog::info("Started pinging replication instance {}", replica.config.instance_name);
|
spdlog::info("Started pinging replication instance {}", replica.config.instance_name);
|
||||||
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
|
auto client =
|
||||||
&CoordinatorInstance::ReplicaSuccessCallback,
|
std::make_unique<ReplicationInstanceClient>(this, replica.config, client_succ_cb_, client_fail_cb_);
|
||||||
|
repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback,
|
||||||
&CoordinatorInstance::ReplicaFailCallback);
|
&CoordinatorInstance::ReplicaFailCallback);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -50,8 +52,9 @@ CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &c
|
|||||||
|
|
||||||
std::ranges::for_each(main, [this](auto &main_instance) {
|
std::ranges::for_each(main, [this](auto &main_instance) {
|
||||||
spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
|
spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
|
||||||
repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
|
auto client = std::make_unique<ReplicationInstanceClient>(this, main_instance.config, client_succ_cb_,
|
||||||
&CoordinatorInstance::MainSuccessCallback,
|
client_fail_cb_);
|
||||||
|
repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::MainSuccessCallback,
|
||||||
&CoordinatorInstance::MainFailCallback);
|
&CoordinatorInstance::MainFailCallback);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -88,7 +91,6 @@ CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &c
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
|
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
|
||||||
-> ReplicationInstanceConnector & {
|
-> ReplicationInstanceConnector & {
|
||||||
auto repl_instance =
|
auto repl_instance =
|
||||||
|
@ -41,7 +41,7 @@ class CoordinatorInstance {
|
|||||||
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
|
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
|
||||||
CoordinatorInstance &operator=(CoordinatorInstance &&) noexcept = delete;
|
CoordinatorInstance &operator=(CoordinatorInstance &&) noexcept = delete;
|
||||||
|
|
||||||
~CoordinatorInstance();
|
~CoordinatorInstance() = default;
|
||||||
|
|
||||||
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
||||||
-> RegisterInstanceCoordinatorStatus;
|
-> RegisterInstanceCoordinatorStatus;
|
||||||
|
@ -91,9 +91,6 @@ auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallb
|
|||||||
|
|
||||||
auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return *client_; }
|
auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return *client_; }
|
||||||
|
|
||||||
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
|
|
||||||
auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
|
|
||||||
|
|
||||||
auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||||
if (!IsReadyForUUIDPing()) {
|
if (!IsReadyForUUIDPing()) {
|
||||||
return true;
|
return true;
|
||||||
@ -113,11 +110,7 @@ auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID c
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
|
auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
|
||||||
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
|
return replication_coordination_glue::SendSwapMainUUIDRpc(client_->RpcClient(), new_main_uuid);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
SetNewMainUUID(new_main_uuid);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
|
auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
|
||||||
|
Loading…
Reference in New Issue
Block a user