Added InMemoryLogStore

Andi Skrgat 2024-02-07 11:23:48 +01:00
parent e125c5cd98
commit ec6d35ff67
8 changed files with 135 additions and 309 deletions

View File

@@ -224,26 +224,27 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
}
// auto const res = self_.AppendRegisterReplicationInstance(config.instance_name);
auto const res = self_.AppendRegisterReplicationInstance(config.instance_name);
// if (res->get_accepted()) {
// spdlog::info("Request for registering instance {} accepted", config.instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
// NOTE: config is moved into the replication instance below, so copy the name
// out first; reading config.instance_name after the move would be a use-after-move.
auto const instance_name = config.instance_name;
if (res->get_accepted()) {
spdlog::info("Request for registering instance {} accepted", config.instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
} else {
spdlog::error(
"Failed to accept request for registering instance {}. Most likely the reason is that the instance is not the "
"leader.",
config.instance_name);
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
// } else {
// spdlog::error(
// "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not the
// " "leader.", config.instance_name);
// return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
// }
// if (res->get_result_code() != nuraft::cmd_result_code::OK) {
// spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code());
// return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
// }
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code());
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
}
spdlog::info("Instance {} registered", config.instance_name);
return RegisterInstanceCoordinatorStatus::SUCCESS;
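For context: AppendRegisterReplicationInstance funnels the registration through NuRaft's replicated log, and the two checks above correspond to NuRaft's two failure modes. A minimal sketch of the append side, assuming raft_server is the nuraft::raft_server handle held by RaftInstance (the real body is cut off at the end of this diff):

// Sketch only: get_accepted() == false means this node could not take the
// append (e.g. it lost leadership in the meantime); a non-OK result code
// means the entry was accepted but replication/commit failed.
auto AppendRegisterReplicationInstance(nuraft::ptr<nuraft::raft_server> const &raft_server, std::string const &name)
    -> nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>> {
  auto new_log = memgraph::coordination::CoordinatorStateMachine::EncodeRegisterReplicationInstance(name);
  return raft_server->append_entries({new_log});
}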

View File

@@ -13,214 +13,149 @@
#include "nuraft/coordinator_log_store.hpp"
#include "coordination/coordinator_exceptions.hpp"
namespace memgraph::coordination {
using nuraft::cs_new;
using nuraft::timer_helper;
CoordinatorLogStore::CoordinatorLogStore()
: start_idx_(1),
raft_server_bwd_pointer_(nullptr),
disk_emul_delay(0),
disk_emul_thread_(nullptr),
disk_emul_thread_stop_signal_(false),
disk_emul_last_durable_index_(0) {
// Dummy entry for index 0.
ptr<buffer> buf = buffer::alloc(sz_ulong);
namespace {
ptr<log_entry> MakeClone(const ptr<log_entry> &entry) {
return cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
entry->get_timestamp());
}
} // namespace
CoordinatorLogStore::CoordinatorLogStore() : start_idx_(1) {
ptr<buffer> buf = buffer::alloc(sizeof(uint64_t));
logs_[0] = cs_new<log_entry>(0, buf);
}
CoordinatorLogStore::~CoordinatorLogStore() {
if (disk_emul_thread_) {
disk_emul_thread_stop_signal_ = true;
// disk_emul_ea_.invoke();
if (disk_emul_thread_->joinable()) {
disk_emul_thread_->join();
}
}
}
CoordinatorLogStore::~CoordinatorLogStore() {}
ptr<log_entry> CoordinatorLogStore::MakeClone(const ptr<log_entry> &entry) {
// NOTE:
// Timestamp is used only when `replicate_log_timestamp_` option is on.
// Otherwise, log store does not need to store or load it.
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
entry->get_timestamp());
return clone;
}
ulong CoordinatorLogStore::next_slot() const {
std::lock_guard<std::mutex> l(logs_lock_);
// Exclude the dummy entry.
return start_idx_ + logs_.size() - 1;
}
ulong CoordinatorLogStore::start_index() const { return start_idx_; }
ptr<log_entry> CoordinatorLogStore::last_entry() const {
ulong next_idx = next_slot();
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(next_idx - 1);
auto CoordinatorLogStore::FindOrDefault_(uint64_t index) const -> ptr<log_entry> {
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
return MakeClone(entry->second);
return entry->second;
}
ulong CoordinatorLogStore::append(ptr<log_entry> &entry) {
uint64_t CoordinatorLogStore::next_slot() const {
auto lock = std::lock_guard{logs_lock_};
return start_idx_ + logs_.size() - 1;
}
uint64_t CoordinatorLogStore::start_index() const { return start_idx_; }
ptr<log_entry> CoordinatorLogStore::last_entry() const {
auto lock = std::lock_guard{logs_lock_};
uint64_t const last_idx = start_idx_ + logs_.size() - 1;
auto const last_src = FindOrDefault_(last_idx - 1);
return MakeClone(last_src);
}
uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
std::lock_guard<std::mutex> l(logs_lock_);
size_t idx = start_idx_ + logs_.size() - 1;
logs_[idx] = clone;
if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
// disk_emul_ea_.invoke();
uint64_t next_slot{0};
{
auto lock = std::lock_guard{logs_lock_};
next_slot = start_idx_ + logs_.size() - 1;
logs_[next_slot] = clone;
}
return idx;
return next_slot;
}
void CoordinatorLogStore::write_at(ulong index, ptr<log_entry> &entry) {
void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
// Discard all logs equal to or greater than `index`.
std::lock_guard<std::mutex> l(logs_lock_);
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
logs_[index] = clone;
if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;
// Remove entries greater than `index`.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->second > index) {
entry = disk_emul_logs_being_written_.erase(entry);
} else {
entry++;
}
{
auto lock = std::lock_guard{logs_lock_};
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
// disk_emul_ea_.invoke();
logs_[index] = clone;
}
}
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(ulong start, ulong end) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start, uint64_t end) {
auto ret = cs_new<std::vector<ptr<log_entry>>>();
ret->resize(end - start);
ulong cc = 0;
for (ulong ii = start; ii < end; ++ii) {
for (uint64_t i = start, curr_index = 0; i < end; ++i, ++curr_index) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
auto lock = std::lock_guard{logs_lock_};
if (auto const entry = logs_.find(i); entry != logs_.end()) {
src = entry->second;
} else {
throw RaftCouldNotFindEntryException("Could not find entry at index {}", i);
}
src = entry->second;
}
(*ret)[cc++] = MakeClone(src);
(*ret)[curr_index] = MakeClone(src);
}
return ret;
}
// NOLINTNEXTLINE(google-default-arguments)
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries_ext(ulong start, ulong end,
int64 batch_size_hint_in_bytes) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
if (batch_size_hint_in_bytes < 0) {
return ret;
}
size_t accum_size = 0;
for (ulong ii = start; ii < end; ++ii) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
ret->push_back(MakeClone(src));
accum_size += src->get_buf().size();
if (batch_size_hint_in_bytes && accum_size >= (ulong)batch_size_hint_in_bytes) break;
}
return ret;
}
ptr<log_entry> CoordinatorLogStore::entry_at(ulong index) {
ptr<log_entry> CoordinatorLogStore::entry_at(uint64_t index) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
src = entry->second;
auto lock = std::lock_guard{logs_lock_};
src = FindOrDefault_(index);
}
return MakeClone(src);
}
ulong CoordinatorLogStore::term_at(ulong index) {
ulong term = 0;
uint64_t CoordinatorLogStore::term_at(uint64_t index) {
uint64_t term = 0;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
term = entry->second->get_term();
auto lock = std::lock_guard{logs_lock_};
term = FindOrDefault_(index)->get_term();
}
return term;
}
ptr<buffer> CoordinatorLogStore::pack(ulong index, int32 cnt) {
ptr<buffer> CoordinatorLogStore::pack(uint64_t index, int32 cnt) {
std::vector<ptr<buffer>> logs;
size_t size_total = 0;
for (ulong ii = index; ii < index + cnt; ++ii) {
uint64_t const end_index = index + cnt;
for (uint64_t i = index; i < end_index; ++i) {
ptr<log_entry> le = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
le = logs_[ii];
auto lock = std::lock_guard{logs_lock_};
le = logs_[i];
}
assert(le.get());
ptr<buffer> buf = le->serialize();
auto buf = le->serialize();
size_total += buf->size();
logs.push_back(buf);
}
ptr<buffer> buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
auto buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
buf_out->pos(0);
buf_out->put((int32)cnt);
for (auto &entry : logs) {
ptr<buffer> &bb = entry;
buf_out->put((int32)bb->size());
auto &bb = entry; // TODO: (andi) This smells like not needed
buf_out->put(static_cast<int32>(bb->size()));
buf_out->put(*bb);
}
return buf_out;
}
void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
void CoordinatorLogStore::apply_pack(uint64_t index, buffer &pack) {
pack.pos(0);
int32 num_logs = pack.get_int();
int32 const num_logs = pack.get_int();
for (int32 ii = 0; ii < num_logs; ++ii) {
ulong cur_idx = index + ii;
for (int32 i = 0; i < num_logs; ++i) {
uint64_t cur_idx = index + i;
int32 buf_size = pack.get_int();
ptr<buffer> buf_local = buffer::alloc(buf_size);
@@ -228,14 +163,14 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
ptr<log_entry> le = log_entry::deserialize(*buf_local);
{
std::lock_guard<std::mutex> l(logs_lock_);
auto lock = std::lock_guard{logs_lock_};
logs_[cur_idx] = le;
}
}
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.upper_bound(0);
auto lock = std::lock_guard{logs_lock_};
auto const entry = logs_.upper_bound(0);
if (entry != logs_.end()) {
start_idx_ = entry->first;
} else {
@@ -244,88 +179,23 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
}
}
bool CoordinatorLogStore::compact(ulong last_log_index) {
std::lock_guard<std::mutex> l(logs_lock_);
for (ulong ii = start_idx_; ii <= last_log_index; ++ii) {
auto entry = logs_.find(ii);
// NOTE: Remove all logs up to given 'last_log_index' (inclusive).
bool CoordinatorLogStore::compact(uint64_t last_log_index) {
auto lock = std::lock_guard{logs_lock_};
for (uint64_t ii = start_idx_; ii <= last_log_index; ++ii) {
auto const entry = logs_.find(ii);
if (entry != logs_.end()) {
logs_.erase(entry);
}
}
// WARNING:
// Even though nothing has been erased,
// we should set `start_idx_` to new index.
if (start_idx_ <= last_log_index) {
start_idx_ = last_log_index + 1;
}
return true;
}
bool CoordinatorLogStore::flush() {
disk_emul_last_durable_index_ = next_slot() - 1;
return true;
}
ulong CoordinatorLogStore::last_durable_index() {
uint64_t last_log = next_slot() - 1;
if (!disk_emul_delay) {
return last_log;
}
return disk_emul_last_durable_index_;
}
void CoordinatorLogStore::DiskEmulLoop() {
// This thread mimics async disk writes.
// uint32_t next_sleep_us = 100 * 1000;
while (!disk_emul_thread_stop_signal_) {
// disk_emul_ea_.wait_us(next_sleep_us);
// disk_emul_ea_.reset();
if (disk_emul_thread_stop_signal_) break;
uint64_t cur_time = timer_helper::get_timeofday_us();
// next_sleep_us = 100 * 1000;
bool call_notification = false;
{
std::lock_guard<std::mutex> l(logs_lock_);
// Remove all timestamps equal to or smaller than `cur_time`,
// and pick the greatest one among them.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->first <= cur_time) {
disk_emul_last_durable_index_ = entry->second;
entry = disk_emul_logs_being_written_.erase(entry);
call_notification = true;
} else {
break;
}
}
entry = disk_emul_logs_being_written_.begin();
if (entry != disk_emul_logs_being_written_.end()) {
// next_sleep_us = entry->first - cur_time;
}
}
if (call_notification) {
raft_server_bwd_pointer_->notify_log_append_completion(true);
}
}
}
void CoordinatorLogStore::Close() {}
void CoordinatorLogStore::SetDiskDelay(raft_server *raft, size_t delay_ms) {
disk_emul_delay = delay_ms;
raft_server_bwd_pointer_ = raft;
if (!disk_emul_thread_) {
disk_emul_thread_ = std::make_unique<std::thread>(&CoordinatorLogStore::DiskEmulLoop, this);
}
}
bool CoordinatorLogStore::flush() { return true; }
} // namespace memgraph::coordination
#endif
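Taken together, the store is now a mutex-guarded std::map<uint64_t, ptr<log_entry>> with the dummy entry at index 0 serving as the fallback for missing slots. A minimal usage sketch against the public interface shown above (assumes the usual nuraft and coordination headers):

#include <cassert>

void LogStoreRoundTrip() {
  memgraph::coordination::CoordinatorLogStore store;  // ctor seeds the dummy entry at index 0
  auto payload = nuraft::buffer::alloc(sizeof(uint64_t));
  auto entry = nuraft::cs_new<nuraft::log_entry>(/*term=*/1, payload);
  uint64_t const idx = store.append(entry);  // first real slot, since start_idx_ begins at 1
  assert(store.term_at(idx) == 1);
  store.compact(idx);  // drops everything up to idx; start_index() becomes idx + 1
}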

View File

@@ -15,6 +15,19 @@
namespace memgraph::coordination {
auto CoordinatorStateMachine::EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
std::string str_log = name + "_replica";
ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + str_log.size());
buffer_serializer bs(log);
bs.put_str(str_log);
return log;
}
auto CoordinatorStateMachine::DecodeRegisterReplicationInstance(buffer &data) -> std::string {
buffer_serializer bs(data);
return bs.get_str();
}
auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();
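Note the sizing in EncodeRegisterReplicationInstance: buffer_serializer::put_str writes a 4-byte length prefix before the characters, which is what the sizeof(uint32_t) accounts for; the old inline header version further below, which allocated only str_log.size(), under-allocated by exactly that prefix. A hedged round-trip sketch:

void EncodeDecodeRoundTrip() {
  using memgraph::coordination::CoordinatorStateMachine;
  auto log = CoordinatorStateMachine::EncodeRegisterReplicationInstance("instance1");
  log->pos(0);  // defensive rewind; the serializer reads from the buffer start
  auto const decoded = CoordinatorStateMachine::DecodeRegisterReplicationInstance(*log);
  assert(decoded == "instance1_replica");  // Encode appends the "_replica" suffix
}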

View File

@@ -61,5 +61,16 @@ class RaftBecomeLeaderException final : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(RaftBecomeLeaderException)
};
class RaftCouldNotFindEntryException final : public utils::BasicException {
public:
explicit RaftCouldNotFindEntryException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftCouldNotFindEntryException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftCouldNotFindEntryException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotFindEntryException)
};
} // namespace memgraph::coordination
#endif
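The new exception follows the existing pattern in this header: a plain string_view constructor plus a variadic constructor that formats eagerly via fmt. A small usage sketch (the catch site is hypothetical; the throw mirrors log_entries above):

try {
  throw memgraph::coordination::RaftCouldNotFindEntryException("Could not find entry at index {}", 42);
} catch (memgraph::coordination::RaftCouldNotFindEntryException const &e) {
  spdlog::error("{}", e.what());  // what() returns the pre-formatted message
}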

View File

@@ -40,7 +40,7 @@ class RaftInstance {
RaftInstance &operator=(RaftInstance const &other) = delete;
RaftInstance(RaftInstance &&other) noexcept = delete;
RaftInstance &operator=(RaftInstance &&other) noexcept = delete;
~RaftInstance() = default;
~RaftInstance();
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;

View File

@@ -46,9 +46,6 @@ class CoordinatorLogStore : public log_store {
ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end) override;
// NOLINTNEXTLINE
ptr<std::vector<ptr<log_entry>>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override;
ptr<log_entry> entry_at(ulong index) override;
ulong term_at(ulong index) override;
@@ -61,67 +58,12 @@ class CoordinatorLogStore : public log_store {
bool flush() override;
ulong last_durable_index() override;
void Close();
void SetDiskDelay(raft_server *raft, size_t delay_ms);
private:
static ptr<log_entry> MakeClone(ptr<log_entry> const &entry);
auto FindOrDefault_(ulong index) const -> ptr<log_entry>;
void DiskEmulLoop();
/**
* Map of <log index, log data>.
*/
std::map<ulong, ptr<log_entry>> logs_;
/**
* Lock for `logs_`.
*/
mutable std::mutex logs_lock_;
/**
* The index of the first log.
*/
std::atomic<ulong> start_idx_;
/**
* Backward pointer to Raft server.
*/
raft_server *raft_server_bwd_pointer_;
// Testing purpose --------------- BEGIN
/**
* If non-zero, this log store will emulate the disk write delay.
*/
std::atomic<size_t> disk_emul_delay;
/**
* Map of <timestamp, log index>, emulating logs that is being written to disk.
* Log index will be regarded as "durable" after the corresponding timestamp.
*/
std::map<uint64_t, uint64_t> disk_emul_logs_being_written_;
/**
* Thread that will update `last_durable_index_` and call
* `notify_log_append_completion` at proper time.
*/
std::unique_ptr<std::thread> disk_emul_thread_;
/**
* Flag to terminate the thread.
*/
std::atomic<bool> disk_emul_thread_stop_signal_;
/**
* Last written log index.
*/
std::atomic<uint64_t> disk_emul_last_durable_index_;
// Testing purpose --------------- END
};
} // namespace memgraph::coordination
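With the disk-emulation block gone, the private state reduces to three members plus one lookup helper; a condensed view of what remains after this commit (inferred from the diff, not the full header):

class CoordinatorLogStore : public nuraft::log_store {
  // ...public log_store overrides: append, write_at, entry_at, term_at, pack, apply_pack, compact, flush...
 private:
  auto FindOrDefault_(nuraft::ulong index) const -> nuraft::ptr<nuraft::log_entry>;

  std::map<nuraft::ulong, nuraft::ptr<nuraft::log_entry>> logs_;  // <log index, log data>
  mutable std::mutex logs_lock_;                                  // guards logs_
  std::atomic<nuraft::ulong> start_idx_;                          // index of the first log
};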

View File

@@ -36,18 +36,9 @@ class CoordinatorStateMachine : public state_machine {
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
std::string str_log = name + "_replica";
ptr<buffer> log = buffer::alloc(str_log.size());
buffer_serializer bs(log);
bs.put_str(str_log);
return log;
}
static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer>;
static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string {
buffer_serializer bs(data);
return bs.get_str();
}
static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string;
auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override;

View File

@@ -72,7 +72,6 @@ RaftInstance::RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb bec
raft_server_ = launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts,
params, init_opts);
raft_server_->request_leadership();
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
@@ -90,6 +89,8 @@ RaftInstance::RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb bec
spdlog::info("Raft server started on {}", raft_endpoint);
}
RaftInstance::~RaftInstance() { launcher_.shutdown(); }
auto RaftInstance::InstanceName() const -> std::string { return "coordinator_" + std::to_string(raft_server_id_); }
auto RaftInstance::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); }
@@ -113,10 +114,7 @@ auto RaftInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
auto RaftInstance::IsLeader() const -> bool { return raft_server_->is_leader(); }
auto RaftInstance::RequestLeadership() -> bool {
if (!raft_server_->is_leader()) {
raft_server_->request_leadership();
}
return raft_server_->is_leader();
return raft_server_->is_leader() || raft_server_->request_leadership();
}
auto RaftInstance::AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result> {
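One nuance in the simplified RequestLeadership above: NuRaft's request_leadership() reports whether the leadership request could be submitted, and any actual transfer completes asynchronously, so a true return does not yet guarantee is_leader(). A hypothetical caller that needs confirmed leadership would re-check:

// Hedged sketch: true means "already leader, or a transfer is in flight".
if (!raft_instance.RequestLeadership()) {
  spdlog::error("Could not submit leadership request");
} else if (!raft_instance.IsLeader()) {
  // transfer still pending; retry or poll IsLeader() before proceeding
}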