Improve leadership changes

This commit is contained in:
Andi Skrgat 2024-03-01 13:40:22 +01:00
parent 8805021dd9
commit c23a6e3564
13 changed files with 171 additions and 51 deletions

View File

@ -92,6 +92,10 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
it->second.role = ReplicationRole::REPLICA;
break;
}
case RaftLogAction::UPDATE_UUID: {
uuid_ = std::get<utils::UUID>(log_entry);
break;
}
}
}
@ -141,31 +145,12 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta
return cluster_state;
}
auto CoordinatorClusterState::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
auto CoordinatorClusterState::GetInstances() const -> std::vector<InstanceState> {
auto lock = std::shared_lock{log_lock_};
// TODO: (andi) Abstract
auto const role_to_string = [](auto const &role) -> std::string {
switch (role) {
case ReplicationRole::MAIN:
return "main";
case ReplicationRole::REPLICA:
return "replica";
}
};
auto const entry_to_pair = [&role_to_string](auto const &entry) {
return std::make_pair(entry.first, role_to_string(entry.second.role));
};
return instance_roles_ | ranges::views::transform(entry_to_pair) | ranges::to<std::vector>();
return instance_roles_ | ranges::views::values | ranges::to<std::vector<InstanceState>>;
}
auto CoordinatorClusterState::GetClientConfigs() const -> std::vector<CoordinatorClientConfig> {
auto lock = std::shared_lock{log_lock_};
return instance_roles_ | ranges::views::values |
ranges::views::transform([](auto const &instance_state) { return instance_state.config; }) |
ranges::to<std::vector>();
}
auto CoordinatorClusterState::GetUUID() const -> utils::UUID { return uuid_; }
auto CoordinatorClusterState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
auto lock = std::shared_lock{log_lock_};

View File

@ -57,6 +57,11 @@ auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view ins
return CreateLog(str_log);
}
auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> {
auto const str_log = fmt::format("{}*update_uuid", nlohmann::json{{"uuid", uuid}}.dump());
return CreateLog(str_log);
}
auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
buffer_serializer bs(data);
@ -77,6 +82,11 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
if (action == "demote") {
return {info, RaftLogAction::SET_INSTANCE_AS_REPLICA};
}
if (action == "update_uuid") {
auto const json = nlohmann::json::parse(info);
return {json.at("uuid").get<utils::UUID>(), RaftLogAction::UPDATE_UUID};
}
throw std::runtime_error("Unknown action");
}
@ -87,7 +97,6 @@ auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<b
auto const [parsed_data, log_action] = DecodeLog(data);
cluster_state_.DoAction(parsed_data, log_action);
// std::invoke(raft_commit_cb_, parsed_data, log_action);
last_committed_idx_ = log_idx;
// TODO: (andi) Don't return nullptr
@ -183,13 +192,11 @@ auto CoordinatorStateMachine::create_snapshot_internal(ptr<snapshot> snapshot) -
}
}
auto CoordinatorStateMachine::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
auto CoordinatorStateMachine::GetInstances() const -> std::vector<InstanceState> {
return cluster_state_.GetInstances();
}
auto CoordinatorStateMachine::GetClientConfigs() const -> std::vector<CoordinatorClientConfig> {
return cluster_state_.GetClientConfigs();
}
auto CoordinatorStateMachine::GetUUID() const -> utils::UUID { return cluster_state_.GetUUID(); }
} // namespace memgraph::coordination
#endif

View File

