Propagate logs to state machine
This commit is contained in:
parent
9bc7f8933d
commit
cccea66703
@ -16,6 +16,7 @@ target_sources(mg-coordination
|
||||
include/coordination/raft_state.hpp
|
||||
include/coordination/rpc_errors.hpp
|
||||
|
||||
include/nuraft/raft_log_action.hpp
|
||||
include/nuraft/coordinator_cluster_state.hpp
|
||||
include/nuraft/coordinator_log_store.hpp
|
||||
include/nuraft/coordinator_state_machine.hpp
|
||||
|
@ -45,10 +45,12 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<std::string,
|
||||
// Pre-commit hook of the state machine. Both arguments are ignored and no result buffer is
// produced; the visible rollback path notes that nothing is done at this stage.
auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> {
  return nullptr;
}
|
||||
|
||||
auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
|
||||
// TODO: (andi) think about locking scheme
|
||||
buffer_serializer bs(data);
|
||||
std::string str = bs.get_str();
|
||||
|
||||
spdlog::info("commit {} : {}", log_idx, str);
|
||||
auto const [instance_name, log_action] = DecodeLog(data);
|
||||
spdlog::info("commit {} : {} {}", log_idx, instance_name, log_action);
|
||||
cluster_state_.DoAction(instance_name, log_action);
|
||||
|
||||
last_committed_idx_ = log_idx;
|
||||
return nullptr;
|
||||
@ -59,61 +61,89 @@ auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_con
|
||||
}
|
||||
|
||||
auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void {
|
||||
buffer_serializer bs(data);
|
||||
std::string str = bs.get_str();
|
||||
|
||||
spdlog::info("rollback {} : {}", log_idx, str);
|
||||
// NOTE: Nothing since we don't do anything in pre_commit
|
||||
}
|
||||
|
||||
auto CoordinatorStateMachine::read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/,
|
||||
auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /*user_snp_ctx*/, ulong obj_id,
|
||||
ptr<buffer> &data_out, bool &is_last_obj) -> int {
|
||||
// Put dummy data.
|
||||
data_out = buffer::alloc(sizeof(int32));
|
||||
buffer_serializer bs(data_out);
|
||||
bs.put_i32(0);
|
||||
spdlog::info("read logical snapshot object, obj_id: {}", obj_id);
|
||||
|
||||
ptr<SnapshotCtx> ctx = nullptr;
|
||||
{
|
||||
std::lock_guard<std::mutex> ll(snapshots_lock_);
|
||||
auto entry = snapshots_.find(snapshot.get_last_log_idx());
|
||||
if (entry == snapshots_.end()) {
|
||||
data_out = nullptr;
|
||||
is_last_obj = true;
|
||||
return 0;
|
||||
}
|
||||
ctx = entry->second;
|
||||
}
|
||||
ctx->cluster_state_.Serialize(data_out);
|
||||
is_last_obj = true;
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/,
|
||||
bool /*is_last_obj*/) -> void {
|
||||
spdlog::info("save snapshot {} term {} object ID", s.get_last_log_idx(), s.get_last_log_term(), obj_id);
|
||||
// Request next object.
|
||||
obj_id++;
|
||||
auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &obj_id, buffer &data, bool is_first_obj,
|
||||
bool is_last_obj) -> void {
|
||||
spdlog::info("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
|
||||
is_last_obj);
|
||||
|
||||
buffer_serializer bs(data);
|
||||
auto cluster_state = CoordinatorClusterState::Deserialize(data);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> ll(snapshots_lock_);
|
||||
auto entry = snapshots_.find(snapshot.get_last_log_idx());
|
||||
assert(entry != snapshots_.end());
|
||||
entry->second->cluster_state_ = cluster_state;
|
||||
}
|
||||
}
|
||||
|
||||
auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool {
|
||||
spdlog::info("apply snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
|
||||
{
|
||||
auto lock = std::lock_guard{last_snapshot_lock_};
|
||||
ptr<buffer> snp_buf = s.serialize();
|
||||
last_snapshot_ = snapshot::deserialize(*snp_buf);
|
||||
}
|
||||
std::lock_guard<std::mutex> ll(snapshots_lock_);
|
||||
|
||||
auto entry = snapshots_.find(s.get_last_log_idx());
|
||||
if (entry == snapshots_.end()) return false;
|
||||
|
||||
cluster_state_ = entry->second->cluster_state_;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Intentionally a no-op: this state machine never stores anything in the user snapshot context
// (read_logical_snp_obj ignores its `user_snp_ctx` argument), so there is nothing to release.
// The parameter name is commented out, like in the other unused-argument overrides of this file,
// to avoid an unused-parameter warning.
auto CoordinatorStateMachine::free_user_snp_ctx(void *& /*user_snp_ctx*/) -> void {}
|
||||
|
||||
auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
|
||||
auto lock = std::lock_guard{last_snapshot_lock_};
|
||||
return last_snapshot_;
|
||||
std::lock_guard<std::mutex> ll(snapshots_lock_);
|
||||
auto entry = snapshots_.rbegin();
|
||||
if (entry == snapshots_.rend()) return nullptr;
|
||||
|
||||
ptr<SnapshotCtx> ctx = entry->second;
|
||||
return ctx->snapshot_;
|
||||
}
|
||||
|
||||
// Returns the index of the most recently committed log entry, as recorded by commit().
auto CoordinatorStateMachine::last_commit_index() -> ulong {
  return last_committed_idx_;
}
|
||||
|
||||
auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void {
|
||||
spdlog::info("create snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
|
||||
// Clone snapshot from `s`.
|
||||
{
|
||||
auto lock = std::lock_guard{last_snapshot_lock_};
|
||||
ptr<buffer> snp_buf = s.serialize();
|
||||
last_snapshot_ = snapshot::deserialize(*snp_buf);
|
||||
}
|
||||
ptr<buffer> snp_buf = s.serialize();
|
||||
ptr<snapshot> ss = snapshot::deserialize(*snp_buf);
|
||||
create_snapshot_internal(ss);
|
||||
|
||||
ptr<std::exception> except(nullptr);
|
||||
bool ret = true;
|
||||
when_done(ret, except);
|
||||
}
|
||||
|
||||
// Stores a snapshot context (snapshot metadata + a copy of the current cluster state) keyed by the
// snapshot's last log index, then trims the oldest contexts so that at most MAX_SNAPSHOTS remain.
// The whole operation runs under snapshots_lock_.
auto CoordinatorStateMachine::create_snapshot_internal(ptr<snapshot> snapshot) -> void {
  constexpr int MAX_SNAPSHOTS = 3;

  std::lock_guard<std::mutex> guard(snapshots_lock_);

  // Copy the cluster state as it is right now into the new context.
  auto new_ctx = cs_new<SnapshotCtx>(snapshot, cluster_state_);
  auto const last_log_idx = snapshot->get_last_log_idx();
  snapshots_[last_log_idx] = new_ctx;

  // snapshots_ is ordered by log index, so begin() is always the oldest entry.
  for (; snapshots_.size() > MAX_SNAPSHOTS;) {
    snapshots_.erase(snapshots_.begin());
  }
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -13,15 +13,92 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "nuraft/raft_log_action.hpp"
|
||||
#include "replication_coordination_glue/role.hpp"
|
||||
#include "utils/rw_lock.hpp"
|
||||
|
||||
#include <libnuraft/nuraft.hxx>
|
||||
#include <range/v3/view.hpp>
|
||||
|
||||
#include <map>
|
||||
#include <numeric>
|
||||
#include <string>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
struct CoordinatorClusterState {
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
using nuraft::ptr;
|
||||
|
||||
class CoordinatorClusterState {
|
||||
public:
|
||||
auto InsertInstance(std::string const &instance_name, replication_coordination_glue::ReplicationRole role) -> void {
|
||||
instance_roles[instance_name] = role;
|
||||
}
|
||||
|
||||
auto DoAction(std::string const &instance_name, RaftLogAction log_action) -> void {
|
||||
switch (log_action) {
|
||||
case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
|
||||
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
break;
|
||||
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
|
||||
instance_roles.erase(instance_name);
|
||||
break;
|
||||
case RaftLogAction::SET_INSTANCE_AS_MAIN:
|
||||
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::MAIN;
|
||||
break;
|
||||
case RaftLogAction::SET_INSTANCE_AS_REPLICA:
|
||||
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
auto Serialize(ptr<buffer> &data) -> void {
|
||||
auto const role_to_string = [](auto const &role) {
|
||||
switch (role) {
|
||||
case replication_coordination_glue::ReplicationRole::MAIN:
|
||||
return "main";
|
||||
case replication_coordination_glue::ReplicationRole::REPLICA:
|
||||
return "replica";
|
||||
}
|
||||
};
|
||||
|
||||
auto const entry_to_string = [&role_to_string](auto const &entry) {
|
||||
return entry.first + "_" + role_to_string(entry.second);
|
||||
};
|
||||
|
||||
auto instances_str_view = instance_roles | ranges::views::transform(entry_to_string);
|
||||
uint32_t size =
|
||||
std::accumulate(instances_str_view.begin(), instances_str_view.end(), 0,
|
||||
[](uint32_t acc, auto const &entry) { return acc + sizeof(uint32_t) + entry.size(); });
|
||||
|
||||
data = buffer::alloc(size);
|
||||
buffer_serializer bs(data);
|
||||
std::for_each(instances_str_view.begin(), instances_str_view.end(),
|
||||
[&bs](auto const &entry) { bs.put_str(entry); });
|
||||
}
|
||||
|
||||
static auto Deserialize(buffer &data) -> CoordinatorClusterState {
|
||||
auto const str_to_role = [](auto const &str) {
|
||||
if (str == "main") {
|
||||
return replication_coordination_glue::ReplicationRole::MAIN;
|
||||
}
|
||||
return replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
};
|
||||
|
||||
CoordinatorClusterState cluster_state;
|
||||
buffer_serializer bs(data);
|
||||
while (bs.size() > 0) {
|
||||
auto const entry = bs.get_str();
|
||||
auto const first_dash = entry.find('_');
|
||||
auto const instance_name = entry.substr(0, first_dash);
|
||||
auto const role_str = entry.substr(first_dash + 1);
|
||||
cluster_state.InsertInstance(instance_name, str_to_role(role_str));
|
||||
}
|
||||
return cluster_state;
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<std::string, replication_coordination_glue::ReplicationRole> instance_roles;
|
||||
};
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "nuraft/coordinator_cluster_state.hpp"
|
||||
#include "nuraft/raft_log_action.hpp"
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <libnuraft/nuraft.hxx>
|
||||
@ -29,13 +30,6 @@ using nuraft::ptr;
|
||||
using nuraft::snapshot;
|
||||
using nuraft::state_machine;
|
||||
|
||||
enum class RaftLogAction : uint8_t {
|
||||
REGISTER_REPLICATION_INSTANCE,
|
||||
UNREGISTER_REPLICATION_INSTANCE,
|
||||
SET_INSTANCE_AS_MAIN,
|
||||
SET_INSTANCE_AS_REPLICA
|
||||
};
|
||||
|
||||
class CoordinatorStateMachine : public state_machine {
|
||||
public:
|
||||
CoordinatorStateMachine() = default;
|
||||
@ -74,11 +68,25 @@ class CoordinatorStateMachine : public state_machine {
|
||||
auto create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void override;
|
||||
|
||||
private:
|
||||
struct SnapshotCtx {
|
||||
SnapshotCtx(ptr<snapshot> &snapshot, CoordinatorClusterState const &cluster_state)
|
||||
: snapshot_(snapshot), cluster_state_(cluster_state) {}
|
||||
|
||||
ptr<snapshot> snapshot_;
|
||||
CoordinatorClusterState cluster_state_;
|
||||
};
|
||||
|
||||
auto create_snapshot_internal(ptr<snapshot> snapshot) -> void;
|
||||
|
||||
CoordinatorClusterState cluster_state_;
|
||||
mutable utils::RWLock lock{utils::RWLock::Priority::READ};
|
||||
|
||||
std::atomic<uint64_t> last_committed_idx_{0};
|
||||
|
||||
// TODO: (andi) Maybe not needed, remove it
|
||||
std::map<uint64_t, ptr<SnapshotCtx>> snapshots_;
|
||||
std::mutex snapshots_lock_;
|
||||
|
||||
ptr<snapshot> last_snapshot_;
|
||||
std::mutex last_snapshot_lock_;
|
||||
};
|
||||
|
28
src/coordination/include/nuraft/raft_log_action.hpp
Normal file
28
src/coordination/include/nuraft/raft_log_action.hpp
Normal file
@ -0,0 +1,28 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
// Actions that can be applied to an instance in the coordinator cluster state.
// The numeric values are spelled out explicitly; they are the same ones the implicit numbering
// produced.
enum class RaftLogAction : uint8_t {
  REGISTER_REPLICATION_INSTANCE = 0,
  UNREGISTER_REPLICATION_INSTANCE = 1,
  SET_INSTANCE_AS_MAIN = 2,
  SET_INSTANCE_AS_REPLICA = 3,
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user