Improve simulator determinism

This commit is contained in:
Tyler Neely 2022-11-17 17:36:46 +00:00
parent 5c0e41ed44
commit 80d6776210
7 changed files with 122 additions and 37 deletions

View File

@ -64,7 +64,6 @@ class Shared {
waiting_ = true;
while (!item_) {
bool simulator_progressed = false;
if (simulator_notifier_) [[unlikely]] {
// We can't hold our own lock while notifying
// the simulator because notifying the simulator
@ -77,7 +76,7 @@ class Shared {
// so we have to get out of its way to avoid
// a cyclical deadlock.
lock.unlock();
simulator_progressed = std::invoke(simulator_notifier_);
std::invoke(simulator_notifier_);
lock.lock();
if (item_) {
// item may have been filled while we
@ -85,8 +84,7 @@ class Shared {
// the simulator of our waiting_ status.
break;
}
}
if (!simulator_progressed) [[likely]] {
} else {
cv_.wait(lock);
}
MG_ASSERT(!consumed_, "Future consumed twice!");

View File

@ -39,6 +39,8 @@ struct LatencyHistogramSummary {
Duration p100;
Duration sum;
friend bool operator==(const LatencyHistogramSummary &lhs, const LatencyHistogramSummary &rhs) = default;
friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) {
in << "{ \"count\": " << histo.count;
in << ", \"p0\": " << histo.p0.count();
@ -80,6 +82,8 @@ struct LatencyHistogramSummaries {
return output;
}
friend bool operator==(const LatencyHistogramSummaries &lhs, const LatencyHistogramSummaries &rhs) = default;
friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) {
using memgraph::utils::print_helpers::operator<<;
in << histo.latencies;

View File

@ -46,9 +46,13 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
const bool all_servers_blocked = blocked_servers == server_addresses_.size();
if (all_servers_blocked) {
spdlog::info("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers,
server_addresses_.size());
return;
}
spdlog::info("not returning from quiescent because we see {} blocked out of {}", blocked_servers,
server_addresses_.size());
cv_.wait(lock);
}
}
@ -67,9 +71,14 @@ bool SimulatorHandle::MaybeTickSimulator() {
stats_.simulator_ticks++;
cv_.notify_all();
bool fired_cv = false;
bool timed_anything_out = TimeoutPromisesPastDeadline();
TimeoutPromisesPastDeadline();
if (timed_anything_out) {
fired_cv = true;
blocked_on_receive_.clear();
cv_.notify_all();
}
if (in_flight_.empty()) {
// return early here because there are no messages to schedule
@ -79,9 +88,15 @@ bool SimulatorHandle::MaybeTickSimulator() {
const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
cluster_wide_time_microseconds_ += clock_advance;
if (!fired_cv) {
cv_.notify_all();
blocked_on_receive_.clear();
fired_cv = true;
}
if (cluster_wide_time_microseconds_ >= config_.abort_time) {
if (should_shut_down_) {
return false;
return true;
}
spdlog::error(
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
@ -120,22 +135,32 @@ bool SimulatorHandle::MaybeTickSimulator() {
if (should_drop || normal_timeout) {
stats_.timed_out_requests++;
dop.promise.TimeOut();
spdlog::info("timing out request");
} else {
stats_.total_responses++;
Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
auto type_info = opaque_message.type_info;
dop.promise.Fill(std::move(opaque_message), response_latency);
histograms_.Measure(type_info, response_latency);
spdlog::info("replying to request");
}
} else if (should_drop) {
// don't add it anywhere, let it drop
spdlog::info("silently dropping request");
} else {
// add to can_receive_ if not
spdlog::info("adding message to can_receive_");
const auto &[om_vec, inserted] =
can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
om_vec->second.emplace_back(std::move(opaque_message));
}
if (!fired_cv) {
cv_.notify_all();
blocked_on_receive_.clear();
fired_cv = true;
}
return true;
}

View File

@ -58,7 +58,8 @@ class SimulatorHandle {
MessageHistogramCollector histograms_;
RequestId request_id_counter_{0};
void TimeoutPromisesPastDeadline() {
bool TimeoutPromisesPastDeadline() {
bool timed_anything_out = false;
const Time now = cluster_wide_time_microseconds_;
for (auto it = promises_.begin(); it != promises_.end();) {
auto &[promise_key, dop] = *it;
@ -68,10 +69,12 @@ class SimulatorHandle {
it = promises_.erase(it);
stats_.timed_out_requests++;
timed_anything_out = true;
} else {
++it;
}
}
return timed_anything_out;
}
public:
@ -103,6 +106,7 @@ class SimulatorHandle {
template <Message Request, Message Response>
ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
std::function<bool()> &&maybe_tick_simulator) {
spdlog::info("submitting request to {}", to_address.last_known_port);
auto type_info = TypeInfoFor(request);
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
@ -146,8 +150,6 @@ class SimulatorHandle {
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
std::unique_lock<std::mutex> lock(mu_);
blocked_on_receive_.emplace(receiver);
const Time deadline = cluster_wide_time_microseconds_ + timeout;
auto partial_address = receiver.ToPartialAddress();
@ -164,38 +166,46 @@ class SimulatorHandle {
auto m_opt = std::move(message).Take<Ms...>();
MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type");
blocked_on_receive_.erase(receiver);
return std::move(m_opt).value();
}
}
lock.unlock();
spdlog::info("server calling MaybeTickSimulator");
bool made_progress = MaybeTickSimulator();
spdlog::info("server returned from MaybeTickSimulator");
lock.lock();
if (!should_shut_down_ && !made_progress) {
blocked_on_receive_.emplace(receiver);
cv_.notify_all();
spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port);
cv_.wait(lock);
}
}
blocked_on_receive_.erase(receiver);
return TimedOut{};
}
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M message) {
spdlog::info("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port);
auto type_info = TypeInfoFor(message);
std::unique_lock<std::mutex> lock(mu_);
std::any message_any(std::move(message));
OpaqueMessage om{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any),
.type_info = type_info};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
{
std::unique_lock<std::mutex> lock(mu_);
std::any message_any(std::move(message));
OpaqueMessage om{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any),
.type_info = type_info};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages++;
stats_.total_messages++;
} // lock dropped before cv notification
spdlog::info("sender calling MaybeTickSimulator");
MaybeTickSimulator();
spdlog::info("sender returned from MaybeTickSimulator");
cv_.notify_all();
}

View File

@ -21,5 +21,19 @@ struct SimulatorStats {
uint64_t total_requests = 0;
uint64_t total_responses = 0;
uint64_t simulator_ticks = 0;
friend bool operator==(const SimulatorStats &lhs, const SimulatorStats &rhs) = default;
friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
std::string formated = fmt::format(
"SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, "
"total_responses: {}, simulator_ticks: {} }}",
stats.total_messages, stats.dropped_messages, stats.timed_out_requests, stats.total_requests,
stats.total_responses, stats.simulator_ticks);
in << formated;
return in;
}
};
}; // namespace memgraph::io::simulator

View File

@ -34,7 +34,12 @@ class SimulatorTransport {
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
std::function<bool()> maybe_tick_simulator = [this] {
spdlog::info("client calling MaybeTickSimulator");
bool ret = simulator_handle_->MaybeTickSimulator();
spdlog::info("client returned from MaybeTickSimulator");
return ret;
};
return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
timeout, std::move(maybe_tick_simulator));

View File

@ -9,17 +9,25 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <memory>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <spdlog/cfg/env.h>
#include "io/message_histogram_collector.hpp"
#include "io/simulator/simulator.hpp"
#include "utils/print_helpers.hpp"
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::LatencyHistogramSummaries;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
struct CounterRequest {
@ -40,6 +48,7 @@ void run_server(Io<SimulatorTransport> io) {
std::cout << "[SERVER] Error, continue" << std::endl;
continue;
}
std::cout << "[SERVER] Got message" << std::endl;
auto request_envelope = request_result.GetValue();
auto req = std::get<CounterRequest>(request_envelope.message);
@ -50,13 +59,7 @@ void run_server(Io<SimulatorTransport> io) {
}
}
int main() {
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
};
std::pair<SimulatorStats, LatencyHistogramSummaries> RunWorkload(SimulatorConfig &config) {
auto simulator = Simulator(config);
auto cli_addr = Address::TestAddress(1);
@ -72,21 +75,47 @@ int main() {
// send request
CounterRequest cli_req;
cli_req.proposal = i;
spdlog::info("[CLIENT] calling Request");
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
spdlog::info("[CLIENT] calling Wait");
auto res_rez = std::move(res_f).Wait();
spdlog::info("[CLIENT] Wait returned");
if (!res_rez.HasError()) {
std::cout << "[CLIENT] Got a valid response" << std::endl;
spdlog::info("[CLIENT] Got a valid response");
auto env = res_rez.GetValue();
MG_ASSERT(env.message.highest_seen == i);
std::cout << "response latency: " << env.response_latency.count() << " microseconds" << std::endl;
spdlog::info("response latency: {} microseconds", env.response_latency.count());
} else {
std::cout << "[CLIENT] Got an error" << std::endl;
spdlog::info("[CLIENT] Got an error");
}
}
using memgraph::utils::print_helpers::operator<<;
std::cout << "response latencies: " << cli_io.ResponseLatencies() << std::endl;
simulator.ShutDown();
return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
}
int main() {
spdlog::cfg::load_env_levels();
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
};
auto [sim_stats_1, latency_stats_1] = RunWorkload(config);
auto [sim_stats_2, latency_stats_2] = RunWorkload(config);
if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
spdlog::info("simulator stats diverged across runs");
spdlog::info("run 1 simulator stats: {}", sim_stats_1);
spdlog::info("run 2 simulator stats: {}", sim_stats_2);
spdlog::info("run 1 latency:\n{}", latency_stats_1.SummaryTable());
spdlog::info("run 2 latency:\n{}", latency_stats_2.SummaryTable());
std::terminate();
}
return 0;
}