Restructure responsibilities for assigning request ids to the transport handles. Simplify promise tracking to avoid replier addresses, enabling eventual direct server return (DSR)
This commit is contained in:
parent
fa1ddfea12
commit
dd8dd4f6c4
@ -31,14 +31,9 @@ class LocalTransport {
|
||||
: local_transport_handle_(std::move(local_transport_handle)) {}
|
||||
|
||||
template <Message RequestT, Message ResponseT>
|
||||
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestId request_id, RequestT request,
|
||||
Duration timeout) {
|
||||
auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
|
||||
|
||||
local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
|
||||
std::move(promise));
|
||||
|
||||
return std::move(future);
|
||||
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
|
||||
return local_transport_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address,
|
||||
std::move(request), timeout);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
|
@ -30,6 +30,7 @@ class LocalTransportHandle {
|
||||
mutable std::condition_variable cv_;
|
||||
bool should_shut_down_ = false;
|
||||
MessageHistogramCollector histograms_;
|
||||
RequestId request_id_counter_ = 0;
|
||||
|
||||
// the responses to requests that are being waited on
|
||||
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
|
||||
@ -138,8 +139,10 @@ class LocalTransportHandle {
|
||||
}
|
||||
|
||||
template <Message RequestT, Message ResponseT>
|
||||
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, RequestT &&request,
|
||||
Duration timeout, ResponsePromise<ResponseT> promise) {
|
||||
ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RequestT &&request,
|
||||
Duration timeout) {
|
||||
auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
|
||||
|
||||
const bool port_matches = to_address.last_known_port == from_address.last_known_port;
|
||||
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
|
||||
|
||||
@ -148,16 +151,23 @@ class LocalTransportHandle {
|
||||
const auto now = Now();
|
||||
const Time deadline = now + timeout;
|
||||
|
||||
RequestId request_id;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
request_id = ++request_id_counter_;
|
||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
|
||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||
DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
|
||||
|
||||
// TODO(tyler) assert not already present
|
||||
|
||||
promises_.emplace(std::move(promise_key), std::move(dop));
|
||||
} // lock dropped
|
||||
|
||||
Send(to_address, from_address, request_id, std::forward<RequestT>(request));
|
||||
|
||||
return std::move(future);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -21,9 +21,6 @@ namespace memgraph::io {
|
||||
struct PromiseKey {
|
||||
Address requester_address;
|
||||
uint64_t request_id;
|
||||
// TODO(tyler) possibly remove replier_address from promise key
|
||||
// once we want to support DSR.
|
||||
Address replier_address;
|
||||
|
||||
public:
|
||||
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
|
||||
@ -31,11 +28,7 @@ struct PromiseKey {
|
||||
return lhs.requester_address < rhs.requester_address;
|
||||
}
|
||||
|
||||
if (lhs.request_id != rhs.request_id) {
|
||||
return lhs.request_id < rhs.request_id;
|
||||
}
|
||||
|
||||
return lhs.replier_address < rhs.replier_address;
|
||||
return lhs.request_id < rhs.request_id;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -108,9 +108,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
|
||||
stats_.dropped_messages++;
|
||||
}
|
||||
|
||||
PromiseKey promise_key{.requester_address = to_address,
|
||||
.request_id = opaque_message.request_id,
|
||||
.replier_address = opaque_message.from_address};
|
||||
PromiseKey promise_key{.requester_address = to_address, .request_id = opaque_message.request_id};
|
||||
|
||||
if (promises_.contains(promise_key)) {
|
||||
// complete waiting promise if it's there
|
||||
|
@ -56,14 +56,14 @@ class SimulatorHandle {
|
||||
std::uniform_int_distribution<int> drop_distrib_{0, 99};
|
||||
SimulatorConfig config_;
|
||||
MessageHistogramCollector histograms_;
|
||||
RequestId request_id_counter_{0};
|
||||
|
||||
void TimeoutPromisesPastDeadline() {
|
||||
const Time now = cluster_wide_time_microseconds_;
|
||||
for (auto it = promises_.begin(); it != promises_.end();) {
|
||||
auto &[promise_key, dop] = *it;
|
||||
if (dop.deadline < now && config_.perform_timeouts) {
|
||||
spdlog::info("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(),
|
||||
promise_key.replier_address.ToString());
|
||||
spdlog::info("timing out request from requester {}.", promise_key.requester_address.ToString());
|
||||
std::move(dop).promise.TimeOut();
|
||||
it = promises_.erase(it);
|
||||
|
||||
@ -101,12 +101,17 @@ class SimulatorHandle {
|
||||
bool ShouldShutDown() const;
|
||||
|
||||
template <Message Request, Message Response>
|
||||
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
|
||||
Duration timeout, ResponsePromise<Response> &&promise) {
|
||||
ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
|
||||
std::function<bool()> &&maybe_tick_simulator) {
|
||||
auto type_info = TypeInfoFor(request);
|
||||
|
||||
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
|
||||
std::forward<std::function<bool()>>(maybe_tick_simulator));
|
||||
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
RequestId request_id = ++request_id_counter_;
|
||||
|
||||
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
|
||||
std::any message(request);
|
||||
@ -117,7 +122,7 @@ class SimulatorHandle {
|
||||
.type_info = type_info};
|
||||
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
|
||||
|
||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
|
||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
|
||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||
DeadlineAndOpaquePromise dop{
|
||||
.requested_at = cluster_wide_time_microseconds_,
|
||||
@ -130,6 +135,8 @@ class SimulatorHandle {
|
||||
stats_.total_requests++;
|
||||
|
||||
cv_.notify_all();
|
||||
|
||||
return std::move(future);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
|
@ -33,16 +33,11 @@ class SimulatorTransport {
|
||||
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
|
||||
|
||||
template <Message RequestT, Message ResponseT>
|
||||
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, uint64_t request_id, RequestT request,
|
||||
Duration timeout) {
|
||||
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
|
||||
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
|
||||
auto [future, promise] =
|
||||
memgraph::io::FuturePromisePairWithNotifier<ResponseResult<ResponseT>>(maybe_tick_simulator);
|
||||
|
||||
simulator_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
|
||||
std::move(promise));
|
||||
|
||||
return std::move(future);
|
||||
return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
|
||||
timeout, std::move(maybe_tick_simulator));
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
|
@ -68,7 +68,6 @@ template <typename I>
|
||||
class Io {
|
||||
I implementation_;
|
||||
Address address_;
|
||||
RequestId request_id_counter_ = 0;
|
||||
Duration default_timeout_ = std::chrono::microseconds{100000};
|
||||
|
||||
public:
|
||||
@ -84,20 +83,17 @@ class Io {
|
||||
/// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
|
||||
template <Message RequestT, Message ResponseT>
|
||||
ResponseFuture<ResponseT> RequestWithTimeout(Address address, RequestT request, Duration timeout) {
|
||||
const RequestId request_id = ++request_id_counter_;
|
||||
const Address from_address = address_;
|
||||
return implementation_.template Request<RequestT, ResponseT>(address, from_address, request_id, request, timeout);
|
||||
return implementation_.template Request<RequestT, ResponseT>(address, from_address, request, timeout);
|
||||
}
|
||||
|
||||
/// Issue a request that times out after the default timeout. This tends
|
||||
/// to be used by clients.
|
||||
template <Message RequestT, Message ResponseT>
|
||||
ResponseFuture<ResponseT> Request(Address to_address, RequestT request) {
|
||||
const RequestId request_id = ++request_id_counter_;
|
||||
const Duration timeout = default_timeout_;
|
||||
const Address from_address = address_;
|
||||
return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, request_id,
|
||||
std::move(request), timeout);
|
||||
return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, std::move(request), timeout);
|
||||
}
|
||||
|
||||
/// Wait for an explicit number of microseconds for a request of one of the
|
||||
|
Loading…
Reference in New Issue
Block a user