diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 8154429b8..448e12282 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -16,6 +16,7 @@ target_sources(mg-coordination include/coordination/raft_state.hpp include/coordination/rpc_errors.hpp + include/nuraft/raft_log_action.hpp include/nuraft/coordinator_cluster_state.hpp include/nuraft/coordinator_log_store.hpp include/nuraft/coordinator_state_machine.hpp diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index 870daa51a..9bbf355e4 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -45,10 +45,12 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair ptr { return nullptr; } auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr { + // TODO: (andi) think about locking scheme buffer_serializer bs(data); - std::string str = bs.get_str(); - spdlog::info("commit {} : {}", log_idx, str); + auto const [instance_name, log_action] = DecodeLog(data); + spdlog::info("commit {} : {} {}", log_idx, instance_name, log_action); + cluster_state_.DoAction(instance_name, log_action); last_committed_idx_ = log_idx; return nullptr; @@ -59,61 +61,89 @@ auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr void { - buffer_serializer bs(data); - std::string str = bs.get_str(); - - spdlog::info("rollback {} : {}", log_idx, str); + // NOTE: Nothing since we don't do anything in pre_commit } -auto CoordinatorStateMachine::read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/, +auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /*user_snp_ctx*/, ulong obj_id, ptr &data_out, bool &is_last_obj) -> int { - // Put dummy data. - data_out = buffer::alloc(sizeof(int32)); - buffer_serializer bs(data_out); - bs.put_i32(0); + spdlog::info("read logical snapshot object, obj_id: {}", obj_id); + ptr ctx = nullptr; + { + std::lock_guard ll(snapshots_lock_); + auto entry = snapshots_.find(snapshot.get_last_log_idx()); + if (entry == snapshots_.end()) { + data_out = nullptr; + is_last_obj = true; + return 0; + } + ctx = entry->second; + } + ctx->cluster_state_.Serialize(data_out); is_last_obj = true; return 0; } -auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/, - bool /*is_last_obj*/) -> void { - spdlog::info("save snapshot {} term {} object ID", s.get_last_log_idx(), s.get_last_log_term(), obj_id); - // Request next object. - obj_id++; +auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &obj_id, buffer &data, bool is_first_obj, + bool is_last_obj) -> void { + spdlog::info("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj, + is_last_obj); + + buffer_serializer bs(data); + auto cluster_state = CoordinatorClusterState::Deserialize(data); + + { + std::lock_guard ll(snapshots_lock_); + auto entry = snapshots_.find(snapshot.get_last_log_idx()); + assert(entry != snapshots_.end()); + entry->second->cluster_state_ = cluster_state; + } } auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool { - spdlog::info("apply snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term()); - { - auto lock = std::lock_guard{last_snapshot_lock_}; - ptr snp_buf = s.serialize(); - last_snapshot_ = snapshot::deserialize(*snp_buf); - } + std::lock_guard ll(snapshots_lock_); + + auto entry = snapshots_.find(s.get_last_log_idx()); + if (entry == snapshots_.end()) return false; + + cluster_state_ = entry->second->cluster_state_; return true; } auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {} auto CoordinatorStateMachine::last_snapshot() -> ptr { - auto lock = std::lock_guard{last_snapshot_lock_}; - return last_snapshot_; + std::lock_guard ll(snapshots_lock_); + auto entry = snapshots_.rbegin(); + if (entry == snapshots_.rend()) return nullptr; + + ptr ctx = entry->second; + return ctx->snapshot_; } auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; } auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result::handler_type &when_done) -> void { - spdlog::info("create snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term()); - // Clone snapshot from `s`. - { - auto lock = std::lock_guard{last_snapshot_lock_}; - ptr snp_buf = s.serialize(); - last_snapshot_ = snapshot::deserialize(*snp_buf); - } + ptr snp_buf = s.serialize(); + ptr ss = snapshot::deserialize(*snp_buf); + create_snapshot_internal(ss); + ptr except(nullptr); bool ret = true; when_done(ret, except); } +auto CoordinatorStateMachine::create_snapshot_internal(ptr snapshot) -> void { + std::lock_guard ll(snapshots_lock_); + + auto ctx = cs_new(snapshot, cluster_state_); + snapshots_[snapshot->get_last_log_idx()] = ctx; + + constexpr int MAX_SNAPSHOTS = 3; + while (snapshots_.size() > MAX_SNAPSHOTS) { + snapshots_.erase(snapshots_.begin()); + } +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp index 797092855..33e2bdeb7 100644 --- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -13,15 +13,92 @@ #ifdef MG_ENTERPRISE +#include "nuraft/raft_log_action.hpp" #include "replication_coordination_glue/role.hpp" #include "utils/rw_lock.hpp" +#include +#include + #include +#include #include namespace memgraph::coordination { -struct CoordinatorClusterState { +using nuraft::buffer; +using nuraft::buffer_serializer; +using nuraft::ptr; + +class CoordinatorClusterState { + public: + auto InsertInstance(std::string const &instance_name, replication_coordination_glue::ReplicationRole role) -> void { + instance_roles[instance_name] = role; + } + + auto DoAction(std::string const &instance_name, RaftLogAction log_action) -> void { + switch (log_action) { + case RaftLogAction::REGISTER_REPLICATION_INSTANCE: + instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA; + break; + case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: + instance_roles.erase(instance_name); + break; + case RaftLogAction::SET_INSTANCE_AS_MAIN: + instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::MAIN; + break; + case RaftLogAction::SET_INSTANCE_AS_REPLICA: + instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA; + break; + } + } + + auto Serialize(ptr &data) -> void { + auto const role_to_string = [](auto const &role) { + switch (role) { + case replication_coordination_glue::ReplicationRole::MAIN: + return "main"; + case replication_coordination_glue::ReplicationRole::REPLICA: + return "replica"; + } + }; + + auto const entry_to_string = [&role_to_string](auto const &entry) { + return entry.first + "_" + role_to_string(entry.second); + }; + + auto instances_str_view = instance_roles | ranges::views::transform(entry_to_string); + uint32_t size = + std::accumulate(instances_str_view.begin(), instances_str_view.end(), 0, + [](uint32_t acc, auto const &entry) { return acc + sizeof(uint32_t) + entry.size(); }); + + data = buffer::alloc(size); + buffer_serializer bs(data); + std::for_each(instances_str_view.begin(), instances_str_view.end(), + [&bs](auto const &entry) { bs.put_str(entry); }); + } + + static auto Deserialize(buffer &data) -> CoordinatorClusterState { + auto const str_to_role = [](auto const &str) { + if (str == "main") { + return replication_coordination_glue::ReplicationRole::MAIN; + } + return replication_coordination_glue::ReplicationRole::REPLICA; + }; + + CoordinatorClusterState cluster_state; + buffer_serializer bs(data); + while (bs.size() > 0) { + auto const entry = bs.get_str(); + auto const first_dash = entry.find('_'); + auto const instance_name = entry.substr(0, first_dash); + auto const role_str = entry.substr(first_dash + 1); + cluster_state.InsertInstance(instance_name, str_to_role(role_str)); + } + return cluster_state; + } + + private: std::map instance_roles; }; diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index 64cc521d1..f9dc265c8 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -14,6 +14,7 @@ #ifdef MG_ENTERPRISE #include "nuraft/coordinator_cluster_state.hpp" +#include "nuraft/raft_log_action.hpp" #include #include @@ -29,13 +30,6 @@ using nuraft::ptr; using nuraft::snapshot; using nuraft::state_machine; -enum class RaftLogAction : uint8_t { - REGISTER_REPLICATION_INSTANCE, - UNREGISTER_REPLICATION_INSTANCE, - SET_INSTANCE_AS_MAIN, - SET_INSTANCE_AS_REPLICA -}; - class CoordinatorStateMachine : public state_machine { public: CoordinatorStateMachine() = default; @@ -74,11 +68,25 @@ class CoordinatorStateMachine : public state_machine { auto create_snapshot(snapshot &s, async_result::handler_type &when_done) -> void override; private: + struct SnapshotCtx { + SnapshotCtx(ptr &snapshot, CoordinatorClusterState const &cluster_state) + : snapshot_(snapshot), cluster_state_(cluster_state) {} + + ptr snapshot_; + CoordinatorClusterState cluster_state_; + }; + + auto create_snapshot_internal(ptr snapshot) -> void; + CoordinatorClusterState cluster_state_; mutable utils::RWLock lock{utils::RWLock::Priority::READ}; std::atomic last_committed_idx_{0}; + // TODO: (andi) Maybe not needed, remove it + std::map> snapshots_; + std::mutex snapshots_lock_; + ptr last_snapshot_; std::mutex last_snapshot_lock_; }; diff --git a/src/coordination/include/nuraft/raft_log_action.hpp b/src/coordination/include/nuraft/raft_log_action.hpp new file mode 100644 index 000000000..f38e99538 --- /dev/null +++ b/src/coordination/include/nuraft/raft_log_action.hpp @@ -0,0 +1,28 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#ifdef MG_ENTERPRISE + +#include + +namespace memgraph::coordination { + +enum class RaftLogAction : uint8_t { + REGISTER_REPLICATION_INSTANCE, + UNREGISTER_REPLICATION_INSTANCE, + SET_INSTANCE_AS_MAIN, + SET_INSTANCE_AS_REPLICA +}; + +} // namespace memgraph::coordination +#endif