Merge branch 'T0941-MG-implement-basic-raft-version' of github.com:memgraph/memgraph into T0912-MG-in-memory-shard-map

This commit is contained in:
Tyler Neely 2022-08-16 09:14:03 +00:00
commit e545beaf78
3 changed files with 35 additions and 8 deletions

View File

@ -240,9 +240,16 @@ class Raft {
// When the entry has been safely replicated, the leader applies the
// entry to its state machine and returns the result of that
// execution to the client.
//
// "Safely replicated" is defined as being known to be present
// on at least a majority of all peers (inclusive of the Leader).
void BumpCommitIndexAndReplyToClients(Leader &leader) {
// set the current committed_log_size based on the
auto indices = std::vector<LogIndex>{state_.log.size()};
auto indices = std::vector<LogIndex>{};
// We include our own log size in the calculation of the log
// index that is present on at least a majority of all peers.
indices.push_back(state_.log.size());
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
Log("at port ", addr.last_known_port, " has confirmed contiguous index of: ", f.confirmed_contiguous_index);
@ -251,7 +258,23 @@ class Raft {
// reverse sort from highest to lowest (using std::ranges::greater)
std::ranges::sort(indices, std::ranges::greater());
size_t new_committed_log_size = indices[(indices.size() / 2)];
// This is a particularly correctness-critical calculation because it
// determines which index we will consider to be the committed index.
//
// If the following indexes are recorded for clusters of different sizes,
// these are the expected indexes that are considered to have reached
// consensus:
// state | expected value | (indices.size() / 2)
// [1] 1 (1 / 2) => 0
// [2, 1] 1 (2 / 2) => 1
// [3, 2, 1] 2 (3 / 2) => 1
// [4, 3, 2, 1] 2 (4 / 2) => 2
// [5, 4, 3, 2, 1] 3 (5 / 2) => 2
size_t index_present_on_majority = indices.size() / 2;
LogIndex new_committed_log_size = indices[index_present_on_majority];
// We never go backwards in history.
MG_ASSERT(state_.committed_log_size <= new_committed_log_size);
state_.committed_log_size = new_committed_log_size;
@ -667,6 +690,7 @@ class Raft {
state_.log.insert(state_.log.end(), req.entries.begin(), req.entries.end());
MG_ASSERT(req.leader_commit >= state_.committed_log_size);
state_.committed_log_size = std::min(req.leader_commit, LastLogIndex());
for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) {

View File

@ -118,9 +118,14 @@ class OpaquePromise {
std::unique_ptr<OpaquePromiseTraitBase> trait_;
public:
OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) { old.ptr_ = nullptr; }
OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) {
MG_ASSERT(old.ptr_ != nullptr);
old.ptr_ = nullptr;
}
OpaquePromise &operator=(OpaquePromise &&old) noexcept {
MG_ASSERT(ptr_ == nullptr);
MG_ASSERT(old.ptr_ != nullptr);
MG_ASSERT(this != &old);
ptr_ = old.ptr_;
trait_ = std::move(old.trait_);

View File

@ -54,10 +54,8 @@ size_t SimulatorHandle::BlockedServers() {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
if (opaque_promise.promise.IsAwaited()) {
if (server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
if (opaque_promise.promise.IsAwaited() && server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
}