Merge branch 'T0912-MG-in-memory-shard-map' of github.com:memgraph/memgraph into T0916-MG-fbthrift-transport

This commit is contained in:
Tyler Neely 2022-08-17 09:53:19 +00:00
commit b9efa2499d
5 changed files with 79 additions and 47 deletions

View File

@ -135,9 +135,14 @@ class OpaquePromise {
std::unique_ptr<OpaquePromiseTraitBase> trait_;
public:
OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) { old.ptr_ = nullptr; }
OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) {
MG_ASSERT(old.ptr_ != nullptr);
old.ptr_ = nullptr;
}
OpaquePromise &operator=(OpaquePromise &&old) noexcept {
MG_ASSERT(ptr_ == nullptr);
MG_ASSERT(old.ptr_ != nullptr);
MG_ASSERT(this != &old);
ptr_ = old.ptr_;
trait_ = std::move(old.trait_);

View File

@ -135,7 +135,7 @@ struct Leader {
std::unordered_map<LogIndex, PendingClientRequest> pending_client_requests;
Time last_broadcast = Time::min();
static void Print() { std::cout << "\tLeader \t"; }
std::string ToString() { return "\tLeader \t"; }
};
struct Candidate {
@ -143,27 +143,29 @@ struct Candidate {
Time election_began = Time::min();
std::set<Address> outstanding_votes;
static void Print() { std::cout << "\tCandidate\t"; }
std::string ToString() { return "\tCandidate\t"; }
};
struct Follower {
Time last_received_append_entries_timestamp;
Address leader_address;
static void Print() { std::cout << "\tFollower \t"; }
std::string ToString() { return "\tFollower \t"; }
};
using Role = std::variant<Candidate, Leader, Follower>;
/*
all ReplicatedState classes should have an Apply method
that returns our WriteResponseValue:
that returns our WriteResponseValue after consensus, and
a Read method that returns our ReadResponseValue without
requiring consensus.
ReadResponse Read(ReadOperation);
WriteResponseValue ReplicatedState::Apply(WriteRequest);
for examples:
if the state is uint64_t, and WriteRequest is `struct PlusOne {};`,
For example:
If the state is uint64_t, and WriteRequest is `struct PlusOne {};`,
and WriteResponseValue is also uint64_t (the new value), then
each call to state.Apply(PlusOne{}) will return the new value
after incrementing it. 0, 1, 2, 3... and this will be sent back
@ -186,7 +188,7 @@ concept Rsm = requires(ReplicatedState state, WriteOperation w, ReadOperation r)
/// ReplicatedState the high-level data structure that is managed by the raft-backed replicated state machine
/// WriteOperation the individual operation type that is applied to the ReplicatedState in identical order
/// across each replica
/// WriteResponseValue the return value of calling ReplicatedState::write(WriteOperation), which is executed in
/// WriteResponseValue the return value of calling ReplicatedState::Apply(WriteOperation), which is executed in
/// identical order across all replicas after an WriteOperation reaches consensus.
/// ReadOperation the type of operations that do not require consensus before executing directly
/// on a const ReplicatedState &
@ -237,9 +239,16 @@ class Raft {
// When the entry has been safely replicated, the leader applies the
// entry to its state machine and returns the result of that
// execution to the client.
//
// "Safely replicated" is defined as being known to be present
// on at least a majority of all peers (inclusive of the Leader).
void BumpCommitIndexAndReplyToClients(Leader &leader) {
// set the current committed_log_size based on the
auto indices = std::vector<LogIndex>{state_.log.size()};
auto indices = std::vector<LogIndex>{};
// We include our own log size in the calculation of the log
// index that is present on at least a majority of all peers.
indices.push_back(state_.log.size());
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
Log("at port ", addr.last_known_port, " has confirmed contiguous index of: ", f.confirmed_contiguous_index);
@ -248,7 +257,23 @@ class Raft {
// reverse sort from highest to lowest (using std::ranges::greater)
std::ranges::sort(indices, std::ranges::greater());
size_t new_committed_log_size = indices[(indices.size() / 2)];
// This is a particularly correctness-critical calculation because it
// determines which index we will consider to be the committed index.
//
// If the following indexes are recorded for clusters of different sizes,
// these are the expected indexes that are considered to have reached
// consensus:
// state | expected value | (indices.size() / 2)
// [1] 1 (1 / 2) => 0
// [2, 1] 1 (2 / 2) => 1
// [3, 2, 1] 2 (3 / 2) => 1
// [4, 3, 2, 1] 2 (4 / 2) => 2
// [5, 4, 3, 2, 1] 3 (5 / 2) => 2
size_t index_present_on_majority = indices.size() / 2;
LogIndex new_committed_log_size = indices[index_present_on_majority];
// We never go backwards in history.
MG_ASSERT(state_.committed_log_size <= new_committed_log_size);
state_.committed_log_size = new_committed_log_size;
@ -266,7 +291,7 @@ class Raft {
WriteResponse<WriteResponseValue> resp;
resp.success = true;
resp.write_return = write_return;
resp.write_return = std::move(write_return);
io_.Send(client_request.address, client_request.request_id, std::move(resp));
leader.pending_client_requests.erase(apply_index);
@ -302,7 +327,7 @@ class Raft {
};
// request_id not necessary to set because it's not a Future-backed Request.
const RequestId request_id = 0;
static constexpr RequestId request_id = 0;
io_.Send(address, request_id, ar);
}
@ -311,10 +336,7 @@ class Raft {
// Raft paper - 5.2
// Raft uses randomized election timeouts to ensure that split votes are rare and that they are resolved quickly
Duration RandomTimeout(Duration min, Duration max) {
auto min_micros = std::chrono::duration_cast<std::chrono::milliseconds>(min).count();
auto max_micros = std::chrono::duration_cast<std::chrono::milliseconds>(max).count();
std::uniform_int_distribution time_distrib(min_micros, max_micros);
std::uniform_int_distribution time_distrib(min.count(), max.count());
auto rand_micros = io_.Rand(time_distrib);
@ -329,12 +351,12 @@ class Raft {
return std::chrono::microseconds{rand_micros};
}
Term PreviousTermFromIndex(LogIndex index) {
Term PreviousTermFromIndex(LogIndex index) const {
if (index == 0 || state_.log.size() + 1 <= index) {
return 0;
}
auto &[term, data] = state_.log.at(index - 1);
const auto &[term, data] = state_.log.at(index - 1);
return term;
}
@ -352,12 +374,12 @@ class Raft {
LogIndex LastLogIndex() { return state_.log.size(); }
Term LastLogTerm() {
Term LastLogTerm() const {
if (state_.log.empty()) {
return 0;
}
auto &[term, data] = state_.log.back();
const auto &[term, data] = state_.log.back();
return term;
}
@ -368,11 +390,17 @@ class Raft {
const Term term = state_.term;
std::cout << '\t' << micros << "\t" << term << "\t" << io_.GetAddress().last_known_port;
std::ostringstream out;
std::visit([&](auto &&role) { role.Print(); }, role_);
out << '\t' << (int)micros << "\t" << term << "\t" << io_.GetAddress().last_known_port;
(std::cout << ... << args) << std::endl;
std::string role_string = std::visit([&](auto &&role) { return role.ToString(); }, role_);
out << role_string;
(out << ... << args);
spdlog::debug(out.str());
}
/////////////////////////////////////////////////////////////
@ -404,7 +432,7 @@ class Raft {
std::optional<Role> Cron(Candidate &candidate) {
const auto now = io_.Now();
const Duration election_timeout = RandomTimeout(100000, 200000);
auto election_timeout_us = std::chrono::duration_cast<std::chrono::milliseconds>(election_timeout).count();
const auto election_timeout_us = std::chrono::duration_cast<std::chrono::milliseconds>(election_timeout).count();
if (now - candidate.election_began > election_timeout) {
state_.term++;
@ -421,7 +449,7 @@ class Raft {
for (const auto &peer : peers_) {
// request_id not necessary to set because it's not a Future-backed Request.
auto request_id = 0;
static constexpr auto request_id = 0;
io_.template Send<VoteRequest>(peer, request_id, request);
outstanding_votes.insert(peer);
}
@ -664,6 +692,7 @@ class Raft {
state_.log.insert(state_.log.end(), req.entries.begin(), req.entries.end());
MG_ASSERT(req.leader_commit >= state_.committed_log_size);
state_.committed_log_size = std::min(req.leader_commit, LastLogIndex());
for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) {

View File

@ -54,10 +54,8 @@ size_t SimulatorHandle::BlockedServers() {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
if (opaque_promise.promise.IsAwaited()) {
if (server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
if (opaque_promise.promise.IsAwaited() && server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
}

View File

@ -162,7 +162,7 @@ void RunSimulation() {
auto srv_thread_3 = std::jthread(RunRaft<SimulatorTransport>, std::move(srv_3));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_3);
std::cout << "beginning test after servers have become quiescent" << std::endl;
spdlog::debug("beginning test after servers have become quiescent");
std::mt19937 cli_rng_{0};
std::vector<Address> server_addrs{srv_addr_1, srv_addr_2, srv_addr_3};
@ -197,8 +197,8 @@ void RunSimulation() {
bool cas_succeeded = cas_response.cas_success;
std::cout << "Client received CasResponse! success: " << cas_succeeded
<< " last_known_value: " << (int)*last_known_value << std::endl;
spdlog::debug("Client received CasResponse! success: {} last_known_value {}", cas_succeeded,
(int)*last_known_value);
if (cas_succeeded) {
last_known_value = i;
@ -223,7 +223,7 @@ void RunSimulation() {
MG_ASSERT(get_response.value == i);
std::cout << "client successfully cas'd a value and read it back! value: " << i << std::endl;
spdlog::debug("client successfully cas'd a value and read it back! value: {}", i);
success = true;
}
@ -234,14 +234,14 @@ void RunSimulation() {
SimulatorStats stats = simulator.Stats();
std::cout << "total messages: " << stats.total_messages << std::endl;
std::cout << "dropped messages: " << stats.dropped_messages << std::endl;
std::cout << "timed out requests: " << stats.timed_out_requests << std::endl;
std::cout << "total requests: " << stats.total_requests << std::endl;
std::cout << "total responses: " << stats.total_responses << std::endl;
std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl;
spdlog::debug("total messages: ", stats.total_messages);
spdlog::debug("dropped messages: ", stats.dropped_messages);
spdlog::debug("timed out requests: ", stats.timed_out_requests);
spdlog::debug("total requests: ", stats.total_requests);
spdlog::debug("total responses: ", stats.total_responses);
spdlog::debug("simulator ticks: ", stats.simulator_ticks);
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
spdlog::debug("========================== SUCCESS :) ==========================");
/*
this is implicit in jthread's dtor
@ -255,12 +255,12 @@ int main() {
int n_tests = 50;
for (int i = 0; i < n_tests; i++) {
std::cout << "========================== NEW SIMULATION " << i << " ==========================" << std::endl;
std::cout << "\tTime\tTerm\tPort\tRole\t\tMessage\n";
spdlog::debug("========================== NEW SIMULATION {} ==========================", i);
spdlog::debug("\tTime\tTerm\tPort\tRole\t\tMessage\n");
RunSimulation();
}
std::cout << "passed " << n_tests << " tests!" << std::endl;
spdlog::debug("passed {} tests!", n_tests);
return 0;
}

View File

@ -193,8 +193,6 @@ int main() {
auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]);
std::cout << "beginning test after servers have become quiescent" << std::endl;
// Spin up coordinators
Io<SimulatorTransport> c_io_1 = simulator.RegisterNew();
@ -259,7 +257,9 @@ int main() {
// Transaction ID to be used later...
auto transaction_id = res.new_hlc;
client_shard_map = res.fresher_shard_map.value();
if (res.fresher_shard_map) {
client_shard_map = res.fresher_shard_map.value();
}
// TODO(gabor) check somewhere in the call chain if the entries are actually valid
// for (auto &[key, val] : client_shard_map.GetShards()) {