From 80d677621037fe56cb2b40bf56060601584750ad Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 17:36:46 +0000 Subject: [PATCH 01/26] Improve simulator determinism --- src/io/future.hpp | 6 +-- src/io/message_histogram_collector.hpp | 4 ++ src/io/simulator/simulator_handle.cpp | 31 +++++++++++-- src/io/simulator/simulator_handle.hpp | 42 +++++++++++------- src/io/simulator/simulator_stats.hpp | 14 ++++++ src/io/simulator/simulator_transport.hpp | 7 ++- tests/simulation/basic_request.cpp | 55 ++++++++++++++++++------ 7 files changed, 122 insertions(+), 37 deletions(-) diff --git a/src/io/future.hpp b/src/io/future.hpp index bb2ef548c..98437b496 100644 --- a/src/io/future.hpp +++ b/src/io/future.hpp @@ -64,7 +64,6 @@ class Shared { waiting_ = true; while (!item_) { - bool simulator_progressed = false; if (simulator_notifier_) [[unlikely]] { // We can't hold our own lock while notifying // the simulator because notifying the simulator @@ -77,7 +76,7 @@ class Shared { // so we have to get out of its way to avoid // a cyclical deadlock. lock.unlock(); - simulator_progressed = std::invoke(simulator_notifier_); + std::invoke(simulator_notifier_); lock.lock(); if (item_) { // item may have been filled while we @@ -85,8 +84,7 @@ class Shared { // the simulator of our waiting_ status. break; } - } - if (!simulator_progressed) [[likely]] { + } else { cv_.wait(lock); } MG_ASSERT(!consumed_, "Future consumed twice!"); diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp index e663be988..27521fdeb 100644 --- a/src/io/message_histogram_collector.hpp +++ b/src/io/message_histogram_collector.hpp @@ -39,6 +39,8 @@ struct LatencyHistogramSummary { Duration p100; Duration sum; + friend bool operator==(const LatencyHistogramSummary &lhs, const LatencyHistogramSummary &rhs) = default; + friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) { in << "{ \"count\": " << histo.count; in << ", \"p0\": " << histo.p0.count(); @@ -80,6 +82,8 @@ struct LatencyHistogramSummaries { return output; } + friend bool operator==(const LatencyHistogramSummaries &lhs, const LatencyHistogramSummaries &rhs) = default; + friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) { using memgraph::utils::print_helpers::operator<<; in << histo.latencies; diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 74925812e..096673e21 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -46,9 +46,13 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre const bool all_servers_blocked = blocked_servers == server_addresses_.size(); if (all_servers_blocked) { + spdlog::info("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers, + server_addresses_.size()); return; } + spdlog::info("not returning from quiescent because we see {} blocked out of {}", blocked_servers, + server_addresses_.size()); cv_.wait(lock); } } @@ -67,9 +71,14 @@ bool SimulatorHandle::MaybeTickSimulator() { stats_.simulator_ticks++; - cv_.notify_all(); + bool fired_cv = false; + bool timed_anything_out = TimeoutPromisesPastDeadline(); - TimeoutPromisesPastDeadline(); + if (timed_anything_out) { + fired_cv = true; + blocked_on_receive_.clear(); + cv_.notify_all(); + } if (in_flight_.empty()) { // return early here because there are no messages to schedule @@ -79,9 +88,15 @@ bool SimulatorHandle::MaybeTickSimulator() { const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; cluster_wide_time_microseconds_ += clock_advance; + if (!fired_cv) { + cv_.notify_all(); + blocked_on_receive_.clear(); + fired_cv = true; + } + if (cluster_wide_time_microseconds_ >= config_.abort_time) { if (should_shut_down_) { - return false; + return true; } spdlog::error( "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " @@ -120,22 +135,32 @@ bool SimulatorHandle::MaybeTickSimulator() { if (should_drop || normal_timeout) { stats_.timed_out_requests++; dop.promise.TimeOut(); + spdlog::info("timing out request"); } else { stats_.total_responses++; Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at; auto type_info = opaque_message.type_info; dop.promise.Fill(std::move(opaque_message), response_latency); histograms_.Measure(type_info, response_latency); + spdlog::info("replying to request"); } } else if (should_drop) { // don't add it anywhere, let it drop + spdlog::info("silently dropping request"); } else { // add to can_receive_ if not + spdlog::info("adding message to can_receive_"); const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector()); om_vec->second.emplace_back(std::move(opaque_message)); } + if (!fired_cv) { + cv_.notify_all(); + blocked_on_receive_.clear(); + fired_cv = true; + } + return true; } diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 3420786c7..5e564224b 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -58,7 +58,8 @@ class SimulatorHandle { MessageHistogramCollector histograms_; RequestId request_id_counter_{0}; - void TimeoutPromisesPastDeadline() { + bool TimeoutPromisesPastDeadline() { + bool timed_anything_out = false; const Time now = cluster_wide_time_microseconds_; for (auto it = promises_.begin(); it != promises_.end();) { auto &[promise_key, dop] = *it; @@ -68,10 +69,12 @@ class SimulatorHandle { it = promises_.erase(it); stats_.timed_out_requests++; + timed_anything_out = true; } else { ++it; } } + return timed_anything_out; } public: @@ -103,6 +106,7 @@ class SimulatorHandle { template ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, std::function &&maybe_tick_simulator) { + spdlog::info("submitting request to {}", to_address.last_known_port); auto type_info = TypeInfoFor(request); auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier>( @@ -146,8 +150,6 @@ class SimulatorHandle { requires(sizeof...(Ms) > 0) RequestResult Receive(const Address &receiver, Duration timeout) { std::unique_lock lock(mu_); - blocked_on_receive_.emplace(receiver); - const Time deadline = cluster_wide_time_microseconds_ + timeout; auto partial_address = receiver.ToPartialAddress(); @@ -164,38 +166,46 @@ class SimulatorHandle { auto m_opt = std::move(message).Take(); MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type"); - blocked_on_receive_.erase(receiver); - return std::move(m_opt).value(); } } lock.unlock(); + spdlog::info("server calling MaybeTickSimulator"); bool made_progress = MaybeTickSimulator(); + spdlog::info("server returned from MaybeTickSimulator"); lock.lock(); if (!should_shut_down_ && !made_progress) { + blocked_on_receive_.emplace(receiver); + cv_.notify_all(); + spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port); cv_.wait(lock); } } - blocked_on_receive_.erase(receiver); - return TimedOut{}; } template void Send(Address to_address, Address from_address, RequestId request_id, M message) { + spdlog::info("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port); auto type_info = TypeInfoFor(message); - std::unique_lock lock(mu_); - std::any message_any(std::move(message)); - OpaqueMessage om{.to_address = to_address, - .from_address = from_address, - .request_id = request_id, - .message = std::move(message_any), - .type_info = type_info}; - in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om))); + { + std::unique_lock lock(mu_); + std::any message_any(std::move(message)); + OpaqueMessage om{.to_address = to_address, + .from_address = from_address, + .request_id = request_id, + .message = std::move(message_any), + .type_info = type_info}; + in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om))); - stats_.total_messages++; + stats_.total_messages++; + } // lock dropped before cv notification + + spdlog::info("sender calling MaybeTickSimulator"); + MaybeTickSimulator(); + spdlog::info("sender returned from MaybeTickSimulator"); cv_.notify_all(); } diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp index 7f529a456..08d1f1dd3 100644 --- a/src/io/simulator/simulator_stats.hpp +++ b/src/io/simulator/simulator_stats.hpp @@ -21,5 +21,19 @@ struct SimulatorStats { uint64_t total_requests = 0; uint64_t total_responses = 0; uint64_t simulator_ticks = 0; + + friend bool operator==(const SimulatorStats &lhs, const SimulatorStats &rhs) = default; + + friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) { + std::string formated = fmt::format( + "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, " + "total_responses: {}, simulator_ticks: {} }}", + stats.total_messages, stats.dropped_messages, stats.timed_out_requests, stats.total_requests, + stats.total_responses, stats.simulator_ticks); + + in << formated; + + return in; + } }; }; // namespace memgraph::io::simulator diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 5e5a24aa9..6b6bc7c5c 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -34,7 +34,12 @@ class SimulatorTransport { template ResponseFuture Request(Address to_address, Address from_address, RequestT request, Duration timeout) { - std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; + std::function maybe_tick_simulator = [this] { + spdlog::info("client calling MaybeTickSimulator"); + bool ret = simulator_handle_->MaybeTickSimulator(); + spdlog::info("client returned from MaybeTickSimulator"); + return ret; + }; return simulator_handle_->template SubmitRequest(to_address, from_address, std::move(request), timeout, std::move(maybe_tick_simulator)); diff --git a/tests/simulation/basic_request.cpp b/tests/simulation/basic_request.cpp index 1f6d60f77..27f083273 100644 --- a/tests/simulation/basic_request.cpp +++ b/tests/simulation/basic_request.cpp @@ -9,17 +9,25 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include #include +#include +#include +#include + +#include "io/message_histogram_collector.hpp" #include "io/simulator/simulator.hpp" #include "utils/print_helpers.hpp" using memgraph::io::Address; using memgraph::io::Io; +using memgraph::io::LatencyHistogramSummaries; using memgraph::io::ResponseFuture; using memgraph::io::ResponseResult; using memgraph::io::simulator::Simulator; using memgraph::io::simulator::SimulatorConfig; +using memgraph::io::simulator::SimulatorStats; using memgraph::io::simulator::SimulatorTransport; struct CounterRequest { @@ -40,6 +48,7 @@ void run_server(Io io) { std::cout << "[SERVER] Error, continue" << std::endl; continue; } + std::cout << "[SERVER] Got message" << std::endl; auto request_envelope = request_result.GetValue(); auto req = std::get(request_envelope.message); @@ -50,13 +59,7 @@ void run_server(Io io) { } } -int main() { - auto config = SimulatorConfig{ - .drop_percent = 0, - .perform_timeouts = true, - .scramble_messages = true, - .rng_seed = 0, - }; +std::pair RunWorkload(SimulatorConfig &config) { auto simulator = Simulator(config); auto cli_addr = Address::TestAddress(1); @@ -72,21 +75,47 @@ int main() { // send request CounterRequest cli_req; cli_req.proposal = i; + spdlog::info("[CLIENT] calling Request"); auto res_f = cli_io.Request(srv_addr, cli_req); + spdlog::info("[CLIENT] calling Wait"); auto res_rez = std::move(res_f).Wait(); + spdlog::info("[CLIENT] Wait returned"); if (!res_rez.HasError()) { - std::cout << "[CLIENT] Got a valid response" << std::endl; + spdlog::info("[CLIENT] Got a valid response"); auto env = res_rez.GetValue(); MG_ASSERT(env.message.highest_seen == i); - std::cout << "response latency: " << env.response_latency.count() << " microseconds" << std::endl; + spdlog::info("response latency: {} microseconds", env.response_latency.count()); } else { - std::cout << "[CLIENT] Got an error" << std::endl; + spdlog::info("[CLIENT] Got an error"); } } - using memgraph::utils::print_helpers::operator<<; - std::cout << "response latencies: " << cli_io.ResponseLatencies() << std::endl; - simulator.ShutDown(); + + return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies()); +} + +int main() { + spdlog::cfg::load_env_levels(); + + auto config = SimulatorConfig{ + .drop_percent = 0, + .perform_timeouts = true, + .scramble_messages = true, + .rng_seed = 0, + }; + + auto [sim_stats_1, latency_stats_1] = RunWorkload(config); + auto [sim_stats_2, latency_stats_2] = RunWorkload(config); + + if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) { + spdlog::info("simulator stats diverged across runs"); + spdlog::info("run 1 simulator stats: {}", sim_stats_1); + spdlog::info("run 2 simulator stats: {}", sim_stats_2); + spdlog::info("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::info("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + std::terminate(); + } + return 0; } From 12880fc71a0e6cf09a64fb6cd0e882c7fb788d63 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 18:27:12 +0000 Subject: [PATCH 02/26] Don't advance the simulator handle from server threads themselves --- src/io/simulator/simulator_handle.hpp | 11 +----- tests/simulation/raft.cpp | 48 ++++++++++++++++++++------- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 5e564224b..00aeaee25 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -170,12 +170,7 @@ class SimulatorHandle { } } - lock.unlock(); - spdlog::info("server calling MaybeTickSimulator"); - bool made_progress = MaybeTickSimulator(); - spdlog::info("server returned from MaybeTickSimulator"); - lock.lock(); - if (!should_shut_down_ && !made_progress) { + if (!should_shut_down_) { blocked_on_receive_.emplace(receiver); cv_.notify_all(); spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port); @@ -203,10 +198,6 @@ class SimulatorHandle { stats_.total_messages++; } // lock dropped before cv notification - spdlog::info("sender calling MaybeTickSimulator"); - MaybeTickSimulator(); - spdlog::info("sender returned from MaybeTickSimulator"); - cv_.notify_all(); } diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index df619bd42..9adbc152f 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -18,7 +18,10 @@ #include #include +#include + #include "io/address.hpp" +#include "io/message_histogram_collector.hpp" #include "io/rsm/raft.hpp" #include "io/rsm/rsm_client.hpp" #include "io/simulator/simulator.hpp" @@ -27,6 +30,7 @@ using memgraph::io::Address; using memgraph::io::Duration; using memgraph::io::Io; +using memgraph::io::LatencyHistogramSummaries; using memgraph::io::ResponseEnvelope; using memgraph::io::ResponseFuture; using memgraph::io::ResponseResult; @@ -123,16 +127,7 @@ void RunRaft(Raft RunSimulation(SimulatorConfig &config) { auto simulator = Simulator(config); auto cli_addr = Address::TestAddress(1); @@ -240,6 +235,8 @@ void RunSimulation() { spdlog::info("========================== SUCCESS :) =========================="); + return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies()); + /* this is implicit in jthread's dtor srv_thread_1.join(); @@ -249,12 +246,39 @@ void RunSimulation() { } int main() { + spdlog::cfg::load_env_levels(); + int n_tests = 50; + SimulatorConfig config{ + .drop_percent = 5, + .perform_timeouts = true, + .scramble_messages = true, + .rng_seed = 0, + .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, + .abort_time = Time::max(), + }; + for (int i = 0; i < n_tests; i++) { - spdlog::info("========================== NEW SIMULATION {} ==========================", i); + spdlog::error("========================== NEW SIMULATION SEED {} ==========================", i); spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); - RunSimulation(); + + // this is vital to cause tests to behave differently across runs + config.rng_seed = i; + + auto [sim_stats_1, latency_stats_1] = RunSimulation(config); + auto [sim_stats_2, latency_stats_2] = RunSimulation(config); + + if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) { + spdlog::error("simulator stats diverged across runs"); + spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + std::terminate(); + } + spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); } spdlog::info("passed {} tests!", n_tests); From 098084314e87e1acec8c9965dad05548103249d1 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 21:22:41 +0000 Subject: [PATCH 03/26] Make TestAddress deterministically sortable --- src/io/address.hpp | 2 +- tests/simulation/basic_request.cpp | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/io/address.hpp b/src/io/address.hpp index a6e372edb..533b62bcc 100644 --- a/src/io/address.hpp +++ b/src/io/address.hpp @@ -59,7 +59,7 @@ struct Address { static Address TestAddress(uint16_t port) { return Address{ - .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, + .unique_id = boost::uuids::uuid{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, static_cast(port)}, .last_known_port = port, }; } diff --git a/tests/simulation/basic_request.cpp b/tests/simulation/basic_request.cpp index 27f083273..868f4ac10 100644 --- a/tests/simulation/basic_request.cpp +++ b/tests/simulation/basic_request.cpp @@ -109,11 +109,11 @@ int main() { auto [sim_stats_2, latency_stats_2] = RunWorkload(config); if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) { - spdlog::info("simulator stats diverged across runs"); - spdlog::info("run 1 simulator stats: {}", sim_stats_1); - spdlog::info("run 2 simulator stats: {}", sim_stats_2); - spdlog::info("run 1 latency:\n{}", latency_stats_1.SummaryTable()); - spdlog::info("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + spdlog::error("simulator stats diverged across runs"); + spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); std::terminate(); } From 262df5c6a2e01b09cdb241e04a2e9eb690b1fe08 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 21:24:13 +0000 Subject: [PATCH 04/26] Avoid unordered_map in Raft code for more determinism --- src/io/rsm/raft.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp index eccbf031b..4ec22ff87 100644 --- a/src/io/rsm/raft.hpp +++ b/src/io/rsm/raft.hpp @@ -19,7 +19,6 @@ #include #include #include -#include #include #include @@ -182,7 +181,7 @@ struct PendingClientRequest { struct Leader { std::map followers; - std::unordered_map pending_client_requests; + std::map pending_client_requests; Time last_broadcast = Time::min(); std::string static ToString() { return "\tLeader \t"; } @@ -683,7 +682,7 @@ class Raft { return Leader{ .followers = std::move(followers), - .pending_client_requests = std::unordered_map(), + .pending_client_requests = std::map(), }; } From cf73ed529d901209f42a22d2720fbea8c70b446c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 21:27:48 +0000 Subject: [PATCH 05/26] Block messages from being delivered upon ShutDown --- src/io/simulator/simulator_handle.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 096673e21..df7895dd7 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -23,6 +23,12 @@ namespace memgraph::io::simulator { void SimulatorHandle::ShutDown() { std::unique_lock lock(mu_); should_shut_down_ = true; + for (auto it = promises_.begin(); it != promises_.end();) { + auto &[promise_key, dop] = *it; + std::move(dop).promise.TimeOut(); + it = promises_.erase(it); + } + can_receive_.clear(); cv_.notify_all(); } @@ -62,7 +68,7 @@ bool SimulatorHandle::MaybeTickSimulator() { const size_t blocked_servers = blocked_on_receive_.size(); - if (blocked_servers < server_addresses_.size()) { + if (should_shut_down_ || blocked_servers < server_addresses_.size()) { // we only need to advance the simulator when all // servers have reached a quiescent state, blocked // on their own futures or receive methods. From 9c3d683942289b2eba282ee3101d83ac810c7008 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 21:28:17 +0000 Subject: [PATCH 06/26] Explicitly join test threads before collecting test stats --- tests/simulation/raft.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 9adbc152f..16fcc9906 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -224,6 +224,10 @@ std::pair RunSimulation(SimulatorConf simulator.ShutDown(); + srv_thread_1.join(); + srv_thread_2.join(); + srv_thread_3.join(); + SimulatorStats stats = simulator.Stats(); spdlog::info("total messages: {}", stats.total_messages); From f6017697d6b1841c6330abd196878762c30250c4 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 17 Nov 2022 21:32:55 +0000 Subject: [PATCH 07/26] Make raft tests fully deterministic for rng_seeds between 0 and 500 at 1% message loss --- src/io/simulator/simulator_handle.cpp | 15 +++++++++++++-- src/io/simulator/simulator_handle.hpp | 9 ++++++--- src/io/simulator/simulator_transport.hpp | 7 +------ tests/simulation/raft.cpp | 16 +++++----------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index df7895dd7..2a5b0a247 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -75,12 +75,14 @@ bool SimulatorHandle::MaybeTickSimulator() { return false; } + spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); stats_.simulator_ticks++; bool fired_cv = false; bool timed_anything_out = TimeoutPromisesPastDeadline(); if (timed_anything_out) { + spdlog::info("simulator progressing: timed out a request"); fired_cv = true; blocked_on_receive_.clear(); cv_.notify_all(); @@ -95,6 +97,7 @@ bool SimulatorHandle::MaybeTickSimulator() { cluster_wide_time_microseconds_ += clock_advance; if (!fired_cv) { + spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); cv_.notify_all(); blocked_on_receive_.clear(); fired_cv = true; @@ -109,6 +112,9 @@ bool SimulatorHandle::MaybeTickSimulator() { "in an expected amount of time."); throw utils::BasicException{"Cluster has executed beyond its configured abort_time"}; } + + // if the clock is advanced, no messages should be delivered also. + // do that in a future tick. return true; } @@ -141,7 +147,7 @@ bool SimulatorHandle::MaybeTickSimulator() { if (should_drop || normal_timeout) { stats_.timed_out_requests++; dop.promise.TimeOut(); - spdlog::info("timing out request"); + spdlog::info("timing out request "); } else { stats_.total_responses++; Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at; @@ -153,15 +159,20 @@ bool SimulatorHandle::MaybeTickSimulator() { } else if (should_drop) { // don't add it anywhere, let it drop spdlog::info("silently dropping request"); + + // we don't want to reset the block list here + return true; } else { // add to can_receive_ if not - spdlog::info("adding message to can_receive_"); + spdlog::info("adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port, + opaque_message.to_address.last_known_port); const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector()); om_vec->second.emplace_back(std::move(opaque_message)); } if (!fired_cv) { + spdlog::info("simulator progressing: handled a message"); cv_.notify_all(); blocked_on_receive_.clear(); fired_cv = true; diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 00aeaee25..ce341a50b 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -171,12 +171,15 @@ class SimulatorHandle { } if (!should_shut_down_) { - blocked_on_receive_.emplace(receiver); - cv_.notify_all(); - spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port); + if (!blocked_on_receive_.contains(receiver)) { + blocked_on_receive_.emplace(receiver); + spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port); + cv_.notify_all(); + } cv_.wait(lock); } } + spdlog::info("timing out receiver {}", receiver.ToPartialAddress().port); return TimedOut{}; } diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 6b6bc7c5c..5e5a24aa9 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -34,12 +34,7 @@ class SimulatorTransport { template ResponseFuture Request(Address to_address, Address from_address, RequestT request, Duration timeout) { - std::function maybe_tick_simulator = [this] { - spdlog::info("client calling MaybeTickSimulator"); - bool ret = simulator_handle_->MaybeTickSimulator(); - spdlog::info("client returned from MaybeTickSimulator"); - return ret; - }; + std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; return simulator_handle_->template SubmitRequest(to_address, from_address, std::move(request), timeout, std::move(maybe_tick_simulator)); diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 16fcc9906..b69470a72 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -240,13 +240,6 @@ std::pair RunSimulation(SimulatorConf spdlog::info("========================== SUCCESS :) =========================="); return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies()); - - /* - this is implicit in jthread's dtor - srv_thread_1.join(); - srv_thread_2.join(); - srv_thread_3.join(); - */ } int main() { @@ -264,25 +257,26 @@ int main() { }; for (int i = 0; i < n_tests; i++) { - spdlog::error("========================== NEW SIMULATION SEED {} ==========================", i); spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); // this is vital to cause tests to behave differently across runs config.rng_seed = i; + spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i); auto [sim_stats_1, latency_stats_1] = RunSimulation(config); + spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i); auto [sim_stats_2, latency_stats_2] = RunSimulation(config); if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) { - spdlog::error("simulator stats diverged across runs"); + spdlog::error("simulator stats diverged across runs for test rng_seed {}", i); spdlog::error("run 1 simulator stats: {}", sim_stats_1); spdlog::error("run 2 simulator stats: {}", sim_stats_2); spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); std::terminate(); } - spdlog::error("run 1 simulator stats: {}", sim_stats_1); - spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::info("run 1 simulator stats: {}", sim_stats_1); + spdlog::info("run 2 simulator stats: {}", sim_stats_2); } spdlog::info("passed {} tests!", n_tests); From 923325b8fa9b0be7014c5734e0fcb5e63fa9623c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 09:04:29 +0000 Subject: [PATCH 08/26] Progress the simulator clock even when there are messages to deliver --- src/io/simulator/simulator_handle.cpp | 50 ++++++++++++--------------- src/io/simulator/simulator_handle.hpp | 2 +- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 2a5b0a247..2596ad12d 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -75,6 +75,8 @@ bool SimulatorHandle::MaybeTickSimulator() { return false; } + // We allow the simulator to progress the state of the system only + // after all servers are blocked on receive. spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); stats_.simulator_ticks++; @@ -88,37 +90,31 @@ bool SimulatorHandle::MaybeTickSimulator() { cv_.notify_all(); } + const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; + cluster_wide_time_microseconds_ += clock_advance; + + if (!fired_cv) { + spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); + cv_.notify_all(); + blocked_on_receive_.clear(); + fired_cv = true; + } + + if (cluster_wide_time_microseconds_ >= config_.abort_time) { + if (should_shut_down_) { + return true; + } + spdlog::error( + "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " + "in an expected amount of time."); + throw utils::BasicException{"Cluster has executed beyond its configured abort_time"}; + } + if (in_flight_.empty()) { - // return early here because there are no messages to schedule - - // We tick the clock forward when all servers are blocked but - // there are no in-flight messages to schedule delivery of. - const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; - cluster_wide_time_microseconds_ += clock_advance; - - if (!fired_cv) { - spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); - cv_.notify_all(); - blocked_on_receive_.clear(); - fired_cv = true; - } - - if (cluster_wide_time_microseconds_ >= config_.abort_time) { - if (should_shut_down_) { - return true; - } - spdlog::error( - "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " - "in an expected amount of time."); - throw utils::BasicException{"Cluster has executed beyond its configured abort_time"}; - } - - // if the clock is advanced, no messages should be delivered also. - // do that in a future tick. return true; } - if (config_.scramble_messages) { + if (config_.scramble_messages && in_flight_.size() > 1) { // scramble messages std::uniform_int_distribution swap_distrib(0, in_flight_.size() - 1); const size_t swap_index = swap_distrib(rng_); diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index ce341a50b..71e60acf3 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -52,7 +52,7 @@ class SimulatorHandle { std::set
blocked_on_receive_; std::set
server_addresses_; std::mt19937 rng_; - std::uniform_int_distribution time_distrib_{5, 50}; + std::uniform_int_distribution time_distrib_{0, 50}; std::uniform_int_distribution drop_distrib_{0, 99}; SimulatorConfig config_; MessageHistogramCollector histograms_; From 6b9a617df079ae757e5a3de04d736fabadee4720 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 09:20:15 +0000 Subject: [PATCH 09/26] Streamline simulator tick condition varible notification. Advance time more aggressively --- src/io/simulator/simulator_handle.cpp | 31 ++++++--------------------- src/io/simulator/simulator_handle.hpp | 2 +- tests/simulation/raft.cpp | 2 -- 3 files changed, 8 insertions(+), 27 deletions(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 2596ad12d..53b969712 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -79,26 +79,19 @@ bool SimulatorHandle::MaybeTickSimulator() { // after all servers are blocked on receive. spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); stats_.simulator_ticks++; + blocked_on_receive_.clear(); + cv_.notify_all(); - bool fired_cv = false; bool timed_anything_out = TimeoutPromisesPastDeadline(); if (timed_anything_out) { spdlog::info("simulator progressing: timed out a request"); - fired_cv = true; - blocked_on_receive_.clear(); - cv_.notify_all(); } const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; cluster_wide_time_microseconds_ += clock_advance; - if (!fired_cv) { - spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); - cv_.notify_all(); - blocked_on_receive_.clear(); - fired_cv = true; - } + spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); if (cluster_wide_time_microseconds_ >= config_.abort_time) { if (should_shut_down_) { @@ -143,37 +136,27 @@ bool SimulatorHandle::MaybeTickSimulator() { if (should_drop || normal_timeout) { stats_.timed_out_requests++; dop.promise.TimeOut(); - spdlog::info("timing out request "); + spdlog::info("simulator timing out request "); } else { stats_.total_responses++; Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at; auto type_info = opaque_message.type_info; dop.promise.Fill(std::move(opaque_message), response_latency); histograms_.Measure(type_info, response_latency); - spdlog::info("replying to request"); + spdlog::info("simulator replying to request"); } } else if (should_drop) { // don't add it anywhere, let it drop - spdlog::info("silently dropping request"); - - // we don't want to reset the block list here - return true; + spdlog::info("simulator silently dropping request"); } else { // add to can_receive_ if not - spdlog::info("adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port, + spdlog::info("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port, opaque_message.to_address.last_known_port); const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector()); om_vec->second.emplace_back(std::move(opaque_message)); } - if (!fired_cv) { - spdlog::info("simulator progressing: handled a message"); - cv_.notify_all(); - blocked_on_receive_.clear(); - fired_cv = true; - } - return true; } diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 71e60acf3..1ef0b3dfb 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -52,7 +52,7 @@ class SimulatorHandle { std::set
blocked_on_receive_; std::set
server_addresses_; std::mt19937 rng_; - std::uniform_int_distribution time_distrib_{0, 50}; + std::uniform_int_distribution time_distrib_{0, 1000}; std::uniform_int_distribution drop_distrib_{0, 99}; SimulatorConfig config_; MessageHistogramCollector histograms_; diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index b69470a72..9e2583814 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -275,8 +275,6 @@ int main() { spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); std::terminate(); } - spdlog::info("run 1 simulator stats: {}", sim_stats_1); - spdlog::info("run 2 simulator stats: {}", sim_stats_2); } spdlog::info("passed {} tests!", n_tests); From a37e7e4affc952405bc3936a253feea587b2ae62 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 10:19:55 +0000 Subject: [PATCH 10/26] Add assert to ensure TestAddress will not be higher than the uchar max --- src/io/address.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/io/address.hpp b/src/io/address.hpp index 533b62bcc..e38b1e514 100644 --- a/src/io/address.hpp +++ b/src/io/address.hpp @@ -58,6 +58,8 @@ struct Address { uint16_t last_known_port; static Address TestAddress(uint16_t port) { + MG_ASSERT(port <= 255); + return Address{ .unique_id = boost::uuids::uuid{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, static_cast(port)}, .last_known_port = port, From 0f32407bdcc7280a9a90a93a473555ff84d055ca Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 10:20:45 +0000 Subject: [PATCH 11/26] Add compare header to histogram collector header --- src/io/message_histogram_collector.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp index 27521fdeb..a2cf266b7 100644 --- a/src/io/message_histogram_collector.hpp +++ b/src/io/message_histogram_collector.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include From 7115a7e75bd6c4d6a5692ed494e600d4d95e3baa Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 10:24:19 +0000 Subject: [PATCH 12/26] Apply clang-tidy fixes --- src/io/address.hpp | 2 ++ src/io/simulator/simulator_stats.hpp | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/io/address.hpp b/src/io/address.hpp index e38b1e514..3bb518b50 100644 --- a/src/io/address.hpp +++ b/src/io/address.hpp @@ -20,6 +20,8 @@ #include #include +#include "utils/logging.hpp" + namespace memgraph::io { struct PartialAddress { diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp index 08d1f1dd3..ccc5ec1de 100644 --- a/src/io/simulator/simulator_stats.hpp +++ b/src/io/simulator/simulator_stats.hpp @@ -13,6 +13,8 @@ #include +#include + namespace memgraph::io::simulator { struct SimulatorStats { uint64_t total_messages = 0; @@ -22,7 +24,7 @@ struct SimulatorStats { uint64_t total_responses = 0; uint64_t simulator_ticks = 0; - friend bool operator==(const SimulatorStats &lhs, const SimulatorStats &rhs) = default; + friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default; friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) { std::string formated = fmt::format( From 3ad84897351f1f23edae78cd1922ef0d5197a0d7 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 10:34:21 +0000 Subject: [PATCH 13/26] Run raft sim with random seeds over time, but allow a seed to be easily replayed using the RunWithSeed function --- tests/simulation/raft.cpp | 58 +++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 9e2583814..648814667 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -242,39 +243,48 @@ std::pair RunSimulation(SimulatorConf return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies()); } -int main() { - spdlog::cfg::load_env_levels(); - - int n_tests = 50; - +void RunWithSeed(uint64_t seed) { SimulatorConfig config{ .drop_percent = 5, .perform_timeouts = true, .scramble_messages = true, - .rng_seed = 0, + .rng_seed = seed, .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, .abort_time = Time::max(), }; + spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================", + seed); + spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); + auto [sim_stats_1, latency_stats_1] = RunSimulation(config); + + spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================", + seed); + spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); + auto [sim_stats_2, latency_stats_2] = RunSimulation(config); + + if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) { + spdlog::error("simulator stats diverged across runs for test rng_seed: {}", seed); + spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + std::terminate(); + } +} + +int main() { + spdlog::cfg::load_env_levels(); + + std::random_device random_device; + std::mt19937 generator(random_device()); + std::uniform_int_distribution<> distribution; + + int n_tests = 50; + for (int i = 0; i < n_tests; i++) { - spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); - - // this is vital to cause tests to behave differently across runs - config.rng_seed = i; - - spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i); - auto [sim_stats_1, latency_stats_1] = RunSimulation(config); - spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i); - auto [sim_stats_2, latency_stats_2] = RunSimulation(config); - - if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) { - spdlog::error("simulator stats diverged across runs for test rng_seed {}", i); - spdlog::error("run 1 simulator stats: {}", sim_stats_1); - spdlog::error("run 2 simulator stats: {}", sim_stats_2); - spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); - spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); - std::terminate(); - } + uint64_t seed = distribution(generator); + RunWithSeed(seed); } spdlog::info("passed {} tests!", n_tests); From ce45a548c773d19caf0081b5cb2b365c23048ea3 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 16:42:18 +0000 Subject: [PATCH 14/26] Significantly improve the determinism of the coordinator, UUID generation, the machine manager, the shard manager, and the cluster property test --- src/coordinator/coordinator_worker.hpp | 32 +++++++++++------ src/coordinator/shard_map.cpp | 28 +++++++++++++-- src/io/address.hpp | 19 ++++++++++ src/io/transport.hpp | 9 ++++- src/machine_manager/machine_config.hpp | 1 + src/machine_manager/machine_manager.hpp | 41 ++++++++++++++++++---- src/query/v2/interpreter.cpp | 6 +++- src/storage/v3/shard_manager.hpp | 6 ++++ src/storage/v3/shard_worker.hpp | 32 +++++++++++------ tests/simulation/cluster_property_test.cpp | 20 ++++++++--- tests/simulation/test_cluster.hpp | 23 +++++++----- 11 files changed, 173 insertions(+), 44 deletions(-) diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp index 46bc546cd..f77d9a263 100644 --- a/src/coordinator/coordinator_worker.hpp +++ b/src/coordinator/coordinator_worker.hpp @@ -71,6 +71,9 @@ struct QueueInner { // starvation by sometimes randomizing priorities, rather than following a strict // prioritization. std::deque queue; + + uint64_t submitted = 0; + uint64_t calls_to_pop = 0; }; /// There are two reasons to implement our own Queue instead of using @@ -86,6 +89,8 @@ class Queue { MG_ASSERT(inner_.use_count() > 0); std::unique_lock lock(inner_->mu); + inner_->submitted++; + inner_->queue.emplace_back(std::move(message)); } // lock dropped before notifying condition variable @@ -96,6 +101,9 @@ class Queue { MG_ASSERT(inner_.use_count() > 0); std::unique_lock lock(inner_->mu); + inner_->calls_to_pop++; + inner_->cv.notify_all(); + while (inner_->queue.empty()) { inner_->cv.wait(lock); } @@ -105,6 +113,15 @@ class Queue { return message; } + + void BlockOnQuiescence() const { + MG_ASSERT(inner_.use_count() > 0); + std::unique_lock lock(inner_->mu); + + while (inner_->calls_to_pop <= inner_->submitted) { + inner_->cv.wait(lock); + } + } }; /// A CoordinatorWorker owns Raft instances. receives messages from the MachineManager. @@ -129,9 +146,7 @@ class CoordinatorWorker { public: CoordinatorWorker(io::Io io, Queue queue, Coordinator coordinator) - : io_(std::move(io)), - queue_(std::move(queue)), - coordinator_{std::move(io_.ForkLocal()), {}, std::move(coordinator)} {} + : io_(std::move(io)), queue_(std::move(queue)), coordinator_{std::move(io_), {}, std::move(coordinator)} {} CoordinatorWorker(CoordinatorWorker &&) noexcept = default; CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default; @@ -140,15 +155,12 @@ class CoordinatorWorker { ~CoordinatorWorker() = default; void Run() { - while (true) { + bool should_continue = true; + while (should_continue) { Message message = queue_.Pop(); - const bool should_continue = std::visit( - [this](auto &&msg) { return this->Process(std::forward(msg)); }, std::move(message)); - - if (!should_continue) { - return; - } + should_continue = std::visit([this](auto &&msg) { return this->Process(std::forward(msg)); }, + std::move(message)); } } }; diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index d0b2cf9ad..687228890 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -228,7 +228,7 @@ Hlc ShardMap::IncrementShardMapVersion() noexcept { return shard_map_version; } -// TODO(antaljanosbenjamin) use a single map for all name id +// TODO(antaljanosbenjamin) use a single map for all name id // mapping and a single counter to maintain the next id std::unordered_map ShardMap::IdToNames() { std::unordered_map id_to_names; @@ -248,6 +248,25 @@ std::unordered_map ShardMap::IdToNames() { Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; } +boost::uuids::uuid NewShardUuid(uint64_t shard_id) { + return boost::uuids::uuid{0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + static_cast(shard_id >> 56), + static_cast(shard_id >> 48), + static_cast(shard_id >> 40), + static_cast(shard_id >> 32), + static_cast(shard_id >> 24), + static_cast(shard_id >> 16), + static_cast(shard_id >> 8), + static_cast(shard_id)}; +} + std::vector ShardMap::AssignShards(Address storage_manager, std::set initialized) { std::vector ret{}; @@ -268,6 +287,7 @@ std::vector ShardMap::AssignShards(Address storage_manager, if (initialized.contains(aas.address.unique_id)) { machine_contains_shard = true; if (aas.status != Status::CONSENSUS_PARTICIPANT) { + mutated = true; spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); aas.status = Status::CONSENSUS_PARTICIPANT; } @@ -292,10 +312,13 @@ std::vector ShardMap::AssignShards(Address storage_manager, } if (!machine_contains_shard && shard.size() < label_space.replication_factor) { + // increment version for each new uuid for deterministic creation + IncrementShardMapVersion(); + Address address = storage_manager; // TODO(tyler) use deterministic UUID so that coordinators don't diverge here - address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, + address.unique_id = NewShardUuid(shard_map_version.logical_id); spdlog::info("assigning shard manager to shard"); @@ -383,6 +406,7 @@ std::optional ShardMap::InitializeNewLabel(std::string label_name, std: void ShardMap::AddServer(Address server_address) { // Find a random place for the server to plug in } + std::optional ShardMap::GetLabelId(const std::string &label) const { if (const auto it = labels.find(label); it != labels.end()) { return it->second; diff --git a/src/io/address.hpp b/src/io/address.hpp index 3bb518b50..4e5cb7627 100644 --- a/src/io/address.hpp +++ b/src/io/address.hpp @@ -68,12 +68,31 @@ struct Address { }; } + // NB: don't use this in test code because it is non-deterministic static Address UniqueLocalAddress() { return Address{ .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, }; } + /// `Coordinator`s have constant UUIDs because there is at most one per ip/port pair. + Address ForkLocalCoordinator() { + return Address{ + .unique_id = boost::uuids::uuid{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + .last_known_ip = last_known_ip, + .last_known_port = last_known_port, + }; + } + + /// `ShardManager`s have constant UUIDs because there is at most one per ip/port pair. + Address ForkLocalShardManager() { + return Address{ + .unique_id = boost::uuids::uuid{2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + .last_known_ip = last_known_ip, + .last_known_port = last_known_port, + }; + } + /// Returns a new ID with the same IP and port but a unique UUID. Address ForkUniqueAddress() { return Address{ diff --git a/src/io/transport.hpp b/src/io/transport.hpp index 2abf10af2..4994cb436 100644 --- a/src/io/transport.hpp +++ b/src/io/transport.hpp @@ -137,7 +137,14 @@ class Io { Address GetAddress() { return address_; } void SetAddress(Address address) { address_ = address; } - Io ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); } + Io ForkLocal(boost::uuids::uuid uuid) { + Address new_address{ + .unique_id = uuid, + .last_known_ip = address_.last_known_ip, + .last_known_port = address_.last_known_port, + }; + return Io(implementation_, new_address); + } LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); } }; diff --git a/src/machine_manager/machine_config.hpp b/src/machine_manager/machine_config.hpp index 52711642b..8e8f503d7 100644 --- a/src/machine_manager/machine_config.hpp +++ b/src/machine_manager/machine_config.hpp @@ -42,6 +42,7 @@ struct MachineConfig { boost::asio::ip::address listen_ip; uint16_t listen_port; size_t shard_worker_threads = std::max(static_cast(1), std::thread::hardware_concurrency()); + bool sync_message_handling = false; }; } // namespace memgraph::machine_manager diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp index f9ea8ff2a..04d5a3444 100644 --- a/src/machine_manager/machine_manager.hpp +++ b/src/machine_manager/machine_manager.hpp @@ -78,10 +78,10 @@ class MachineManager { MachineManager(io::Io io, MachineConfig config, Coordinator coordinator) : io_(io), config_(config), - coordinator_address_(io.GetAddress().ForkUniqueAddress()), - shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} { - auto coordinator_io = io.ForkLocal(); - coordinator_io.SetAddress(coordinator_address_); + coordinator_address_(io.GetAddress().ForkLocalCoordinator()), + shard_manager_{io.ForkLocal(io.GetAddress().ForkLocalShardManager().unique_id), config.shard_worker_threads, + coordinator_address_} { + auto coordinator_io = io.ForkLocal(coordinator_address_.unique_id); CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator}; coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); }); } @@ -101,11 +101,23 @@ class MachineManager { Address CoordinatorAddress() { return coordinator_address_; } void Run() { - while (!io_.ShouldShutDown()) { + while (true) { + MaybeBlockOnSyncHandling(); + + if (io_.ShouldShutDown()) { + break; + } + const auto now = io_.Now(); + uint64_t now_us = now.time_since_epoch().count(); + uint64_t next_us = next_cron_.time_since_epoch().count(); + if (now >= next_cron_) { + spdlog::info("now {} >= next_cron_ {}", now_us, next_us); next_cron_ = Cron(); + } else { + spdlog::info("now {} < next_cron_ {}", now_us, next_us); } Duration receive_timeout = std::max(next_cron_, now) - now; @@ -194,10 +206,27 @@ class MachineManager { } private: + // This method exists for controlling concurrency + // during deterministic simulation testing. + void MaybeBlockOnSyncHandling() { + if (!config_.sync_message_handling) { + return; + } + + // block on coordinator + coordinator_queue_.BlockOnQuiescence(); + + // block on shards + shard_manager_.BlockOnQuiescence(); + } + Time Cron() { spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString()); coordinator_queue_.Push(coordinator::coordinator_worker::Cron{}); - return shard_manager_.Cron(); + MaybeBlockOnSyncHandling(); + Time ret = shard_manager_.Cron(); + MaybeBlockOnSyncHandling(); + return ret; } }; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index c835c3d27..d7e5a9bb6 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -800,7 +800,11 @@ InterpreterContext::InterpreterContext(storage::v3::Shard *db, const Interpreter Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); - auto query_io = interpreter_context_->io.ForkLocal(); + + // TODO(tyler) make this deterministic so that it can be tested. + auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; + auto query_io = interpreter_context_->io.ForkLocal(random_uuid); + shard_request_manager_ = std::make_unique>( coordinator::CoordinatorClient( query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index a3dc8b9b6..74ec8d2d1 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -190,6 +190,12 @@ class ShardManager { }); } + void BlockOnQuiescence() { + for (const auto &worker : workers_) { + worker.BlockOnQuiescence(); + } + } + private: io::Io io_; std::vector workers_; diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp index 46e02e6cc..547aa0a6f 100644 --- a/src/storage/v3/shard_worker.hpp +++ b/src/storage/v3/shard_worker.hpp @@ -80,6 +80,9 @@ struct QueueInner { // starvation by sometimes randomizing priorities, rather than following a strict // prioritization. std::deque queue; + + uint64_t submitted = 0; + uint64_t calls_to_pop = 0; }; /// There are two reasons to implement our own Queue instead of using @@ -95,6 +98,8 @@ class Queue { MG_ASSERT(inner_.use_count() > 0); std::unique_lock lock(inner_->mu); + inner_->submitted++; + inner_->queue.emplace_back(std::forward(message)); } // lock dropped before notifying condition variable @@ -105,6 +110,9 @@ class Queue { MG_ASSERT(inner_.use_count() > 0); std::unique_lock lock(inner_->mu); + inner_->calls_to_pop++; + inner_->cv.notify_all(); + while (inner_->queue.empty()) { inner_->cv.wait(lock); } @@ -114,6 +122,15 @@ class Queue { return message; } + + void BlockOnQuiescence() const { + MG_ASSERT(inner_.use_count() > 0); + std::unique_lock lock(inner_->mu); + + while (inner_->calls_to_pop <= inner_->submitted) { + inner_->cv.wait(lock); + } + } }; /// A ShardWorker owns Raft instances. receives messages from the ShardManager. @@ -122,7 +139,6 @@ class ShardWorker { io::Io io_; Queue queue_; std::priority_queue, std::vector>, std::greater<>> cron_schedule_; - Time next_cron_ = Time::min(); std::map> rsm_map_; bool Process(ShutDown && /* shut_down */) { return false; } @@ -175,10 +191,7 @@ class ShardWorker { return; } - auto rsm_io = io_.ForkLocal(); - auto io_addr = rsm_io.GetAddress(); - io_addr.unique_id = to_init.uuid; - rsm_io.SetAddress(io_addr); + auto rsm_io = io_.ForkLocal(to_init.uuid); // TODO(tyler) get peers from Coordinator in HeartbeatResponse std::vector
rsm_peers = {}; @@ -208,15 +221,12 @@ class ShardWorker { ~ShardWorker() = default; void Run() { - while (true) { + bool should_continue = true; + while (should_continue) { Message message = queue_.Pop(); - const bool should_continue = + should_continue = std::visit([&](auto &&msg) { return Process(std::forward(msg)); }, std::move(message)); - - if (!should_continue) { - return; - } } } }; diff --git a/tests/simulation/cluster_property_test.cpp b/tests/simulation/cluster_property_test.cpp index 48327be5b..e74bad697 100644 --- a/tests/simulation/cluster_property_test.cpp +++ b/tests/simulation/cluster_property_test.cpp @@ -33,21 +33,31 @@ using io::Time; using io::simulator::SimulatorConfig; using storage::v3::kMaximumCronInterval; -RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops)) { - // TODO(tyler) set abort_time to something more restrictive than Time::max() - +RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) { spdlog::cfg::load_env_levels(); SimulatorConfig sim_config{ .drop_percent = 0, .perform_timeouts = false, .scramble_messages = true, - .rng_seed = 0, + .rng_seed = rng_seed, .start_time = Time::min(), + // TODO(tyler) set abort_time to something more restrictive than Time::max() .abort_time = Time::max(), }; - RunClusterSimulation(sim_config, cluster_config, ops.ops); + auto [sim_stats_1, latency_stats_1] = RunClusterSimulation(sim_config, cluster_config, ops.ops); + auto [sim_stats_2, latency_stats_2] = RunClusterSimulation(sim_config, cluster_config, ops.ops); + + if (latency_stats_1 != latency_stats_2) { + spdlog::error("simulator stats diverged across runs"); + spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + RC_ASSERT(latency_stats_1 == latency_stats_2); + RC_ASSERT(sim_stats_1 == sim_stats_2); + } } } // namespace memgraph::tests::simulation diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index 6a32a391d..0ac7ca1da 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -24,6 +24,7 @@ #include "coordinator/shard_map.hpp" #include "generated_operations.hpp" #include "io/address.hpp" +#include "io/message_histogram_collector.hpp" #include "io/simulator/simulator.hpp" #include "io/simulator/simulator_config.hpp" #include "io/simulator/simulator_transport.hpp" @@ -57,6 +58,7 @@ using io::simulator::SimulatorStats; using io::simulator::SimulatorTransport; using machine_manager::MachineConfig; using machine_manager::MachineManager; +using memgraph::io::LatencyHistogramSummaries; using msgs::ReadRequests; using msgs::ReadResponses; using msgs::WriteRequests; @@ -75,6 +77,8 @@ MachineManager MkMm(Simulator &simulator, std::vector io = simulator.Register(addr); @@ -210,17 +214,19 @@ struct DetachIfDropped { } }; -void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, - const std::vector &ops) { +std::pair RunClusterSimulation(const SimulatorConfig &sim_config, + const ClusterConfig &cluster_config, + const std::vector &ops) { spdlog::info("========================== NEW SIMULATION =========================="); auto simulator = Simulator(sim_config); - auto cli_addr = Address::TestAddress(1); - auto machine_1_addr = cli_addr.ForkUniqueAddress(); + auto machine_1_addr = Address::TestAddress(1); + auto cli_addr = Address::TestAddress(2); + auto cli_addr_2 = Address::TestAddress(3); Io cli_io = simulator.Register(cli_addr); - Io cli_io_2 = simulator.Register(Address::TestAddress(2)); + Io cli_io_2 = simulator.Register(cli_addr_2); auto coordinator_addresses = std::vector{ machine_1_addr, @@ -232,6 +238,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig Address coordinator_address = mm_1.CoordinatorAddress(); auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); + simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr); auto detach_on_error = DetachIfDropped{.handle = mm_thread_1}; @@ -257,6 +264,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig simulator.ShutDown(); + mm_thread_1.join(); + SimulatorStats stats = simulator.Stats(); spdlog::info("total messages: {}", stats.total_messages); @@ -268,10 +277,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig auto histo = cli_io_2.ResponseLatencies(); - using memgraph::utils::print_helpers::operator<<; - std::cout << "response latencies: " << histo << std::endl; - spdlog::info("========================== SUCCESS :) =========================="); + return std::make_pair(stats, histo); } } // namespace memgraph::tests::simulation From 04420a84c76bb63c6846b3259b35fbfb81e8a9ca Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 16:54:38 +0000 Subject: [PATCH 15/26] Fix incorrect usage of IncrementServerCountAndWaitForQuiescentState in the shard_rsm.cpp simulation test --- tests/simulation/shard_rsm.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 64d0a0861..230bd9222 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -1115,11 +1115,12 @@ int TestMessages() { ConcreteShardRsm shard_server3(std::move(shard_server_io_3), address_for_3, ShardRsm(std::move(shard_ptr3))); auto server_thread1 = std::jthread([&shard_server1]() { shard_server1.Run(); }); - auto server_thread2 = std::jthread([&shard_server2]() { shard_server2.Run(); }); - auto server_thread3 = std::jthread([&shard_server3]() { shard_server3.Run(); }); - simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_1_address); + + auto server_thread2 = std::jthread([&shard_server2]() { shard_server2.Run(); }); simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_2_address); + + auto server_thread3 = std::jthread([&shard_server3]() { shard_server3.Run(); }); simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_3_address); std::cout << "Beginning test after servers have become quiescent." << std::endl; From 45badbe21fd51ea4463b3c5af1c6c18b83da16d5 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 18 Nov 2022 17:22:50 +0000 Subject: [PATCH 16/26] Use unsigned integer literals for bit shifting in the NewShardUuid function --- src/coordinator/shard_map.cpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 687228890..918dd4b8c 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include #include #include #include @@ -257,13 +258,13 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) { 0, 0, 0, - static_cast(shard_id >> 56), - static_cast(shard_id >> 48), - static_cast(shard_id >> 40), - static_cast(shard_id >> 32), - static_cast(shard_id >> 24), - static_cast(shard_id >> 16), - static_cast(shard_id >> 8), + static_cast(shard_id >> UINT8_C(56)), + static_cast(shard_id >> UINT8_C(48)), + static_cast(shard_id >> UINT8_C(40)), + static_cast(shard_id >> UINT8_C(32)), + static_cast(shard_id >> UINT8_C(24)), + static_cast(shard_id >> UINT8_C(16)), + static_cast(shard_id >> UINT8_C(8)), static_cast(shard_id)}; } From e43f4e218180fd4a9a532faa6c5f5df566f2b3bc Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 21 Nov 2022 10:08:42 +0000 Subject: [PATCH 17/26] Sort simulator in_flight_ messages based on a stable sort of the sender address --- src/io/simulator/simulator_handle.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 53b969712..eb34bc201 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -63,6 +63,17 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre } } +bool SortInFlight(const std::pair &lhs, const std::pair &rhs) { + // NB: never sort based on the request ID etc... + // This should only be used from std::stable_sort + // because by comparing on the from_address alone, + // we expect the sender ordering to remain + // deterministic. + const auto &[addr_1, om_1] = lhs; + const auto &[addr_2, om_2] = rhs; + return om_1.from_address < om_2.from_address; +} + bool SimulatorHandle::MaybeTickSimulator() { std::unique_lock lock(mu_); @@ -107,6 +118,8 @@ bool SimulatorHandle::MaybeTickSimulator() { return true; } + std::stable_sort(in_flight_.begin(), in_flight_.end(), SortInFlight); + if (config_.scramble_messages && in_flight_.size() > 1) { // scramble messages std::uniform_int_distribution swap_distrib(0, in_flight_.size() - 1); From 71dcba331ef3c39e216aa2358a996f9905647ac1 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 21 Nov 2022 10:10:45 +0000 Subject: [PATCH 18/26] Increment simulator time by up to 30ms in ticks --- src/io/simulator/simulator_handle.hpp | 2 +- tests/simulation/raft.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 1ef0b3dfb..2fbc74c4d 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -52,7 +52,7 @@ class SimulatorHandle { std::set
blocked_on_receive_; std::set
server_addresses_; std::mt19937 rng_; - std::uniform_int_distribution time_distrib_{0, 1000}; + std::uniform_int_distribution time_distrib_{0, 30000}; std::uniform_int_distribution drop_distrib_{0, 99}; SimulatorConfig config_; MessageHistogramCollector histograms_; diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 648814667..6e7b345f9 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -253,8 +253,8 @@ void RunWithSeed(uint64_t seed) { .abort_time = Time::max(), }; - spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================", - seed); + spdlog::error("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================", + seed); spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); auto [sim_stats_1, latency_stats_1] = RunSimulation(config); From 0f66ae31dd831f222984588efc3e6ecf520134f3 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 21 Nov 2022 11:11:39 +0000 Subject: [PATCH 19/26] Use explicit unsigned integer in right shift operation --- src/coordinator/shard_map.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 918dd4b8c..4839c9efc 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -9,7 +9,6 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include #include #include #include @@ -258,13 +257,13 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) { 0, 0, 0, - static_cast(shard_id >> UINT8_C(56)), - static_cast(shard_id >> UINT8_C(48)), - static_cast(shard_id >> UINT8_C(40)), - static_cast(shard_id >> UINT8_C(32)), - static_cast(shard_id >> UINT8_C(24)), - static_cast(shard_id >> UINT8_C(16)), - static_cast(shard_id >> UINT8_C(8)), + static_cast(shard_id >> 56u), + static_cast(shard_id >> 48u), + static_cast(shard_id >> 40u), + static_cast(shard_id >> 32u), + static_cast(shard_id >> 24u), + static_cast(shard_id >> 16u), + static_cast(shard_id >> 8u), static_cast(shard_id)}; } From 081c3e5bed6e84217df6c4cb32d101cb56ba46bd Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 21 Nov 2022 13:16:35 +0000 Subject: [PATCH 20/26] Capitalize unsigned integer literal --- src/coordinator/shard_map.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 4839c9efc..ea167db87 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -257,13 +257,13 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) { 0, 0, 0, - static_cast(shard_id >> 56u), - static_cast(shard_id >> 48u), - static_cast(shard_id >> 40u), - static_cast(shard_id >> 32u), - static_cast(shard_id >> 24u), - static_cast(shard_id >> 16u), - static_cast(shard_id >> 8u), + static_cast(shard_id >> 56U), + static_cast(shard_id >> 48U), + static_cast(shard_id >> 40U), + static_cast(shard_id >> 32U), + static_cast(shard_id >> 24U), + static_cast(shard_id >> 16U), + static_cast(shard_id >> 8U), static_cast(shard_id)}; } From 66f39f2681ab693ef3b1975157b8872bc59685fc Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 22 Nov 2022 08:55:48 +0000 Subject: [PATCH 21/26] Add elapsed time to the SimulatorStats --- src/io/simulator/simulator_handle.cpp | 1 + src/io/simulator/simulator_stats.hpp | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index eb34bc201..a521943f8 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -101,6 +101,7 @@ bool SimulatorHandle::MaybeTickSimulator() { const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; cluster_wide_time_microseconds_ += clock_advance; + stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time; spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp index ccc5ec1de..972d09bf8 100644 --- a/src/io/simulator/simulator_stats.hpp +++ b/src/io/simulator/simulator_stats.hpp @@ -15,6 +15,8 @@ #include +#include "io/time.hpp" + namespace memgraph::io::simulator { struct SimulatorStats { uint64_t total_messages = 0; @@ -23,15 +25,18 @@ struct SimulatorStats { uint64_t total_requests = 0; uint64_t total_responses = 0; uint64_t simulator_ticks = 0; + Duration elapsed_time; friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default; friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) { + auto elapsed_ms = stats.elapsed_time.count() / 1000; + std::string formated = fmt::format( "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, " - "total_responses: {}, simulator_ticks: {} }}", + "total_responses: {}, simulator_ticks: {}, elapsed_time: {}ms }}", stats.total_messages, stats.dropped_messages, stats.timed_out_requests, stats.total_requests, - stats.total_responses, stats.simulator_ticks); + stats.total_responses, stats.simulator_ticks, elapsed_ms); in << formated; From 0b19b62b122678b749db5d92a7070e0eb28b869f Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 22 Nov 2022 11:25:24 +0000 Subject: [PATCH 22/26] Set the abort_time for raft tests to 1 simulated hour --- tests/simulation/raft.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 6e7b345f9..f2dd82e9d 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -250,7 +250,7 @@ void RunWithSeed(uint64_t seed) { .scramble_messages = true, .rng_seed = seed, .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, - .abort_time = Time::max(), + .abort_time = Time::min() + std::chrono::seconds{3600}, }; spdlog::error("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================", From c8c72de6ac6d8e8eae988a61c825d9c4c054f175 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 22 Nov 2022 11:30:24 +0000 Subject: [PATCH 23/26] Use duration_cast to ensure that we are retrieving milliseconds inside SimulatorStats::operator<< --- src/io/simulator/simulator_stats.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp index 972d09bf8..b02dc9f2a 100644 --- a/src/io/simulator/simulator_stats.hpp +++ b/src/io/simulator/simulator_stats.hpp @@ -30,7 +30,7 @@ struct SimulatorStats { friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default; friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) { - auto elapsed_ms = stats.elapsed_time.count() / 1000; + auto elapsed_ms = std::chrono::duration_cast(stats.elapsed_time).count() / 1000; std::string formated = fmt::format( "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, " From c0a103e851c148b478a2eeb0a221e70089684fbc Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 22 Nov 2022 16:00:06 +0000 Subject: [PATCH 24/26] Do not advance the clock with every message, as this prevents messages of a certain request depth from ever completing --- src/io/simulator/simulator_handle.cpp | 19 +++++++++++++------ src/io/simulator/simulator_stats.hpp | 2 +- tests/simulation/raft.cpp | 1 + 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index a521943f8..f216f3c01 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -100,15 +100,22 @@ bool SimulatorHandle::MaybeTickSimulator() { } const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; - cluster_wide_time_microseconds_ += clock_advance; - stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time; - spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); + // We don't always want to advance the clock with every message that we deliver because + // when we advance it for every message, it causes timeouts to occur for any "request path" + // over a certain length. Alternatively, we don't want to simply deliver multiple messages + // in a single simulator tick because that would reduce the amount of concurrent message + // mixing that may naturally occur in production. This approach is to mod the random clock + // advance by a prime number (hopefully avoiding most harmonic effects that would be introduced + // by only advancing the clock by an even amount etc...) and only advancing the clock close to + // half of the time. + if (clock_advance.count() % 97 > 49) { + spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); + cluster_wide_time_microseconds_ += clock_advance; + stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time; + } if (cluster_wide_time_microseconds_ >= config_.abort_time) { - if (should_shut_down_) { - return true; - } spdlog::error( "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " "in an expected amount of time."); diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp index b02dc9f2a..1ae52ef28 100644 --- a/src/io/simulator/simulator_stats.hpp +++ b/src/io/simulator/simulator_stats.hpp @@ -30,7 +30,7 @@ struct SimulatorStats { friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default; friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) { - auto elapsed_ms = std::chrono::duration_cast(stats.elapsed_time).count() / 1000; + auto elapsed_ms = std::chrono::duration_cast(stats.elapsed_time).count(); std::string formated = fmt::format( "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, " diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index f2dd82e9d..d725b0712 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -185,6 +185,7 @@ std::pair RunSimulation(SimulatorConf auto write_cas_response_result = client.SendWriteRequest(cas_req); if (write_cas_response_result.HasError()) { + spdlog::debug("timed out"); // timed out continue; } From e0086e566620e915f5f69725a05dadd098cd4cd6 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 22 Nov 2022 16:06:35 +0000 Subject: [PATCH 25/26] Use spdlog::trace instead of info for simulator-related messages --- src/io/simulator/simulator_handle.cpp | 24 ++++++++++++------------ src/io/simulator/simulator_handle.hpp | 10 +++++----- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index f216f3c01..674920d8e 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -52,13 +52,13 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre const bool all_servers_blocked = blocked_servers == server_addresses_.size(); if (all_servers_blocked) { - spdlog::info("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers, - server_addresses_.size()); + spdlog::trace("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers, + server_addresses_.size()); return; } - spdlog::info("not returning from quiescent because we see {} blocked out of {}", blocked_servers, - server_addresses_.size()); + spdlog::trace("not returning from quiescent because we see {} blocked out of {}", blocked_servers, + server_addresses_.size()); cv_.wait(lock); } } @@ -88,7 +88,7 @@ bool SimulatorHandle::MaybeTickSimulator() { // We allow the simulator to progress the state of the system only // after all servers are blocked on receive. - spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); + spdlog::trace("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); stats_.simulator_ticks++; blocked_on_receive_.clear(); cv_.notify_all(); @@ -96,7 +96,7 @@ bool SimulatorHandle::MaybeTickSimulator() { bool timed_anything_out = TimeoutPromisesPastDeadline(); if (timed_anything_out) { - spdlog::info("simulator progressing: timed out a request"); + spdlog::trace("simulator progressing: timed out a request"); } const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; @@ -110,7 +110,7 @@ bool SimulatorHandle::MaybeTickSimulator() { // by only advancing the clock by an even amount etc...) and only advancing the clock close to // half of the time. if (clock_advance.count() % 97 > 49) { - spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count()); + spdlog::trace("simulator progressing: clock advanced by {}", clock_advance.count()); cluster_wide_time_microseconds_ += clock_advance; stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time; } @@ -157,22 +157,22 @@ bool SimulatorHandle::MaybeTickSimulator() { if (should_drop || normal_timeout) { stats_.timed_out_requests++; dop.promise.TimeOut(); - spdlog::info("simulator timing out request "); + spdlog::trace("simulator timing out request "); } else { stats_.total_responses++; Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at; auto type_info = opaque_message.type_info; dop.promise.Fill(std::move(opaque_message), response_latency); histograms_.Measure(type_info, response_latency); - spdlog::info("simulator replying to request"); + spdlog::trace("simulator replying to request"); } } else if (should_drop) { // don't add it anywhere, let it drop - spdlog::info("simulator silently dropping request"); + spdlog::trace("simulator silently dropping request"); } else { // add to can_receive_ if not - spdlog::info("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port, - opaque_message.to_address.last_known_port); + spdlog::trace("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port, + opaque_message.to_address.last_known_port); const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector()); om_vec->second.emplace_back(std::move(opaque_message)); diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 2fbc74c4d..5a5ad1ec0 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -64,7 +64,7 @@ class SimulatorHandle { for (auto it = promises_.begin(); it != promises_.end();) { auto &[promise_key, dop] = *it; if (dop.deadline < now && config_.perform_timeouts) { - spdlog::info("timing out request from requester {}.", promise_key.requester_address.ToString()); + spdlog::trace("timing out request from requester {}.", promise_key.requester_address.ToString()); std::move(dop).promise.TimeOut(); it = promises_.erase(it); @@ -106,7 +106,7 @@ class SimulatorHandle { template ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, std::function &&maybe_tick_simulator) { - spdlog::info("submitting request to {}", to_address.last_known_port); + spdlog::trace("submitting request to {}", to_address.last_known_port); auto type_info = TypeInfoFor(request); auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier>( @@ -173,20 +173,20 @@ class SimulatorHandle { if (!should_shut_down_) { if (!blocked_on_receive_.contains(receiver)) { blocked_on_receive_.emplace(receiver); - spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port); + spdlog::trace("blocking receiver {}", receiver.ToPartialAddress().port); cv_.notify_all(); } cv_.wait(lock); } } - spdlog::info("timing out receiver {}", receiver.ToPartialAddress().port); + spdlog::trace("timing out receiver {}", receiver.ToPartialAddress().port); return TimedOut{}; } template void Send(Address to_address, Address from_address, RequestId request_id, M message) { - spdlog::info("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port); + spdlog::trace("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port); auto type_info = TypeInfoFor(message); { std::unique_lock lock(mu_); From ea533f43fc36bfc13684e519729f2768adbf7758 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 22 Nov 2022 16:06:57 +0000 Subject: [PATCH 26/26] Print out the simulator seed when we exceed the configured abort_time --- src/io/simulator/simulator_handle.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 674920d8e..cc0bd0598 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -118,7 +118,8 @@ bool SimulatorHandle::MaybeTickSimulator() { if (cluster_wide_time_microseconds_ >= config_.abort_time) { spdlog::error( "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " - "in an expected amount of time."); + "in an expected amount of time. The SimulatorConfig.rng_seed for this run is {}", + config_.rng_seed); throw utils::BasicException{"Cluster has executed beyond its configured abort_time"}; }