Merge branch 'T0941-MG-implement-basic-raft-version' of github.com:memgraph/memgraph into T0912-MG-in-memory-shard-map

This commit is contained in:
Tyler Neely 2022-08-30 08:38:14 +00:00
commit 2a395a18b0
14 changed files with 643 additions and 246 deletions

View File

@ -16,6 +16,7 @@
#include <fmt/format.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
namespace memgraph::io {
@ -37,6 +38,12 @@ struct Address {
return ret;
}
static Address UniqueLocalAddress() {
return Address{
.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
};
}
friend bool operator==(const Address &lhs, const Address &rhs) = default;
/// unique_id is most dominant for ordering, then last_known_ip, then last_known_port
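A quick usage sketch (editor addition, not part of the diff): UniqueLocalAddress leaves last_known_ip and last_known_port at their defaults and relies on the random uuid alone to distinguish in-process endpoints, so two calls never collide:

#include <cassert>

#include "io/address.hpp"

void UniqueLocalAddressSketch() {
  using memgraph::io::Address;
  const Address a = Address::UniqueLocalAddress();
  const Address b = Address::UniqueLocalAddress();
  // distinct uuids make the addresses unequal even though ip/port match
  assert(!(a == b));
}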

View File

@ -0,0 +1,35 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <random>
#include "io/address.hpp"
#include "io/local_transport/local_transport.hpp"
#include "io/local_transport/local_transport_handle.hpp"
#include "io/transport.hpp"
namespace memgraph::io::local_transport {
class LocalSystem {
std::shared_ptr<LocalTransportHandle> local_transport_handle_ = std::make_shared<LocalTransportHandle>();
public:
Io<LocalTransport> Register(Address address) {
LocalTransport local_transport(local_transport_handle_, address);
return Io{local_transport, address};
}
void ShutDown() { local_transport_handle_->ShutDown(); }
};
} // namespace memgraph::io::local_transport
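A minimal wiring sketch (editor addition, mirroring the unit test at the bottom of this diff): one LocalSystem hands out Io<LocalTransport> instances that all share the same LocalTransportHandle, and ShutDown() wakes anything blocked in Receive:

#include "io/address.hpp"
#include "io/local_transport/local_system.hpp"

void LocalSystemSketch() {
  using namespace memgraph::io;
  using namespace memgraph::io::local_transport;

  LocalSystem local_system;
  auto addr_a = Address::UniqueLocalAddress();
  auto addr_b = Address::UniqueLocalAddress();

  Io<LocalTransport> io_a = local_system.Register(addr_a);
  Io<LocalTransport> io_b = local_system.Register(addr_b);

  // ... exchange messages between io_a and io_b on separate threads ...

  local_system.ShutDown();  // ShouldShutDown() now returns true for both
}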

View File

@ -0,0 +1,67 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <memory>
#include <random>
#include <utility>
#include "io/address.hpp"
#include "io/local_transport/local_transport_handle.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
namespace memgraph::io::local_transport {
class LocalTransport {
std::shared_ptr<LocalTransportHandle> local_transport_handle_;
const Address address_;
public:
LocalTransport(std::shared_ptr<LocalTransportHandle> local_transport_handle, Address address)
: local_transport_handle_(std::move(local_transport_handle)), address_(address) {}
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address to_address, RequestId request_id, RequestT request, Duration timeout) {
auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
Address from_address = address_;
local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
std::move(promise));
return std::move(future);
}
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
Address from_address = address_;
return local_transport_handle_->template Receive<Ms...>(timeout);
}
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
return local_transport_handle_->template Send<M>(to_address, from_address, request_id, std::forward<M>(message));
}
Time Now() const { return local_transport_handle_->Now(); }
bool ShouldShutDown() const { return local_transport_handle_->ShouldShutDown(); }
template <class D = std::poisson_distribution<>, class Return = uint64_t>
Return Rand(D distrib) {
std::random_device rng;
return distrib(rng);
}
};
}; // namespace memgraph::io::local_transport
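One detail worth a standalone sketch (editor addition): Rand takes the distribution by value and draws it from a fresh std::random_device, and Raft's RandomTimeout further down in this diff is built on exactly that. A self-contained equivalent, assuming only the standard library:

#include <chrono>
#include <cstdint>
#include <random>

std::chrono::microseconds RandomTimeoutSketch(std::chrono::microseconds min,
                                              std::chrono::microseconds max) {
  // uniform over the raw microsecond counts, inclusive of both endpoints
  std::uniform_int_distribution<int64_t> time_distrib(min.count(), max.count());
  std::random_device rng;  // stands in for io_.Rand(time_distrib)
  return std::chrono::microseconds{time_distrib(rng)};
}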

View File

@ -0,0 +1,139 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <map>
#include <mutex>
#include "io/errors.hpp"
#include "io/message_conversion.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
namespace memgraph::io::local_transport {
class LocalTransportHandle {
mutable std::mutex mu_{};
mutable std::condition_variable cv_;
bool should_shut_down_ = false;
// promises for in-flight requests whose responses are being awaited
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages sent to servers that have not yet received them
std::vector<OpaqueMessage> can_receive_;
public:
void ShutDown() {
std::unique_lock<std::mutex> lock(mu_);
should_shut_down_ = true;
cv_.notify_all();
}
bool ShouldShutDown() const {
std::unique_lock<std::mutex> lock(mu_);
return should_shut_down_;
}
static Time Now() {
auto nano_time = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
}
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
std::unique_lock lock(mu_);
Time before = Now();
while (can_receive_.empty()) {
Time now = Now();
// protection against non-monotonic time sources
auto maxed_now = std::max(now, before);
auto elapsed = maxed_now - before;
if (timeout < elapsed) {
return TimedOut{};
}
Duration relative_timeout = timeout - elapsed;
std::cv_status cv_status_value = cv_.wait_for(lock, relative_timeout);
if (cv_status_value == std::cv_status::timeout) {
return TimedOut{};
}
}
auto current_message = std::move(can_receive_.back());
can_receive_.pop_back();
auto m_opt = std::move(current_message).Take<Ms...>();
return std::move(m_opt).value();
}
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
std::any message_any(std::forward<M>(message));
OpaqueMessage opaque_message{
.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
PromiseKey promise_key{.requester_address = to_address,
.request_id = opaque_message.request_id,
.replier_address = opaque_message.from_address};
{
std::unique_lock<std::mutex> lock(mu_);
if (promises_.contains(promise_key)) {
// complete waiting promise if it's there
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
promises_.erase(promise_key);
dop.promise.Fill(std::move(opaque_message));
} else {
can_receive_.emplace_back(std::move(opaque_message));
}
} // lock dropped
cv_.notify_all();
}
template <Message RequestT, Message ResponseT>
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, RequestT &&request,
Duration timeout, ResponsePromise<ResponseT> promise) {
const bool port_matches = to_address.last_known_port == from_address.last_known_port;
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
MG_ASSERT(port_matches && ip_matches);
const Time deadline = Now() + timeout;
{
std::unique_lock<std::mutex> lock(mu_);
PromiseKey promise_key{
.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
OpaquePromise opaque_promise(std::move(promise).ToUnique());
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
promises_.emplace(std::move(promise_key), std::move(dop));
} // lock dropped
Send(to_address, from_address, request_id, std::forward<RequestT>(request));
}
};
} // namespace memgraph::io::local_transport
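The Receive loop above recomputes a relative timeout on every wakeup and clamps against clocks that jump backwards. A standalone sketch of just that arithmetic (editor addition, assumed to match the loop above):

#include <algorithm>
#include <chrono>
#include <optional>

using Duration = std::chrono::microseconds;

template <typename TimePoint>
std::optional<Duration> RemainingTimeout(TimePoint before, TimePoint now, Duration timeout) {
  const auto maxed_now = std::max(now, before);  // the clock may have gone backwards
  const auto elapsed = std::chrono::duration_cast<Duration>(maxed_now - before);
  if (timeout < elapsed) {
    return std::nullopt;  // corresponds to returning TimedOut{}
  }
  return timeout - elapsed;  // passed to cv_.wait_for on the next iteration
}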

View File

@ -13,13 +13,35 @@
#include "io/transport.hpp"
namespace memgraph::io::simulator {
namespace memgraph::io {
using memgraph::io::Duration;
using memgraph::io::Message;
using memgraph::io::Time;
struct PromiseKey {
Address requester_address;
uint64_t request_id;
// TODO(tyler) possibly remove replier_address from the promise key
// if we decide to support DSR.
Address replier_address;
public:
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
if (lhs.requester_address != rhs.requester_address) {
return lhs.requester_address < rhs.requester_address;
}
if (lhs.request_id != rhs.request_id) {
return lhs.request_id < rhs.request_id;
}
return lhs.replier_address < rhs.replier_address;
}
};
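// Editor note (sketch, not part of the diff): the hand-written comparison above is
// plain lexicographic ordering and could equivalently be expressed with std::tie:
//
//   friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
//     return std::tie(lhs.requester_address, lhs.request_id, lhs.replier_address) <
//            std::tie(rhs.requester_address, rhs.request_id, rhs.replier_address);
//   }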
struct OpaqueMessage {
Address to_address;
Address from_address;
uint64_t request_id;
std::any message;
@ -65,6 +87,7 @@ struct OpaqueMessage {
return RequestEnvelope<Ms...>{
.message = std::move(*m_opt),
.request_id = request_id,
.to_address = to_address,
.from_address = from_address,
};
}
@ -99,6 +122,7 @@ class OpaquePromiseTrait : public OpaquePromiseTraitBase {
T message = std::any_cast<T>(std::move(opaque_message.message));
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
.request_id = opaque_message.request_id,
.to_address = opaque_message.to_address,
.from_address = opaque_message.from_address};
auto promise = static_cast<ResponsePromise<T> *>(ptr);
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
@ -174,4 +198,9 @@ class OpaquePromise {
}
};
} // namespace memgraph::io::simulator
struct DeadlineAndOpaquePromise {
Time deadline;
OpaquePromise promise;
};
} // namespace memgraph::io

View File

@ -24,18 +24,35 @@
#include "io/simulator/simulator.hpp"
#include "io/transport.hpp"
#include "utils/concepts.hpp"
namespace memgraph::io::rsm {
using memgraph::io::Address;
using memgraph::io::Duration;
using memgraph::io::Io;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::Time;
/// Timeout and replication tunables
using namespace std::chrono_literals;
static constexpr auto kMinimumElectionTimeout = 100ms;
static constexpr auto kMaximumElectionTimeout = 200ms;
static constexpr auto kMinimumBroadcastTimeout = 40ms;
static constexpr auto kMaximumBroadcastTimeout = 60ms;
static constexpr auto kMinimumCronInterval = 1ms;
static constexpr auto kMaximumCronInterval = 2ms;
static constexpr auto kMinimumReceiveTimeout = 40ms;
static constexpr auto kMaximumReceiveTimeout = 60ms;
static_assert(kMinimumElectionTimeout > kMaximumBroadcastTimeout,
"The broadcast timeout has to be smaller than the election timeout!");
static_assert(kMinimumElectionTimeout < kMaximumElectionTimeout,
"The minimum election timeout has to be smaller than the maximum election timeout!");
static_assert(kMinimumBroadcastTimeout < kMaximumBroadcastTimeout,
"The minimum broadcast timeout has to be smaller than the maximum broadcast timeout!");
static_assert(kMinimumCronInterval < kMaximumCronInterval,
"The minimum cron interval has to be smaller than the maximum cron interval!");
static_assert(kMinimumReceiveTimeout < kMaximumReceiveTimeout,
"The minimum receive timeout has to be smaller than the maximum receive timeout!");
static constexpr size_t kMaximumAppendBatchSize = 1024;
using Term = uint64_t;
using LogIndex = uint64_t;
using LogSize = uint64_t;
using RequestId = uint64_t;
template <typename WriteOperation>
@ -57,6 +74,7 @@ struct WriteResponse {
bool success;
WriteReturn write_return;
std::optional<Address> retry_leader;
LogIndex raft_index;
};
template <typename ReadOperation>
@ -82,10 +100,10 @@ struct ReadResponse {
template <typename WriteRequest>
struct AppendRequest {
Term term = 0;
LogIndex last_log_index;
LogIndex batch_start_log_index;
Term last_log_term;
std::vector<std::pair<Term, WriteRequest>> entries;
LogIndex leader_commit;
LogSize leader_commit;
};
struct AppendResponse {
@ -96,18 +114,18 @@ struct AppendResponse {
// the leader the offset from which we would like
// it to send log entries to us. This will only
// be useful at the beginning of a leader's term.
LogIndex last_log_index;
LogSize log_size;
};
struct VoteRequest {
Term term = 0;
LogIndex last_log_index;
LogSize log_size;
Term last_log_term;
};
struct VoteResponse {
Term term = 0;
LogIndex committed_log_size;
LogSize committed_log_size;
bool vote_granted = false;
};
@ -115,13 +133,13 @@ template <typename WriteRequest>
struct CommonState {
Term term = 0;
std::vector<std::pair<Term, WriteRequest>> log;
LogIndex committed_log_size = 0;
LogIndex applied_size = 0;
LogSize committed_log_size = 0;
LogSize applied_size = 0;
};
struct FollowerTracker {
LogIndex next_index = 0;
LogIndex confirmed_contiguous_index = 0;
LogSize confirmed_log_size = 0;
};
struct PendingClientRequest {
@ -135,26 +153,32 @@ struct Leader {
std::unordered_map<LogIndex, PendingClientRequest> pending_client_requests;
Time last_broadcast = Time::min();
std::string ToString() { return "\tLeader \t"; }
std::string static ToString() { return "\tLeader \t"; }
};
struct Candidate {
std::map<Address, LogIndex> successful_votes;
std::map<Address, LogSize> successful_votes;
Time election_began = Time::min();
std::set<Address> outstanding_votes;
std::string ToString() { return "\tCandidate\t"; }
std::string static ToString() { return "\tCandidate\t"; }
};
struct Follower {
Time last_received_append_entries_timestamp;
Address leader_address;
std::string ToString() { return "\tFollower \t"; }
std::string static ToString() { return "\tFollower \t"; }
};
using Role = std::variant<Candidate, Leader, Follower>;
template <typename Role>
concept AllRoles = memgraph::utils::SameAsAnyOf<Role, Leader, Follower, Candidate>;
template <typename Role>
concept LeaderOrFollower = memgraph::utils::SameAsAnyOf<Role, Leader, Follower>;
/*
all ReplicatedState classes should have an Apply method
@ -207,20 +231,22 @@ class Raft {
public:
Raft(Io<IoImpl> &&io, std::vector<Address> peers, ReplicatedState &&replicated_state)
: io_(std::move(io)), peers_(peers), replicated_state_(std::move(replicated_state)) {}
: io_(std::forward<Io<IoImpl>>(io)),
peers_(peers),
replicated_state_(std::forward<ReplicatedState>(replicated_state)) {}
void Run() {
Time last_cron = io_.Now();
while (!io_.ShouldShutDown()) {
const auto now = io_.Now();
const Duration random_cron_interval = RandomTimeout(1000, 2000);
const Duration random_cron_interval = RandomTimeout(kMinimumCronInterval, kMaximumCronInterval);
if (now - last_cron > random_cron_interval) {
Cron();
last_cron = now;
}
Duration receive_timeout = RandomTimeout(10000, 50000);
const Duration receive_timeout = RandomTimeout(kMinimumReceiveTimeout, kMaximumReceiveTimeout);
auto request_result =
io_.template ReceiveWithTimeout<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
@ -244,58 +270,64 @@ class Raft {
// "Safely replicated" is defined as being known to be present
// on at least a majority of all peers (inclusive of the Leader).
void BumpCommitIndexAndReplyToClients(Leader &leader) {
auto indices = std::vector<LogIndex>{};
auto confirmed_log_sizes = std::vector<LogSize>{};
// We include our own log size in the calculation of the log
// index that is present on at least a majority of all peers.
indices.push_back(state_.log.size());
// confirmed log size that is present on at least a majority of all peers.
confirmed_log_sizes.push_back(state_.log.size());
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
Log("at port ", addr.last_known_port, " has confirmed contiguous index of: ", f.confirmed_contiguous_index);
confirmed_log_sizes.push_back(f.confirmed_log_size);
Log("Follower at port ", addr.last_known_port, " has confirmed log size of: ", f.confirmed_log_size);
}
// reverse sort from highest to lowest (using std::ranges::greater)
std::ranges::sort(indices, std::ranges::greater());
std::ranges::sort(confirmed_log_sizes, std::ranges::greater());
// This is a particularly correctness-critical calculation because it
// determines which index we will consider to be the committed index.
// determines the committed log size that will be broadcast in
// the next AppendRequest.
//
// If the following indexes are recorded for clusters of different sizes,
// these are the expected indexes that are considered to have reached
// consensus:
// state | expected value | (indices.size() / 2)
// If the following sizes are recorded for clusters of different numbers of peers,
// these are the expected sizes that are considered to have reached consensus:
//
// state           | expected value | (confirmed_log_sizes.size() / 2)
// [1]             | 1              | (1 / 2) => 0
// [2, 1]          | 1              | (2 / 2) => 1
// [3, 2, 1]       | 2              | (3 / 2) => 1
// [4, 3, 2, 1]    | 2              | (4 / 2) => 2
// [5, 4, 3, 2, 1] | 3              | (5 / 2) => 2
size_t index_present_on_majority = indices.size() / 2;
LogIndex new_committed_log_size = indices[index_present_on_majority];
const size_t majority_index = confirmed_log_sizes.size() / 2;
const LogSize new_committed_log_size = confirmed_log_sizes[majority_index];
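// Editor sketch with assumed values: for confirmed_log_sizes == [5, 4, 3, 2, 1],
// majority_index == 5 / 2 == 2, so new_committed_log_size == confirmed_log_sizes[2] == 3:
// a log size of 3 is confirmed on 3 of the 5 peers, which is a majority.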
// We never go backwards in history.
MG_ASSERT(state_.committed_log_size <= new_committed_log_size);
MG_ASSERT(state_.committed_log_size <= new_committed_log_size,
"as a Leader, we have previously set our committed_log_size to {}, but our Followers have a majority "
"committed_log_size of {}",
state_.committed_log_size, new_committed_log_size);
state_.committed_log_size = new_committed_log_size;
// For each index between the old index and the new one (inclusive),
// For each size between the old size and the new one (inclusive),
// Apply that log's WriteOperation to our replicated_state_,
// and use the specific return value of the ReplicatedState::Apply
// method (WriteResponseValue) to respond to the requester.
for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) {
const LogIndex apply_index = state_.applied_size;
const auto &write_request = state_.log[apply_index].second;
WriteResponseValue write_return = replicated_state_.Apply(write_request);
const WriteResponseValue write_return = replicated_state_.Apply(write_request);
if (leader.pending_client_requests.contains(apply_index)) {
PendingClientRequest client_request = std::move(leader.pending_client_requests.at(apply_index));
const PendingClientRequest client_request = std::move(leader.pending_client_requests.at(apply_index));
leader.pending_client_requests.erase(apply_index);
WriteResponse<WriteResponseValue> resp;
resp.success = true;
resp.write_return = std::move(write_return);
const WriteResponse<WriteResponseValue> resp{
.success = true,
.write_return = std::move(write_return),
.raft_index = apply_index,
};
io_.Send(client_request.address, client_request.request_id, std::move(resp));
leader.pending_client_requests.erase(apply_index);
}
}
@ -306,31 +338,37 @@ class Raft {
// AppendEntries RPCs are initiated by leaders to replicate log entries and to provide a form of heartbeat
void BroadcastAppendEntries(std::map<Address, FollowerTracker> &followers) {
for (auto &[address, follower] : followers) {
const LogIndex index = follower.confirmed_contiguous_index;
const LogIndex next_index = follower.next_index;
const auto missing = state_.log.size() - next_index;
const auto batch_size = std::min(missing, kMaximumAppendBatchSize);
const auto start_index = next_index;
const auto end_index = start_index + batch_size;
// advance follower's next index
follower.next_index += batch_size;
std::vector<std::pair<Term, WriteOperation>> entries;
if (state_.log.size() > index) {
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
}
entries.insert(entries.begin(), state_.log.begin() + start_index, state_.log.begin() + end_index);
const Term previous_term_from_index = PreviousTermFromIndex(index);
const Term previous_term_from_index = PreviousTermFromIndex(start_index);
Log("sending ", entries.size(), " entries to Follower ", address.last_known_port,
" which are above its known index of ", index);
" which are above its next_index of ", next_index);
AppendRequest<WriteOperation> ar{
.term = state_.term,
.last_log_index = index,
.batch_start_log_index = start_index,
.last_log_term = previous_term_from_index,
.entries = entries,
.entries = std::move(entries),
.leader_commit = state_.committed_log_size,
};
// request_id does not need to be set because this is not a Future-backed Request.
static constexpr RequestId request_id = 0;
io_.Send(address, request_id, ar);
io_.Send(address, request_id, std::move(ar));
}
}
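// Editor sketch with assumed values: if state_.log.size() == 5000 and a follower's
// next_index == 100, then missing == 4900, batch_size == min(4900, 1024) == 1024,
// entries [100, 1124) are sent, and follower.next_index advances to 1124; the
// remainder ships in subsequent broadcasts.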
@ -339,15 +377,15 @@ class Raft {
Duration RandomTimeout(Duration min, Duration max) {
std::uniform_int_distribution time_distrib(min.count(), max.count());
auto rand_micros = io_.Rand(time_distrib);
const auto rand_micros = io_.Rand(time_distrib);
return std::chrono::microseconds{rand_micros};
return Duration{rand_micros};
}
Duration RandomTimeout(int min_micros, int max_micros) {
std::uniform_int_distribution time_distrib(min_micros, max_micros);
int rand_micros = io_.Rand(time_distrib);
const int rand_micros = io_.Rand(time_distrib);
return std::chrono::microseconds{rand_micros};
}
@ -361,20 +399,16 @@ class Raft {
return term;
}
LogIndex CommittedLogIndex() { return state_.committed_log_size; }
Term CommittedLogTerm() {
MG_ASSERT(state_.log.size() >= state_.committed_log_size);
if (state_.log.empty() || state_.committed_log_size == 0) {
return 0;
}
auto &[term, data] = state_.log.at(state_.committed_log_size - 1);
const auto &[term, data] = state_.log.at(state_.committed_log_size - 1);
return term;
}
LogIndex LastLogIndex() { return state_.log.size(); }
Term LastLogTerm() const {
if (state_.log.empty()) {
return 0;
@ -387,21 +421,19 @@ class Raft {
template <typename... Ts>
void Log(Ts &&...args) {
const Time now = io_.Now();
auto micros = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
const Term term = state_.term;
const std::string role_string = std::visit([&](const auto &role) { return role.ToString(); }, role_);
std::ostringstream out;
out << '\t' << (int)micros << "\t" << term << "\t" << io_.GetAddress().last_known_port;
std::string role_string = std::visit([&](auto &&role) { return role.ToString(); }, role_);
out << '\t' << static_cast<int>(millis) << "\t" << term << "\t" << io_.GetAddress().last_known_port;
out << role_string;
(out << ... << args);
spdlog::debug(out.str());
spdlog::info(out.str());
}
/////////////////////////////////////////////////////////////
@ -418,7 +450,7 @@ class Raft {
/// Periodic protocol maintenance.
void Cron() {
// dispatch periodic logic based on our role to a specific Cron method.
std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
std::optional<Role> new_role = std::visit([&](auto &role) { return Cron(role); }, role_);
if (new_role) {
role_ = std::move(new_role).value();
@ -432,17 +464,17 @@ class Raft {
// 3. receiving a quorum of responses to our last batch of Vote (become a Leader)
std::optional<Role> Cron(Candidate &candidate) {
const auto now = io_.Now();
const Duration election_timeout = RandomTimeout(100000, 200000);
const Duration election_timeout = RandomTimeout(kMinimumElectionTimeout, kMaximumElectionTimeout);
const auto election_timeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(election_timeout).count();
if (now - candidate.election_began > election_timeout) {
state_.term++;
Log("becoming Candidate for term ", state_.term, " after leader timeout of ", election_timeout_us,
" elapsed since last election attempt");
"ms elapsed since last election attempt");
const VoteRequest request{
.term = state_.term,
.last_log_index = LastLogIndex(),
.log_size = state_.log.size(),
.last_log_term = LastLogTerm(),
};
@ -470,9 +502,10 @@ class Raft {
std::optional<Role> Cron(Follower &follower) {
const auto now = io_.Now();
const auto time_since_last_append_entries = now - follower.last_received_append_entries_timestamp;
Duration election_timeout = RandomTimeout(100000, 200000);
// randomized follower timeout with a range of 100-150ms.
// randomized follower timeout
const Duration election_timeout = RandomTimeout(kMinimumElectionTimeout, kMaximumElectionTimeout);
if (time_since_last_append_entries > election_timeout) {
// become a Candidate if we haven't heard from the Leader after this timeout
return Candidate{};
@ -484,7 +517,7 @@ class Raft {
// Leaders (re)send AppendRequest to followers.
std::optional<Role> Cron(Leader &leader) {
const Time now = io_.Now();
const Duration broadcast_timeout = RandomTimeout(40000, 60000);
const Duration broadcast_timeout = RandomTimeout(kMinimumBroadcastTimeout, kMaximumBroadcastTimeout);
if (now - leader.last_broadcast > broadcast_timeout) {
BroadcastAppendEntries(leader.followers);
@ -506,16 +539,21 @@ class Raft {
/// message that has been received.
/////////////////////////////////////////////////////////////
void Handle(std::variant<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
WriteRequest<WriteOperation>, VoteRequest, VoteResponse> &&message_variant,
RequestId request_id, Address from_address) {
using ReceiveVariant = std::variant<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
WriteRequest<WriteOperation>, VoteRequest, VoteResponse>;
void Handle(ReceiveVariant &&message_variant, RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in Handle's first argument,
// or it can be `auto` if it's a handler for several roles
// or messages.
std::optional<Role> new_role =
std::visit([&](auto &&msg, auto &&role) { return Handle(role, std::move(msg), request_id, from_address); },
std::move(message_variant), role_);
std::optional<Role> new_role = std::visit(
[&](auto &&msg, auto &role) {
// We use decltype(msg)(msg) in place of std::forward<?> because msg's type
// is anonymous from our point of view.
return Handle(role, decltype(msg)(msg), request_id, from_address);
},
std::forward<ReceiveVariant>(message_variant), role_);
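// Editor note (sketch): for a forwarding-reference parameter `auto &&msg`,
// decltype(msg) names `T &` or `T &&`, so the cast decltype(msg)(msg) is
// equivalent to std::forward<decltype(msg)>(msg), which is spellable here:
//
//   std::visit([&](auto &&msg, auto &role) {
//     return Handle(role, std::forward<decltype(msg)>(msg), request_id, from_address);
//   }, std::forward<ReceiveVariant>(message_variant), role_);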
// TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc...
if (new_role) {
@ -524,13 +562,13 @@ class Raft {
}
// all roles can receive a VoteRequest and possibly become a Follower
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, VoteRequest &&req, RequestId request_id, Address from_address) {
Log("received Vote from ", from_address.last_known_port, " with term ", req.term);
template <AllRoles ALL>
std::optional<Role> Handle(ALL & /* variable */, VoteRequest &&req, RequestId request_id, Address from_address) {
Log("received VoteRequest from ", from_address.last_known_port, " with term ", req.term);
const bool last_log_term_dominates = req.last_log_term >= LastLogTerm();
const bool term_dominates = req.term > state_.term;
const bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
const bool new_leader = last_log_term_dominates && term_dominates && last_log_index_dominates;
const bool log_size_dominates = req.log_size >= state_.log.size();
const bool new_leader = last_log_term_dominates && term_dominates && log_size_dominates;
if (new_leader) {
MG_ASSERT(req.term > state_.term);
@ -552,7 +590,9 @@ class Raft {
.last_received_append_entries_timestamp = io_.Now(),
.leader_address = from_address,
};
} else if (term_dominates) {
}
if (term_dominates) {
Log("received a vote from an inferior candidate. Becoming Candidate");
state_.term = std::max(state_.term, req.term) + 1;
return Candidate{};
@ -561,7 +601,7 @@ class Raft {
return std::nullopt;
}
std::optional<Role> Handle(Candidate &candidate, VoteResponse &&res, RequestId, Address from_address) {
std::optional<Role> Handle(Candidate &candidate, VoteResponse &&res, RequestId /* variable */, Address from_address) {
Log("received VoteResponse");
if (!res.vote_granted || res.term != state_.term) {
@ -585,14 +625,14 @@ class Raft {
for (const auto &[address, committed_log_size] : candidate.successful_votes) {
FollowerTracker follower{
.next_index = committed_log_size,
.confirmed_contiguous_index = committed_log_size,
.confirmed_log_size = committed_log_size,
};
followers.insert({address, follower});
}
for (const auto &address : candidate.outstanding_votes) {
FollowerTracker follower{
.next_index = state_.log.size(),
.confirmed_contiguous_index = 0,
.confirmed_log_size = 0,
};
followers.insert({address, follower});
}
@ -610,27 +650,30 @@ class Raft {
return std::nullopt;
}
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, VoteResponse &&, RequestId, Address) {
template <LeaderOrFollower LOF>
std::optional<Role> Handle(LOF & /* variable */, VoteResponse && /* variable */, RequestId /* variable */,
Address /* variable */) {
Log("non-Candidate received VoteResponse");
return std::nullopt;
}
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest<WriteOperation> &&req, RequestId request_id,
template <AllRoles ALL>
std::optional<Role> Handle(ALL &role, AppendRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
// the response starts out unsuccessful, and is only marked successful
// after the entries have been appended and applied below.
AppendResponse res{
.success = false,
.term = state_.term,
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
.log_size = state_.log.size(),
};
if constexpr (std::is_same<AllRoles, Leader>()) {
if constexpr (std::is_same<ALL, Leader>()) {
MG_ASSERT(req.term != state_.term, "Multiple leaders are acting under the term ", req.term);
}
const bool is_candidate = std::is_same<AllRoles, Candidate>();
const bool is_candidate = std::is_same<ALL, Candidate>();
const bool is_failed_competitor = is_candidate && req.term == state_.term;
const Time now = io_.Now();
@ -651,7 +694,9 @@ class Raft {
.last_received_append_entries_timestamp = now,
.leader_address = from_address,
};
} else if (req.term < state_.term) {
}
if (req.term < state_.term) {
// nack this request from an old leader
io_.Send(from_address, request_id, res);
@ -659,7 +704,7 @@ class Raft {
}
// at this point, we're dealing with our own leader
if constexpr (std::is_same<AllRoles, Follower>()) {
if constexpr (std::is_same<ALL, Follower>()) {
// small specialization for when we're already a Follower
MG_ASSERT(role.leader_address == from_address, "Multiple Leaders are acting under the same term number!");
role.last_received_append_entries_timestamp = now;
@ -668,33 +713,38 @@ class Raft {
MG_ASSERT(false, "Somehow entered Follower-specific logic as a non-Follower");
}
res.last_log_term = LastLogTerm();
res.last_log_index = LastLogIndex();
Log("returning last_log_index of ", res.last_log_index);
// Handle steady-state conditions.
if (req.last_log_index != LastLogIndex()) {
Log("req.last_log_index is above our last applied log index");
if (req.batch_start_log_index != state_.log.size()) {
Log("req.batch_start_log_index of ", req.batch_start_log_index, " does not match our log size of ",
state_.log.size());
} else if (req.last_log_term != LastLogTerm()) {
Log("req.last_log_term differs from our leader term at that slot, expected: ", LastLogTerm(), " but got ",
req.last_log_term);
} else {
// happy path - Apply log
Log("applying batch of entries to log of size ", req.entries.size());
Log("applying batch of ", req.entries.size(), " entries to our log starting at index ",
req.batch_start_log_index);
MG_ASSERT(req.last_log_index >= state_.committed_log_size,
const auto resize_length = req.batch_start_log_index;
MG_ASSERT(resize_length >= state_.committed_log_size,
"Applied history from Leader which goes back in time from our commit_index");
// possibly chop off entries that were replaced by
// entries with different terms (we got data that
// hasn't reached consensus yet, which is normal)
state_.log.resize(req.last_log_index);
state_.log.resize(resize_length);
state_.log.insert(state_.log.end(), req.entries.begin(), req.entries.end());
if (req.entries.size() > 0) {
auto &[first_term, op] = req.entries.at(0);
MG_ASSERT(LastLogTerm() <= first_term);
}
state_.log.insert(state_.log.end(), std::make_move_iterator(req.entries.begin()),
std::make_move_iterator(req.entries.end()));
MG_ASSERT(req.leader_commit >= state_.committed_log_size);
state_.committed_log_size = std::min(req.leader_commit, LastLogIndex());
state_.committed_log_size = std::min(req.leader_commit, state_.log.size());
for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) {
const auto &write_request = state_.log[state_.applied_size].second;
@ -704,35 +754,48 @@ class Raft {
res.success = true;
}
res.last_log_term = LastLogTerm();
res.log_size = state_.log.size();
Log("returning log_size of ", res.log_size);
io_.Send(from_address, request_id, res);
return std::nullopt;
}
std::optional<Role> Handle(Leader &leader, AppendResponse &&res, RequestId, Address from_address) {
std::optional<Role> Handle(Leader &leader, AppendResponse &&res, RequestId /* variable */, Address from_address) {
if (res.term != state_.term) {
} else if (!leader.followers.contains(from_address)) {
Log("received AppendResponse from unknown Follower");
MG_ASSERT(false, "received AppendResponse from unknown Follower");
} else {
if (res.success) {
Log("got successful AppendResponse from ", from_address.last_known_port, " with last_log_index of ",
res.last_log_index);
} else {
Log("got unsuccessful AppendResponse from ", from_address.last_known_port, " with last_log_index of ",
res.last_log_index);
}
FollowerTracker &follower = leader.followers.at(from_address);
follower.next_index = std::max(follower.next_index, res.last_log_index);
follower.confirmed_contiguous_index = std::max(follower.confirmed_contiguous_index, res.last_log_index);
BumpCommitIndexAndReplyToClients(leader);
Log("received AppendResponse related to a previous term when we (presumably) were the leader");
return std::nullopt;
}
// TODO(tyler) when we have dynamic membership, this assert will become incorrect, but we should
// keep it in place until then because it has bug-finding value.
MG_ASSERT(leader.followers.contains(from_address), "received AppendResponse from unknown Follower");
// at this point, we know the term matches and we know this Follower
FollowerTracker &follower = leader.followers.at(from_address);
if (res.success) {
Log("got successful AppendResponse from ", from_address.last_known_port, " with log_size of ", res.log_size);
follower.next_index = std::max(follower.next_index, res.log_size);
} else {
Log("got unsuccessful AppendResponse from ", from_address.last_known_port, " with log_size of ", res.log_size);
follower.next_index = res.log_size;
}
follower.confirmed_log_size = std::max(follower.confirmed_log_size, res.log_size);
BumpCommitIndexAndReplyToClients(leader);
return std::nullopt;
}
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, AppendResponse &&, RequestId, Address) {
template <AllRoles ALL>
std::optional<Role> Handle(ALL & /* variable */, AppendResponse && /* variable */, RequestId /* variable */,
Address /* variable */) {
// we used to be the leader, and are getting old delayed responses
return std::nullopt;
}
@ -742,13 +805,14 @@ class Raft {
/////////////////////////////////////////////////////////////
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
std::optional<Role> Handle(Leader &, ReadRequest<ReadOperation> &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Leader & /* variable */, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
Log("handling ReadOperation");
ReadOperation read_operation = req.operation;
ReadResponseValue read_return = replicated_state_.Read(read_operation);
ReadResponse<ReadResponseValue> resp{
const ReadResponse<ReadResponseValue> resp{
.success = true,
.read_return = std::move(read_return),
.retry_leader = std::nullopt,
@ -760,27 +824,29 @@ class Raft {
}
// Candidates should respond with a failure, similar to the Candidate + WriteRequest failure below
std::optional<Role> Handle(Candidate &, ReadRequest<ReadOperation> &&, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Candidate & /* variable */, ReadRequest<ReadOperation> && /* variable */,
RequestId request_id, Address from_address) {
Log("received ReadOperation - not redirecting because no Leader is known");
auto res = ReadResponse<ReadResponseValue>{};
res.success = false;
Cron();
const ReadResponse<ReadResponseValue> res{
.success = false,
};
io_.Send(from_address, request_id, res);
Cron();
return std::nullopt;
}
// Followers should respond with a redirection, similar to the Follower + WriteRequest response below
std::optional<Role> Handle(Follower &follower, ReadRequest<ReadOperation> &&, RequestId request_id,
std::optional<Role> Handle(Follower &follower, ReadRequest<ReadOperation> && /* variable */, RequestId request_id,
Address from_address) {
auto res = ReadResponse<ReadResponseValue>{};
res.success = false;
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
res.retry_leader = follower.leader_address;
const ReadResponse<ReadResponseValue> res{
.success = false,
.retry_leader = follower.leader_address,
};
io_.Send(from_address, request_id, res);
@ -792,29 +858,32 @@ class Raft {
// server. If the client's first choice is not the leader, that
// server will reject the client's request and supply information
// about the most recent leader it has heard from.
std::optional<Role> Handle(Follower &follower, WriteRequest<WriteOperation> &&, RequestId request_id,
std::optional<Role> Handle(Follower &follower, WriteRequest<WriteOperation> && /* variable */, RequestId request_id,
Address from_address) {
auto res = WriteResponse<WriteResponseValue>{};
res.success = false;
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
res.retry_leader = follower.leader_address;
const WriteResponse<WriteResponseValue> res{
.success = false,
.retry_leader = follower.leader_address,
};
io_.Send(from_address, request_id, res);
return std::nullopt;
}
std::optional<Role> Handle(Candidate &, WriteRequest<WriteOperation> &&, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Candidate & /* variable */, WriteRequest<WriteOperation> && /* variable */,
RequestId request_id, Address from_address) {
Log("received WriteRequest - not redirecting because no Leader is known");
auto res = WriteResponse<WriteResponseValue>{};
res.success = false;
Cron();
const WriteResponse<WriteResponseValue> res{
.success = false,
};
io_.Send(from_address, request_id, res);
Cron();
return std::nullopt;
}
@ -824,6 +893,7 @@ class Raft {
Log("handling WriteRequest");
// we are the leader. add item to log and send Append to peers
MG_ASSERT(state_.term >= LastLogTerm());
state_.log.emplace_back(std::pair(state_.term, std::move(req.operation)));
LogIndex log_index = state_.log.size() - 1;

View File

@ -38,7 +38,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
server_addresses_.insert(address);
while (true) {
const size_t blocked_servers = BlockedServers();
const size_t blocked_servers = blocked_on_receive_;
const bool all_servers_blocked = blocked_servers == server_addresses_.size();
@ -50,22 +50,10 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
}
}
size_t SimulatorHandle::BlockedServers() {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
if (opaque_promise.promise.IsAwaited() && server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
}
return blocked_servers;
}
bool SimulatorHandle::MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
const size_t blocked_servers = BlockedServers();
const size_t blocked_servers = blocked_on_receive_;
if (blocked_servers < server_addresses_.size()) {
// we only need to advance the simulator when all

View File

@ -24,7 +24,7 @@
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/simulator/message_conversion.hpp"
#include "io/message_conversion.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_stats.hpp"
#include "io/time.hpp"
@ -32,35 +32,6 @@
namespace memgraph::io::simulator {
using memgraph::io::Duration;
using memgraph::io::Time;
struct PromiseKey {
Address requester_address;
uint64_t request_id;
// TODO(tyler) possibly remove replier_address from promise key
// once we want to support DSR.
Address replier_address;
public:
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
if (lhs.requester_address != rhs.requester_address) {
return lhs.requester_address < rhs.requester_address;
}
if (lhs.request_id != rhs.request_id) {
return lhs.request_id < rhs.request_id;
}
return lhs.replier_address < rhs.replier_address;
}
};
struct DeadlineAndOpaquePromise {
Time deadline;
OpaquePromise promise;
};
class SimulatorHandle {
mutable std::mutex mu_{};
mutable std::condition_variable cv_;
@ -82,14 +53,6 @@ class SimulatorHandle {
std::mt19937 rng_;
SimulatorConfig config_;
/// Returns the number of servers currently blocked on Receive, plus
/// the servers that are blocked on Futures that were created through
/// SimulatorTransport::Request.
///
/// TODO(tyler) investigate whether avoiding consideration of Futures
/// increases determinism.
size_t BlockedServers();
void TimeoutPromisesPastDeadline() {
const Time now = cluster_wide_time_microseconds_;
@ -122,14 +85,17 @@ class SimulatorHandle {
bool ShouldShutDown() const;
template <Message Request, Message Response>
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request, Duration timeout,
ResponsePromise<Response> &&promise) {
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
Duration timeout, ResponsePromise<Response> &&promise) {
std::unique_lock<std::mutex> lock(mu_);
const Time deadline = cluster_wide_time_microseconds_ + timeout;
std::any message(request);
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message)};
OpaqueMessage om{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message)};
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
@ -182,10 +148,13 @@ class SimulatorHandle {
}
template <Message M>
void Send(Address to_address, Address from_address, uint64_t request_id, M message) {
void Send(Address to_address, Address from_address, RequestId request_id, M message) {
std::unique_lock<std::mutex> lock(mu_);
std::any message_any(std::move(message));
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
OpaqueMessage om{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any)};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages++;

View File

@ -32,11 +32,11 @@ class SimulatorTransport {
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address, uint64_t seed)
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
template <Message Request, Message Response>
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request, Duration timeout) {
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address address, uint64_t request_id, RequestT request, Duration timeout) {
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
auto [future, promise] =
memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(maybe_tick_simulator);
memgraph::io::FuturePromisePairWithNotifier<ResponseResult<ResponseT>>(maybe_tick_simulator);
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
@ -49,8 +49,8 @@ class SimulatorTransport {
}
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return simulator_handle_->template Send<M>(address, address_, request_id, message);
void Send(Address to_address, Address from_address, uint64_t request_id, M message) {
return simulator_handle_->template Send<M>(to_address, from_address, request_id, message);
}
Time Now() const { return simulator_handle_->Now(); }

View File

@ -16,6 +16,6 @@
namespace memgraph::io {
using Duration = std::chrono::microseconds;
using Time = std::chrono::time_point<std::chrono::local_t, Duration>;
using Time = std::chrono::time_point<std::chrono::system_clock, Duration>;
} // namespace memgraph::io

View File

@ -32,10 +32,13 @@ using memgraph::utils::BasicResult;
template <typename T>
concept Message = std::same_as<T, std::decay_t<T>>;
using RequestId = uint64_t;
template <Message M>
struct ResponseEnvelope {
M message;
uint64_t request_id;
RequestId request_id;
Address to_address;
Address from_address;
};
@ -51,7 +54,8 @@ using ResponsePromise = memgraph::io::Promise<ResponseResult<M>>;
template <Message... Ms>
struct RequestEnvelope {
std::variant<Ms...> message;
uint64_t request_id;
RequestId request_id;
Address to_address;
Address from_address;
};
@ -62,7 +66,7 @@ template <typename I>
class Io {
I implementation_;
Address address_;
uint64_t request_id_counter_ = 0;
RequestId request_id_counter_ = 0;
Duration default_timeout_ = std::chrono::microseconds{50000};
public:
@ -76,19 +80,19 @@ class Io {
Duration GetDefaultTimeout() { return default_timeout_; }
/// Issue a request with an explicitly provided timeout. This tends to be used by clients.
template <Message Request, Message Response>
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
const uint64_t request_id = ++request_id_counter_;
return implementation_.template Request<Request, Response>(address, request_id, request, timeout);
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> RequestWithTimeout(Address address, RequestT request, Duration timeout) {
const RequestId request_id = ++request_id_counter_;
return implementation_.template Request<RequestT, ResponseT>(address, request_id, request, timeout);
}
/// Issue a request that times out after the default timeout. This tends
/// to be used by clients.
template <Message Request, Message Response>
ResponseFuture<Response> Request(Address address, Request request) {
const uint64_t request_id = ++request_id_counter_;
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address address, RequestT request) {
const RequestId request_id = ++request_id_counter_;
const Duration timeout = default_timeout_;
return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
return implementation_.template Request<RequestT, ResponseT>(address, request_id, std::move(request), timeout);
}
/// Wait for an explicit number of microseconds for a request of one of the
@ -110,8 +114,9 @@ class Io {
/// responses are not necessarily expected, and for servers to respond to requests.
/// If you need reliable delivery, this must be built on top. TCP is not enough for most use cases.
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return implementation_.template Send<M>(address, request_id, std::move(message));
void Send(Address to_address, RequestId request_id, M message) {
Address from_address = address_;
return implementation_.template Send<M>(to_address, from_address, request_id, std::move(message));
}
/// The current system time. This time source should be preferred over any other,

View File

@ -130,7 +130,7 @@ void RunSimulation() {
.scramble_messages = true,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 128},
};
auto simulator = Simulator(config);
@ -163,17 +163,16 @@ void RunSimulation() {
auto srv_thread_3 = std::jthread(RunRaft<SimulatorTransport>, std::move(srv_3));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_3);
spdlog::debug("beginning test after servers have become quiescent");
spdlog::info("beginning test after servers have become quiescent");
std::mt19937 cli_rng_{0};
std::vector<Address> server_addrs{srv_addr_1, srv_addr_2, srv_addr_3};
Address leader = server_addrs[0];
RsmClient<Io<SimulatorTransport>, CasRequest, CasResponse, GetRequest, GetResponse> client(cli_io, leader,
server_addrs);
RsmClient<SimulatorTransport, CasRequest, CasResponse, GetRequest, GetResponse> client(cli_io, leader, server_addrs);
const int key = 0;
std::optional<int> last_known_value;
std::optional<int> last_known_value = 0;
bool success = false;
@ -197,8 +196,7 @@ void RunSimulation() {
bool cas_succeeded = cas_response.cas_success;
spdlog::debug("Client received CasResponse! success: {} last_known_value {}", cas_succeeded,
(int)*last_known_value);
spdlog::info("Client received CasResponse! success: {} last_known_value {}", cas_succeeded, (int)*last_known_value);
if (cas_succeeded) {
last_known_value = i;
@ -222,7 +220,7 @@ void RunSimulation() {
MG_ASSERT(get_response.value == i);
spdlog::debug("client successfully cas'd a value and read it back! value: {}", i);
spdlog::info("client successfully cas'd a value and read it back! value: {}", i);
success = true;
}
@ -233,14 +231,14 @@ void RunSimulation() {
SimulatorStats stats = simulator.Stats();
spdlog::debug("total messages: ", stats.total_messages);
spdlog::debug("dropped messages: ", stats.dropped_messages);
spdlog::debug("timed out requests: ", stats.timed_out_requests);
spdlog::debug("total requests: ", stats.total_requests);
spdlog::debug("total responses: ", stats.total_responses);
spdlog::debug("simulator ticks: ", stats.simulator_ticks);
spdlog::info("total messages: {}", stats.total_messages);
spdlog::info("dropped messages: {}", stats.dropped_messages);
spdlog::info("timed out requests: {}", stats.timed_out_requests);
spdlog::info("total requests: {}", stats.total_requests);
spdlog::info("total responses: {}", stats.total_responses);
spdlog::info("simulator ticks: {}", stats.simulator_ticks);
spdlog::debug("========================== SUCCESS :) ==========================");
spdlog::info("========================== SUCCESS :) ==========================");
/*
this is implicit in jthread's dtor
@ -254,12 +252,12 @@ int main() {
int n_tests = 50;
for (int i = 0; i < n_tests; i++) {
spdlog::debug("========================== NEW SIMULATION {} ==========================", i);
spdlog::debug("\tTime\tTerm\tPort\tRole\t\tMessage\n");
spdlog::info("========================== NEW SIMULATION {} ==========================", i);
spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
RunSimulation();
}
spdlog::debug("passed {} tests!", n_tests);
spdlog::info("passed {} tests!", n_tests);
return 0;
}

View File

@ -394,3 +394,7 @@ add_dependencies(memgraph__unit test_lcp)
# Test future
add_unit_test(future.cpp)
target_link_libraries(${test_prefix}future mg-io)
# Test Local Transport
add_unit_test(local_transport.cpp)
target_link_libraries(${test_prefix}local_transport mg-io)

View File

@ -0,0 +1,86 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <thread>
#include <gtest/gtest.h>
#include "io/local_transport/local_system.hpp"
#include "io/local_transport/local_transport.hpp"
#include "io/transport.hpp"
namespace memgraph::io::tests {
using memgraph::io::local_transport::LocalSystem;
using memgraph::io::local_transport::LocalTransport;
struct CounterRequest {
uint64_t proposal;
};
struct CounterResponse {
uint64_t highest_seen;
};
void RunServer(Io<LocalTransport> io) {
uint64_t highest_seen = 0;
while (!io.ShouldShutDown()) {
spdlog::info("[SERVER] Is receiving...");
auto request_result = io.Receive<CounterRequest>();
if (request_result.HasError()) {
spdlog::info("[SERVER] timed out, continue");
continue;
}
auto request_envelope = request_result.GetValue();
ASSERT_TRUE(std::holds_alternative<CounterRequest>(request_envelope.message));
auto req = std::get<CounterRequest>(request_envelope.message);
highest_seen = std::max(highest_seen, req.proposal);
auto srv_res = CounterResponse{highest_seen};
io.Send(request_envelope.from_address, request_envelope.request_id, srv_res);
}
}
TEST(LocalTransport, BasicRequest) {
LocalSystem local_system;
// rely on the random uuid to make each Address unique
auto cli_addr = Address::UniqueLocalAddress();
auto srv_addr = Address::UniqueLocalAddress();
Io<LocalTransport> cli_io = local_system.Register(cli_addr);
Io<LocalTransport> srv_io = local_system.Register(srv_addr);
auto srv_thread = std::jthread(RunServer, std::move(srv_io));
for (int i = 1; i < 3; ++i) {
// send request
CounterRequest cli_req;
auto value = 1;
cli_req.proposal = value;
spdlog::info("[CLIENT] sending request");
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
spdlog::info("[CLIENT] waiting on future");
auto res_rez = std::move(res_f).Wait();
spdlog::info("[CLIENT] future returned");
MG_ASSERT(!res_rez.HasError());
spdlog::info("[CLIENT] Got a valid response");
auto env = res_rez.GetValue();
MG_ASSERT(env.message.highest_seen == value);
}
local_system.ShutDown();
}
} // namespace memgraph::io::tests
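Beyond the request/response test above, a minimal sketch (editor addition) of standing a single Raft instance up over the same LocalTransport. The template parameter order <IoImpl, ReplicatedState, WriteOperation, WriteResponseValue, ReadOperation, ReadResponseValue> is assumed here, since this diff does not show Raft's declaration:

#include <cstdint>
#include <thread>

#include "io/local_transport/local_system.hpp"
#include "io/local_transport/local_transport.hpp"
#include "io/rsm/raft.hpp"

struct Increment {};  // hypothetical WriteOperation
struct GetCount {};   // hypothetical ReadOperation

class CounterState {
  uint64_t count_ = 0;

 public:
  uint64_t Apply(const Increment & /* unused */) { return ++count_; }
  uint64_t Read(const GetCount & /* unused */) { return count_; }
};

void RaftSketch() {
  using namespace memgraph::io;
  using namespace memgraph::io::local_transport;
  using memgraph::io::rsm::Raft;

  LocalSystem local_system;
  auto addr = Address::UniqueLocalAddress();
  auto io = local_system.Register(addr);

  // a single-node cluster: no peers, so the node can elect itself and commit alone
  Raft<LocalTransport, CounterState, Increment, uint64_t, GetCount, uint64_t> raft(
      std::move(io), /*peers=*/{}, CounterState{});

  auto raft_thread = std::jthread([&raft] { raft.Run(); });
  // ... issue WriteRequest<Increment> / ReadRequest<GetCount> via another Io ...
  local_system.ShutDown();  // Run() observes ShouldShutDown() and returns
}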