From 05acece20cb3b40b3cc94a05fe7a5aecd1033878 Mon Sep 17 00:00:00 2001 From: Tyler Neely <t@jujit.su> Date: Tue, 7 Feb 2023 16:40:53 +0000 Subject: [PATCH 1/4] Add simulator support for adding a message delay to every sent message --- src/io/message_conversion.hpp | 2 ++ src/io/simulator/simulator_config.hpp | 1 + src/io/simulator/simulator_handle.cpp | 4 ++-- src/io/simulator/simulator_handle.hpp | 21 ++++++++++++++++----- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/io/message_conversion.hpp b/src/io/message_conversion.hpp index 1463abc06..1ff03a36c 100644 --- a/src/io/message_conversion.hpp +++ b/src/io/message_conversion.hpp @@ -13,6 +13,7 @@ #include <boost/core/demangle.hpp> +#include "io/time.hpp" #include "io/transport.hpp" #include "utils/type_info_ref.hpp" @@ -38,6 +39,7 @@ struct OpaqueMessage { uint64_t request_id; std::any message; utils::TypeInfoRef type_info; + Time deliverable_at; /// Recursively tries to match a specific type from the outer /// variant's parameter pack against the type of the std::any, diff --git a/src/io/simulator/simulator_config.hpp b/src/io/simulator/simulator_config.hpp index 4719488d2..cc77c14c7 100644 --- a/src/io/simulator/simulator_config.hpp +++ b/src/io/simulator/simulator_config.hpp @@ -26,5 +26,6 @@ struct SimulatorConfig { uint64_t rng_seed = 0; Time start_time = Time::min(); Time abort_time = Time::max(); + Duration message_delay = std::chrono::microseconds(100); }; }; // namespace memgraph::io::simulator diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index cc0bd0598..084cf971f 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -175,8 +175,8 @@ bool SimulatorHandle::MaybeTickSimulator() { spdlog::trace("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port, opaque_message.to_address.last_known_port); const auto &[om_vec, inserted] = - can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>()); - om_vec->second.emplace_back(std::move(opaque_message)); + can_receive_.try_emplace(to_address.ToPartialAddress(), std::deque<OpaqueMessage>()); + om_vec->second.emplace_front(std::move(opaque_message)); } return true; diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 3fd9b4965..03d3e8db5 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -46,7 +46,7 @@ class SimulatorHandle { std::map<PromiseKey, DeadlineAndOpaquePromise> promises_; // messages that are sent to servers that may later receive them - std::map<PartialAddress, std::vector<OpaqueMessage>> can_receive_; + std::map<PartialAddress, std::deque<OpaqueMessage>> can_receive_; Time cluster_wide_time_microseconds_; bool should_shut_down_ = false; @@ -131,7 +131,8 @@ class SimulatorHandle { .from_address = from_address, .request_id = request_id, .message = std::move(message), - .type_info = type_info}; + .type_info = type_info, + .deliverable_at = cluster_wide_time_microseconds_ + config_.message_delay}; in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; @@ -165,8 +166,13 @@ class SimulatorHandle { while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) { if (can_receive_.contains(partial_address)) { - std::vector<OpaqueMessage> &can_rx = can_receive_.at(partial_address); - if (!can_rx.empty()) { + std::deque<OpaqueMessage> &can_rx = can_receive_.at(partial_address); + + bool contains_items = !can_rx.empty(); + bool can_receive = contains_items && can_rx.back().deliverable_at <= cluster_wide_time_microseconds_; + + if (can_receive) { + spdlog::warn("can receive"); OpaqueMessage message = std::move(can_rx.back()); can_rx.pop_back(); @@ -176,6 +182,10 @@ class SimulatorHandle { MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type"); return std::move(m_opt).value(); + } else if (contains_items) { + auto count = can_rx.back().deliverable_at.time_since_epoch().count(); + auto now_count = cluster_wide_time_microseconds_.time_since_epoch().count(); + spdlog::warn("can't receive item in the buffer. deliverable_at: {}, now: {}", count, now_count); } } @@ -204,7 +214,8 @@ class SimulatorHandle { .from_address = from_address, .request_id = request_id, .message = std::move(message_any), - .type_info = type_info}; + .type_info = type_info, + .deliverable_at = cluster_wide_time_microseconds_ + config_.message_delay}; in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om))); stats_.total_messages++; From 5d89b5525c4c94e858366ade9e9bfa3e896bbb2e Mon Sep 17 00:00:00 2001 From: Tyler Neely <t@jujit.su> Date: Tue, 7 Feb 2023 16:47:26 +0000 Subject: [PATCH 2/4] Reduce log level for skipping receives in the simulator due to artificial latency --- src/io/simulator/simulator_handle.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 03d3e8db5..0a2d626ba 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -185,7 +185,8 @@ class SimulatorHandle { } else if (contains_items) { auto count = can_rx.back().deliverable_at.time_since_epoch().count(); auto now_count = cluster_wide_time_microseconds_.time_since_epoch().count(); - spdlog::warn("can't receive item in the buffer. deliverable_at: {}, now: {}", count, now_count); + spdlog::trace("can't receive message yet due to artificial latency. deliverable_at: {}, now: {}", count, + now_count); } } From 1f16ab6d5610c60c4a242ee1776994c2145c5995 Mon Sep 17 00:00:00 2001 From: Tyler Neely <t@jujit.su> Date: Tue, 7 Feb 2023 16:47:56 +0000 Subject: [PATCH 3/4] Remove unnecessary log warning --- src/io/simulator/simulator_handle.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 0a2d626ba..806d4487b 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -172,7 +172,6 @@ class SimulatorHandle { bool can_receive = contains_items && can_rx.back().deliverable_at <= cluster_wide_time_microseconds_; if (can_receive) { - spdlog::warn("can receive"); OpaqueMessage message = std::move(can_rx.back()); can_rx.pop_back(); From 3a76eafefd513a47b422fbe4b5957295c1198278 Mon Sep 17 00:00:00 2001 From: jbajic <jure.bajic@memgraph.com> Date: Wed, 29 Mar 2023 13:19:02 +0200 Subject: [PATCH 4/4] Fix clang tidy errror --- src/io/simulator/simulator_handle.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index f1befaab5..5dd56460e 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -181,7 +181,8 @@ class SimulatorHandle { MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type"); return std::move(m_opt).value(); - } else if (contains_items) { + } + if (contains_items) { auto count = can_rx.back().deliverable_at.time_since_epoch().count(); auto now_count = cluster_wide_time_microseconds_.time_since_epoch().count(); spdlog::trace("can't receive message yet due to artificial latency. deliverable_at: {}, now: {}", count,