Merge branch 'project-pineapples' into join-in-optional

This commit is contained in:
Jure Bajic 2023-03-29 13:37:48 +02:00 committed by GitHub
commit a3019a5c44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 7 deletions

View File

@ -13,6 +13,7 @@
#include <boost/core/demangle.hpp>
#include "io/time.hpp"
#include "io/transport.hpp"
#include "utils/type_info_ref.hpp"
@ -38,6 +39,7 @@ struct OpaqueMessage {
uint64_t request_id;
std::any message;
utils::TypeInfoRef type_info;
Time deliverable_at;
/// Recursively tries to match a specific type from the outer
/// variant's parameter pack against the type of the std::any,

View File

@ -26,5 +26,6 @@ struct SimulatorConfig {
uint64_t rng_seed = 0;
Time start_time = Time::min();
Time abort_time = Time::max();
Duration message_delay = std::chrono::microseconds(100);
};
}; // namespace memgraph::io::simulator

View File

@ -175,8 +175,8 @@ bool SimulatorHandle::MaybeTickSimulator() {
spdlog::trace("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
opaque_message.to_address.last_known_port);
const auto &[om_vec, inserted] =
can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
om_vec->second.emplace_back(std::move(opaque_message));
can_receive_.try_emplace(to_address.ToPartialAddress(), std::deque<OpaqueMessage>());
om_vec->second.emplace_front(std::move(opaque_message));
}
return true;

View File

@ -46,7 +46,7 @@ class SimulatorHandle {
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them
std::map<PartialAddress, std::vector<OpaqueMessage>> can_receive_;
std::map<PartialAddress, std::deque<OpaqueMessage>> can_receive_;
Time cluster_wide_time_microseconds_;
bool should_shut_down_ = false;
@ -131,7 +131,8 @@ class SimulatorHandle {
.from_address = from_address,
.request_id = request_id,
.message = std::move(message),
.type_info = type_info};
.type_info = type_info,
.deliverable_at = cluster_wide_time_microseconds_ + config_.message_delay};
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
@ -165,8 +166,12 @@ class SimulatorHandle {
while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) {
if (can_receive_.contains(partial_address)) {
std::vector<OpaqueMessage> &can_rx = can_receive_.at(partial_address);
if (!can_rx.empty()) {
std::deque<OpaqueMessage> &can_rx = can_receive_.at(partial_address);
bool contains_items = !can_rx.empty();
bool can_receive = contains_items && can_rx.back().deliverable_at <= cluster_wide_time_microseconds_;
if (can_receive) {
OpaqueMessage message = std::move(can_rx.back());
can_rx.pop_back();
@ -177,6 +182,12 @@ class SimulatorHandle {
return std::move(m_opt).value();
}
if (contains_items) {
auto count = can_rx.back().deliverable_at.time_since_epoch().count();
auto now_count = cluster_wide_time_microseconds_.time_since_epoch().count();
spdlog::trace("can't receive message yet due to artificial latency. deliverable_at: {}, now: {}", count,
now_count);
}
}
if (!should_shut_down_) {
@ -204,7 +215,8 @@ class SimulatorHandle {
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any),
.type_info = type_info};
.type_info = type_info,
.deliverable_at = cluster_wide_time_microseconds_ + config_.message_delay};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages++;