Use std::chrono instead of raw uint64_t
This commit is contained in:
parent
4ee4612a9c
commit
e2968c2e21
src/io
tests/simulation
@ -11,13 +11,20 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include "io/time.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct SimulatorConfig {
|
||||
int drop_percent = 0;
|
||||
bool perform_timeouts = false;
|
||||
bool scramble_messages = true;
|
||||
uint64_t rng_seed = 0;
|
||||
uint64_t start_time = 0;
|
||||
uint64_t abort_time = ULLONG_MAX;
|
||||
Time start_time = Time::min();
|
||||
Time abort_time = Time::max();
|
||||
};
|
||||
}; // namespace memgraph::io::simulator
|
||||
|
@ -27,9 +27,14 @@
|
||||
#include "io/errors.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
#include "io/simulator/simulator_stats.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct OpaqueMessage {
|
||||
Address from_address;
|
||||
uint64_t request_id;
|
||||
@ -195,7 +200,7 @@ class OpaquePromise {
|
||||
};
|
||||
|
||||
struct DeadlineAndOpaquePromise {
|
||||
uint64_t deadline;
|
||||
Time deadline;
|
||||
OpaquePromise promise;
|
||||
};
|
||||
|
||||
@ -212,7 +217,7 @@ class SimulatorHandle {
|
||||
// messages that are sent to servers that may later receive them
|
||||
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
|
||||
|
||||
uint64_t cluster_wide_time_microseconds_;
|
||||
Time cluster_wide_time_microseconds_;
|
||||
bool should_shut_down_ = false;
|
||||
SimulatorStats stats_;
|
||||
size_t blocked_on_receive_ = 0;
|
||||
@ -250,7 +255,7 @@ class SimulatorHandle {
|
||||
}
|
||||
|
||||
void TimeoutPromisesPastDeadline() {
|
||||
uint64_t now = cluster_wide_time_microseconds_;
|
||||
Time now = cluster_wide_time_microseconds_;
|
||||
|
||||
for (auto &[promise_key, dop] : promises_) {
|
||||
// TODO(tyler) queue this up and drop it after its deadline
|
||||
@ -298,7 +303,7 @@ class SimulatorHandle {
|
||||
// We tick the clock forward when all servers are blocked but
|
||||
// there are no in-flight messages to schedule delivery of.
|
||||
std::poisson_distribution<> time_distrib(50);
|
||||
uint64_t clock_advance = time_distrib(rng_);
|
||||
Duration clock_advance = std::chrono::microseconds{time_distrib(rng_)};
|
||||
cluster_wide_time_microseconds_ += clock_advance;
|
||||
|
||||
MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time,
|
||||
@ -366,11 +371,11 @@ class SimulatorHandle {
|
||||
}
|
||||
|
||||
template <Message Request, Message Response>
|
||||
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request,
|
||||
uint64_t timeout_microseconds, ResponsePromise<Response> &&promise) {
|
||||
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request, Duration timeout,
|
||||
ResponsePromise<Response> &&promise) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
uint64_t deadline = cluster_wide_time_microseconds_ + timeout_microseconds;
|
||||
Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
|
||||
std::any message(std::move(request));
|
||||
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message)};
|
||||
@ -390,10 +395,10 @@ class SimulatorHandle {
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, uint64_t timeout_microseconds) {
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
uint64_t deadline = cluster_wide_time_microseconds_ + timeout_microseconds;
|
||||
Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
|
||||
while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) {
|
||||
if (can_receive_.contains(receiver)) {
|
||||
@ -434,7 +439,7 @@ class SimulatorHandle {
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
uint64_t Now() {
|
||||
Time Now() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return cluster_wide_time_microseconds_;
|
||||
}
|
||||
|
@ -16,8 +16,13 @@
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/simulator/simulator_handle.hpp"
|
||||
#include "io/time.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Time;
|
||||
|
||||
class SimulatorTransport {
|
||||
std::shared_ptr<SimulatorHandle> simulator_handle_;
|
||||
Address address_;
|
||||
@ -28,21 +33,19 @@ class SimulatorTransport {
|
||||
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
|
||||
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request,
|
||||
uint64_t timeout_microseconds) {
|
||||
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request, Duration timeout) {
|
||||
std::function<bool()> maybe_tick_simulator = [=] { return simulator_handle_->MaybeTickSimulator(); };
|
||||
auto [future, promise] =
|
||||
memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(maybe_tick_simulator);
|
||||
|
||||
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout_microseconds,
|
||||
std::move(promise));
|
||||
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
|
||||
|
||||
return std::move(future);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
|
||||
return simulator_handle_->template Receive<Ms...>(address_, timeout_microseconds);
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
|
||||
return simulator_handle_->template Receive<Ms...>(address_, timeout);
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
@ -50,7 +53,7 @@ class SimulatorTransport {
|
||||
return simulator_handle_->template Send<M>(address, address_, request_id, message);
|
||||
}
|
||||
|
||||
uint64_t Now() { return simulator_handle_->Now(); }
|
||||
Time Now() { return simulator_handle_->Now(); }
|
||||
|
||||
bool ShouldShutDown() { return simulator_handle_->ShouldShutDown(); }
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <concepts>
|
||||
#include <random>
|
||||
#include <variant>
|
||||
@ -20,6 +21,7 @@
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/future.hpp"
|
||||
#include "io/time.hpp"
|
||||
|
||||
using memgraph::utils::BasicResult;
|
||||
|
||||
@ -61,46 +63,43 @@ class Io {
|
||||
I implementation_;
|
||||
Address address_;
|
||||
uint64_t request_id_counter_ = 0;
|
||||
uint64_t default_timeout_microseconds_ = 50 * 1000;
|
||||
Duration default_timeout_ = std::chrono::microseconds{50000};
|
||||
|
||||
public:
|
||||
Io(I io, Address address) : implementation_(io), address_(address) {}
|
||||
|
||||
/// Set the default timeout for all requests that are issued
|
||||
/// without an explicit timeout set.
|
||||
void SetDefaultTimeoutMicroseconds(uint64_t timeout_microseconds) {
|
||||
default_timeout_microseconds_ = timeout_microseconds;
|
||||
}
|
||||
void SetDefaultTimeout(Duration timeout) { default_timeout_ = timeout; }
|
||||
|
||||
/// Issue a request with an explicit timeout in microseconds provided.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, uint64_t timeout_microseconds) {
|
||||
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
|
||||
uint64_t request_id = ++request_id_counter_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, request, timeout_microseconds);
|
||||
return implementation_.template Request<Request, Response>(address, request_id, request, timeout);
|
||||
}
|
||||
|
||||
/// Issue a request that times out after the default timeout.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address address, Request request) {
|
||||
uint64_t request_id = ++request_id_counter_;
|
||||
uint64_t timeout_microseconds = default_timeout_microseconds_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, std::move(request),
|
||||
timeout_microseconds);
|
||||
Duration timeout = default_timeout_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
|
||||
}
|
||||
|
||||
/// Wait for an explicit number of microseconds for a request of one of the
|
||||
/// provided types to arrive.
|
||||
template <Message... Ms>
|
||||
RequestResult<Ms...> ReceiveWithTimeout(uint64_t timeout_microseconds) {
|
||||
return implementation_.template Receive<Ms...>(timeout_microseconds);
|
||||
RequestResult<Ms...> ReceiveWithTimeout(Duration timeout) {
|
||||
return implementation_.template Receive<Ms...>(timeout);
|
||||
}
|
||||
|
||||
/// Wait the default number of microseconds for a request of one of the
|
||||
/// provided types to arrive.
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive() {
|
||||
uint64_t timeout_microseconds = default_timeout_microseconds_;
|
||||
return implementation_.template Receive<Ms...>(timeout_microseconds);
|
||||
Duration timeout = default_timeout_;
|
||||
return implementation_.template Receive<Ms...>(timeout);
|
||||
}
|
||||
|
||||
/// Send a message in a best-effort fashion. If you need reliable delivery,
|
||||
@ -114,7 +113,7 @@ class Io {
|
||||
/// This time source should be preferred over any other, because it
|
||||
/// lets us deterministically control clocks from tests for making
|
||||
/// things like timeouts deterministic.
|
||||
uint64_t Now() { return implementation_.Now(); }
|
||||
Time Now() { return implementation_.Now(); }
|
||||
|
||||
/// Returns true of the system should shut-down.
|
||||
bool ShouldShutDown() { return implementation_.ShouldShutDown(); }
|
||||
|
@ -34,7 +34,7 @@ void run_server(Io<SimulatorTransport> io) {
|
||||
|
||||
while (!io.ShouldShutDown()) {
|
||||
std::cout << "[SERVER] Is receiving..." << std::endl;
|
||||
auto request_result = io.ReceiveWithTimeout<CounterRequest>(100000);
|
||||
auto request_result = io.Receive<CounterRequest>();
|
||||
if (request_result.HasError()) {
|
||||
std::cout << "[SERVER] Error, continue" << std::endl;
|
||||
continue;
|
||||
@ -71,7 +71,7 @@ int main() {
|
||||
// send request
|
||||
CounterRequest cli_req;
|
||||
cli_req.proposal = i;
|
||||
auto res_f = cli_io.RequestWithTimeout<CounterRequest, CounterResponse>(srv_addr, cli_req, 1000);
|
||||
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
|
||||
auto res_rez = res_f.Wait();
|
||||
if (!res_rez.HasError()) {
|
||||
std::cout << "[CLIENT] Got a valid response" << std::endl;
|
||||
|
@ -26,7 +26,7 @@ using memgraph::io::simulator::SimulatorTransport;
|
||||
void run_server(Io<SimulatorTransport> io) {
|
||||
while (!io.ShouldShutDown()) {
|
||||
std::cout << "[STORAGE] Is receiving..." << std::endl;
|
||||
auto request_result = io.ReceiveWithTimeout<ScanVerticesRequest>(100000);
|
||||
auto request_result = io.Receive<ScanVerticesRequest>();
|
||||
if (request_result.HasError()) {
|
||||
std::cout << "[STORAGE] Error, continue" << std::endl;
|
||||
continue;
|
||||
@ -78,7 +78,7 @@ int main() {
|
||||
|
||||
auto req = ScanVerticesRequest{2, std::nullopt};
|
||||
|
||||
auto res_f = cli_io.RequestWithTimeout<ScanVerticesRequest, VerticesResponse>(srv_addr, req, 1000);
|
||||
auto res_f = cli_io.Request<ScanVerticesRequest, VerticesResponse>(srv_addr, req);
|
||||
auto res_rez = res_f.Wait();
|
||||
// MG_ASSERT(res_rez.HasError());
|
||||
simulator.ShutDown();
|
||||
|
Loading…
Reference in New Issue
Block a user