@ -27,6 +27,7 @@ namespace memgraph::coordination {
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
// TODO: (andi) JSON serialization for RAFT log.
struct CoordinatorClientConfig {
std::string instance_name;
std::string ip_address;

View File

@ -75,8 +75,6 @@ class CoordinatorInstance {
std::list<ReplicationInstance> repl_instances_;
mutable utils::ResourceLock coord_instance_lock_{};
utils::UUID main_uuid_;
RaftState raft_state_;
};

View File

@ -72,10 +72,10 @@ class RaftState {
auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
auto GetClientConfigs() const -> std::vector<CoordinatorClientConfig>;
auto GetInstances() const -> std::vector<InstanceState>;
auto GetUUID() const -> utils::UUID;
private:
// TODO: (andi) I think variables below can be abstracted/clean them.

View File

@ -17,6 +17,7 @@
#include "nuraft/raft_log_action.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/resource_lock.hpp"
#include "utils/uuid.hpp"
#include <libnuraft/nuraft.hxx>
#include <range/v3/view.hpp>
@ -35,7 +36,7 @@ struct InstanceState {
ReplicationRole role;
};
using TRaftLog = std::variant<CoordinatorClientConfig, std::string>;
using TRaftLog = std::variant<CoordinatorClientConfig, std::string, utils::UUID>;
using nuraft::buffer;
using nuraft::buffer_serializer;
@ -67,12 +68,13 @@ class CoordinatorClusterState {
static auto Deserialize(buffer &data) -> CoordinatorClusterState;
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
auto GetInstances() const -> std::vector<InstanceState>;
auto GetClientConfigs() const -> std::vector<CoordinatorClientConfig>;
auto GetUUID() const -> utils::UUID;
private:
std::map<std::string, InstanceState, std::less<>> instance_roles_;
utils::UUID uuid_{};
mutable utils::ResourceLock log_lock_{};
};

View File

@ -52,6 +52,7 @@ class CoordinatorStateMachine : public state_machine {
static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer>;
static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;
@ -79,9 +80,8 @@ class CoordinatorStateMachine : public state_machine {
auto create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void override;
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
auto GetClientConfigs() const -> std::vector<CoordinatorClientConfig>;
auto GetInstances() const -> std::vector<InstanceState>;
auto GetUUID() const -> utils::UUID;
private:
struct SnapshotCtx {

View File

@ -24,7 +24,8 @@ enum class RaftLogAction : uint8_t {
REGISTER_REPLICATION_INSTANCE,
UNREGISTER_REPLICATION_INSTANCE,
SET_INSTANCE_AS_MAIN,
SET_INSTANCE_AS_REPLICA
SET_INSTANCE_AS_REPLICA,
UPDATE_UUID
};
inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction {
@ -40,6 +41,11 @@ inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction {
if (action == "demote") {
return RaftLogAction::SET_INSTANCE_AS_REPLICA;
}
if (action == "update_uuid") {
return RaftLogAction::UPDATE_UUID;
}
throw InvalidRaftLogActionException("Invalid Raft log action: {}.", action);
}

View File

@ -209,6 +209,25 @@ auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) ->
return true;
}
auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool {
auto new_log = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for updating UUID. Most likely the reason is that the instance is not "
"the leader.");
return false;
}
spdlog::info("Request for updating UUID accepted");
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to update UUID with error code {}", res->get_result_code());
return false;
}
return true;
}
auto RaftState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
return state_machine_->FindCurrentMainInstanceName();
}
@ -221,13 +240,9 @@ auto RaftState::IsReplica(std::string_view instance_name) const -> bool {
return state_machine_->IsReplica(instance_name);
}
auto RaftState::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
return state_machine_->GetInstances();
}
auto RaftState::GetInstances() const -> std::vector<InstanceState> { return state_machine_->GetInstances(); }
auto RaftState::GetClientConfigs() const -> std::vector<CoordinatorClientConfig> {
return state_machine_->GetClientConfigs();
}
auto RaftState::GetUUID() const -> utils::UUID { return state_machine_->GetUUID(); }
} // namespace memgraph::coordination
#endif

View File

@ -26,9 +26,7 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC
HealthCheckInstanceCallback fail_instance_cb)
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
succ_cb_(succ_instance_cb),
fail_cb_(fail_instance_cb) {
client_.StartFrequentCheck();
}
fail_cb_(fail_instance_cb) {}
auto ReplicationInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();

View File

@ -19,7 +19,6 @@
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/fmt.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
@ -135,7 +134,7 @@ void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_ha
replication_coordination_glue::SwapMainUUIDReq req;
slk::Load(&req, req_reader);
spdlog::info(fmt::format("Set replica data UUID to main uuid {}", std::string(req.uuid)));
spdlog::info("Set replica data UUID to main uuid {}", std::string(req.uuid));
dbms_handler->ReplicationState().TryPersistRoleReplica(role_replica_data.config, req.uuid);
role_replica_data.uuid_ = req.uuid;

View File

@ -17,7 +17,11 @@ import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
from mg_utils import (
mg_sleep_and_assert,
mg_sleep_and_assert_any_function,
mg_sleep_and_assert_collection,
)
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -205,5 +209,88 @@ def test_distributed_automatic_failover():
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_distributed_automatic_failover_after_coord_dies():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
def show_instances_coord1():
return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
coord_cursor_2 = connect(host="localhost", port=7691).cursor()
def show_instances_coord2():
return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
leader_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
follower_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown.
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
new_main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -17,6 +17,28 @@ def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=
return result
def mg_sleep_and_assert_any_function(
expected_value, functions_to_retrieve_data, max_duration=20, time_between_attempt=0.2
):
result = [f() for f in functions_to_retrieve_data]
if any((x == expected_value for x in result)):
return result
start_time = time.time()
while result != expected_value:
duration = time.time() - start_time
if duration > max_duration:
assert (
False
), f" mg_sleep_and_assert has tried for too long and did not get the expected result! Last result was: {result}"
time.sleep(time_between_attempt)
result = [f() for f in functions_to_retrieve_data]
if any((x == expected_value for x in result)):
return result
return result
def mg_sleep_and_assert_collection(
expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2
):