From 366a4e2b9a08d75dbbe64460ce28c78e1e3085e6 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 1 Dec 2022 15:56:16 +0000 Subject: [PATCH 01/12] Add support for efficiently executing multiple asynchronous requests out-of-order from the RequestRouter --- src/io/future.hpp | 26 +- src/io/local_transport/local_transport.hpp | 7 +- .../local_transport_handle.hpp | 8 +- src/io/notifier.hpp | 69 +++++ src/io/rsm/rsm_client.hpp | 100 ++++---- src/io/simulator/simulator_handle.hpp | 10 +- src/io/simulator/simulator_transport.hpp | 7 +- src/io/transport.hpp | 30 ++- src/query/v2/request_router.hpp | 238 ++++++++++-------- tests/unit/future.cpp | 12 +- tests/unit/high_density_shard_create_scan.cpp | 3 +- tests/unit/query_v2_expression_evaluator.cpp | 7 +- 12 files changed, 329 insertions(+), 188 deletions(-) create mode 100644 src/io/notifier.hpp diff --git a/src/io/future.hpp b/src/io/future.hpp index 98437b496..585f18938 100644 --- a/src/io/future.hpp +++ b/src/io/future.hpp @@ -35,10 +35,13 @@ class Shared { std::optional item_; bool consumed_ = false; bool waiting_ = false; - std::function simulator_notifier_ = nullptr; + bool filled_ = false; + std::function wait_notifier_ = nullptr; + std::function fill_notifier_ = nullptr; public: - explicit Shared(std::function simulator_notifier) : simulator_notifier_(simulator_notifier) {} + explicit Shared(std::function wait_notifier, std::function fill_notifier) + : wait_notifier_(wait_notifier), fill_notifier_(fill_notifier) {} Shared() = default; Shared(Shared &&) = delete; Shared &operator=(Shared &&) = delete; @@ -64,7 +67,7 @@ class Shared { waiting_ = true; while (!item_) { - if (simulator_notifier_) [[unlikely]] { + if (wait_notifier_) [[unlikely]] { // We can't hold our own lock while notifying // the simulator because notifying the simulator // involves acquiring the simulator's mutex @@ -76,7 +79,7 @@ class Shared { // so we have to get out of its way to avoid // a cyclical deadlock. 
lock.unlock(); - std::invoke(simulator_notifier_); + std::invoke(wait_notifier_); lock.lock(); if (item_) { // item may have been filled while we @@ -115,11 +118,19 @@ class Shared { std::unique_lock lock(mu_); MG_ASSERT(!consumed_, "Promise filled after it was already consumed!"); - MG_ASSERT(!item_, "Promise filled twice!"); + MG_ASSERT(!filled_, "Promise filled twice!"); item_ = item; + filled_ = true; } // lock released before condition variable notification + if (fill_notifier_) { + spdlog::trace("calling fill notifier"); + std::invoke(fill_notifier_); + } else { + spdlog::trace("not calling fill notifier"); + } + cv_.notify_all(); } @@ -251,8 +262,9 @@ std::pair, Promise> FuturePromisePair() { } template -std::pair, Promise> FuturePromisePairWithNotifier(std::function simulator_notifier) { - std::shared_ptr> shared = std::make_shared>(simulator_notifier); +std::pair, Promise> FuturePromisePairWithNotifications(std::function wait_notifier, + std::function fill_notifier) { + std::shared_ptr> shared = std::make_shared>(wait_notifier, fill_notifier); Future future = Future(shared); Promise promise = Promise(shared); diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp index 258df6385..b64cabf1d 100644 --- a/src/io/local_transport/local_transport.hpp +++ b/src/io/local_transport/local_transport.hpp @@ -31,9 +31,10 @@ class LocalTransport { : local_transport_handle_(std::move(local_transport_handle)) {} template - ResponseFuture Request(Address to_address, Address from_address, RequestT request, Duration timeout) { - return local_transport_handle_->template SubmitRequest(to_address, from_address, - std::move(request), timeout); + ResponseFuture Request(Address to_address, Address from_address, RequestT request, + std::function fill_notifier, Duration timeout) { + return local_transport_handle_->template SubmitRequest( + to_address, from_address, std::move(request), timeout, fill_notifier); } template diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp index 2303ae735..38538620f 100644 --- a/src/io/local_transport/local_transport_handle.hpp +++ b/src/io/local_transport/local_transport_handle.hpp @@ -140,8 +140,12 @@ class LocalTransportHandle { template ResponseFuture SubmitRequest(Address to_address, Address from_address, RequestT &&request, - Duration timeout) { - auto [future, promise] = memgraph::io::FuturePromisePair>(); + Duration timeout, std::function fill_notifier) { + auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( + // set null notifier for when the Future::Wait is called + nullptr, + // set notifier for when Promise::Fill is called + std::forward>(fill_notifier)); const bool port_matches = to_address.last_known_port == from_address.last_known_port; const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip; diff --git a/src/io/notifier.hpp b/src/io/notifier.hpp new file mode 100644 index 000000000..e6b073046 --- /dev/null +++ b/src/io/notifier.hpp @@ -0,0 +1,69 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +namespace memgraph::io { + +class ReadinessToken { + size_t id_; + + public: + explicit ReadinessToken(size_t id) : id_(id) {} + size_t GetId() const { return id_; } +}; + +class Inner { + std::condition_variable cv_; + std::mutex mu_; + std::vector ready_; + + public: + void Notify(ReadinessToken readiness_token) { + { + std::unique_lock lock(mu_); + spdlog::trace("Notifier notifying token {}", readiness_token.GetId()); + ready_.emplace_back(readiness_token); + } // mutex dropped + + cv_.notify_all(); + } + + ReadinessToken Await() { + std::unique_lock lock(mu_); + + while (ready_.empty()) { + cv_.wait(lock); + } + + ReadinessToken ret = ready_.back(); + ready_.pop_back(); + return ret; + } +}; + +class Notifier { + std::shared_ptr inner_; + + public: + Notifier() : inner_(std::make_shared()) {} + Notifier(const Notifier &) = default; + Notifier &operator=(const Notifier &) = default; + Notifier(Notifier &&old) = default; + Notifier &operator=(Notifier &&old) = default; + ~Notifier() = default; + + void Notify(ReadinessToken readiness_token) { inner_->Notify(readiness_token); } + + ReadinessToken Await() { return inner_->Await(); } +}; + +} // namespace memgraph::io diff --git a/src/io/rsm/rsm_client.hpp b/src/io/rsm/rsm_client.hpp index 920866c7a..1283ec3dc 100644 --- a/src/io/rsm/rsm_client.hpp +++ b/src/io/rsm/rsm_client.hpp @@ -19,6 +19,7 @@ #include "io/address.hpp" #include "io/errors.hpp" +#include "io/notifier.hpp" #include "io/rsm/raft.hpp" #include "utils/result.hpp" @@ -37,18 +38,11 @@ using memgraph::io::rsm::WriteRequest; using memgraph::io::rsm::WriteResponse; using memgraph::utils::BasicResult; -class AsyncRequestToken { - size_t id_; - - public: - explicit AsyncRequestToken(size_t id) : id_(id) {} - size_t GetId() const { return id_; } -}; - template struct AsyncRequest { Time start_time; RequestT request; + Notifier notifier; ResponseFuture future; }; @@ -66,8 +60,6 @@ class RsmClient { std::unordered_map>> async_reads_; std::unordered_map>> async_writes_; - size_t async_token_generator_ = 0; - void SelectRandomLeader() { std::uniform_int_distribution addr_distrib(0, (server_addrs_.size() - 1)); size_t addr_index = io_.Rand(addr_distrib); @@ -81,6 +73,7 @@ class RsmClient { if (response.retry_leader) { MG_ASSERT(!response.success, "retry_leader should never be set for successful responses"); leader_ = response.retry_leader.value(); + spdlog::error("client redirected to leader server {}", leader_.ToString()); spdlog::debug("client redirected to leader server {}", leader_.ToString()); } if (!response.success) { @@ -101,61 +94,63 @@ class RsmClient { ~RsmClient() = default; BasicResult SendWriteRequest(WriteRequestT req) { - auto token = SendAsyncWriteRequest(req); - auto poll_result = AwaitAsyncWriteRequest(token); + Notifier notifier; + ReadinessToken readiness_token{0}; + SendAsyncWriteRequest(req, notifier, readiness_token); + auto poll_result = AwaitAsyncWriteRequest(readiness_token); while (!poll_result) { - poll_result = AwaitAsyncWriteRequest(token); + poll_result = AwaitAsyncWriteRequest(readiness_token); } return poll_result.value(); } BasicResult SendReadRequest(ReadRequestT req) { - auto token = SendAsyncReadRequest(req); - auto poll_result = AwaitAsyncReadRequest(token); + Notifier notifier; + ReadinessToken 
readiness_token{0}; + SendAsyncReadRequest(req, notifier, readiness_token); + auto poll_result = AwaitAsyncReadRequest(readiness_token); while (!poll_result) { - poll_result = AwaitAsyncReadRequest(token); + poll_result = AwaitAsyncReadRequest(readiness_token); } return poll_result.value(); } /// AsyncRead methods - AsyncRequestToken SendAsyncReadRequest(const ReadRequestT &req) { - size_t token = async_token_generator_++; - + void SendAsyncReadRequest(const ReadRequestT &req, Notifier notifier, ReadinessToken readiness_token) { ReadRequest read_req = {.operation = req}; AsyncRequest> async_request{ .start_time = io_.Now(), .request = std::move(req), - .future = io_.template Request, ReadResponse>(leader_, read_req), + .notifier = notifier, + .future = io_.template RequestWithNotification, ReadResponse>( + leader_, read_req, notifier, readiness_token), }; - async_reads_.emplace(token, std::move(async_request)); - - return AsyncRequestToken{token}; + async_reads_.emplace(readiness_token.GetId(), std::move(async_request)); } - void ResendAsyncReadRequest(const AsyncRequestToken &token) { - auto &async_request = async_reads_.at(token.GetId()); + void ResendAsyncReadRequest(const ReadinessToken &readiness_token) { + auto &async_request = async_reads_.at(readiness_token.GetId()); ReadRequest read_req = {.operation = async_request.request}; - async_request.future = - io_.template Request, ReadResponse>(leader_, read_req); + async_request.future = io_.template RequestWithNotification, ReadResponse>( + leader_, read_req, async_request.notifier, readiness_token); } - std::optional> PollAsyncReadRequest(const AsyncRequestToken &token) { - auto &async_request = async_reads_.at(token.GetId()); + std::optional> PollAsyncReadRequest(const ReadinessToken &readiness_token) { + auto &async_request = async_reads_.at(readiness_token.GetId()); if (!async_request.future.IsReady()) { return std::nullopt; } - return AwaitAsyncReadRequest(); + return AwaitAsyncReadRequest(readiness_token); } - std::optional> AwaitAsyncReadRequest(const AsyncRequestToken &token) { - auto &async_request = async_reads_.at(token.GetId()); + std::optional> AwaitAsyncReadRequest(const ReadinessToken &readiness_token) { + auto &async_request = async_reads_.at(readiness_token.GetId()); ResponseResult> get_response_result = std::move(async_request.future).Wait(); const Duration overall_timeout = io_.GetDefaultTimeout(); @@ -165,7 +160,7 @@ class RsmClient { if (result_has_error && past_time_out) { // TODO static assert the exact type of error. 
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString()); - async_reads_.erase(token.GetId()); + async_reads_.erase(readiness_token.GetId()); return TimedOut{}; } @@ -176,7 +171,7 @@ class RsmClient { PossiblyRedirectLeader(read_get_response); if (read_get_response.success) { - async_reads_.erase(token.GetId()); + async_reads_.erase(readiness_token.GetId()); spdlog::debug("returning read_return for RSM request"); return std::move(read_get_response.read_return); } @@ -184,49 +179,48 @@ class RsmClient { SelectRandomLeader(); } - ResendAsyncReadRequest(token); + ResendAsyncReadRequest(readiness_token); return std::nullopt; } /// AsyncWrite methods - AsyncRequestToken SendAsyncWriteRequest(const WriteRequestT &req) { - size_t token = async_token_generator_++; - + void SendAsyncWriteRequest(const WriteRequestT &req, Notifier notifier, ReadinessToken readiness_token) { WriteRequest write_req = {.operation = req}; AsyncRequest> async_request{ .start_time = io_.Now(), .request = std::move(req), - .future = io_.template Request, WriteResponse>(leader_, write_req), + .notifier = notifier, + .future = io_.template RequestWithNotification, WriteResponse>( + leader_, write_req, notifier, readiness_token), }; - async_writes_.emplace(token, std::move(async_request)); - - return AsyncRequestToken{token}; + async_writes_.emplace(readiness_token.GetId(), std::move(async_request)); } - void ResendAsyncWriteRequest(const AsyncRequestToken &token) { - auto &async_request = async_writes_.at(token.GetId()); + void ResendAsyncWriteRequest(const ReadinessToken &readiness_token) { + auto &async_request = async_writes_.at(readiness_token.GetId()); WriteRequest write_req = {.operation = async_request.request}; async_request.future = - io_.template Request, WriteResponse>(leader_, write_req); + io_.template RequestWithNotification, WriteResponse>( + leader_, write_req, async_request.notifier, readiness_token); } - std::optional> PollAsyncWriteRequest(const AsyncRequestToken &token) { - auto &async_request = async_writes_.at(token.GetId()); + std::optional> PollAsyncWriteRequest(const ReadinessToken &readiness_token) { + auto &async_request = async_writes_.at(readiness_token.GetId()); if (!async_request.future.IsReady()) { return std::nullopt; } - return AwaitAsyncWriteRequest(); + return AwaitAsyncWriteRequest(readiness_token); } - std::optional> AwaitAsyncWriteRequest(const AsyncRequestToken &token) { - auto &async_request = async_writes_.at(token.GetId()); + std::optional> AwaitAsyncWriteRequest(const ReadinessToken &readiness_token) { + auto &async_request = async_writes_.at(readiness_token.GetId()); ResponseResult> get_response_result = std::move(async_request.future).Wait(); const Duration overall_timeout = io_.GetDefaultTimeout(); @@ -236,7 +230,7 @@ class RsmClient { if (result_has_error && past_time_out) { // TODO static assert the exact type of error. 
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString()); - async_writes_.erase(token.GetId()); + async_writes_.erase(readiness_token.GetId()); return TimedOut{}; } @@ -248,14 +242,14 @@ class RsmClient { PossiblyRedirectLeader(write_get_response); if (write_get_response.success) { - async_writes_.erase(token.GetId()); + async_writes_.erase(readiness_token.GetId()); return std::move(write_get_response.write_return); } } else { SelectRandomLeader(); } - ResendAsyncWriteRequest(token); + ResendAsyncWriteRequest(readiness_token); return std::nullopt; } diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 5a5ad1ec0..0cd6b77b0 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -105,12 +105,16 @@ class SimulatorHandle { template ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, - std::function &&maybe_tick_simulator) { + std::function &&maybe_tick_simulator, + std::function &&fill_notifier) { spdlog::trace("submitting request to {}", to_address.last_known_port); auto type_info = TypeInfoFor(request); - auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier>( - std::forward>(maybe_tick_simulator)); + auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( + // set notifier for when the Future::Wait is called + std::forward>(maybe_tick_simulator), + // set notifier for when Promise::Fill is called + std::forward>(fill_notifier)); std::unique_lock lock(mu_); diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 5e5a24aa9..2107c34ca 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -15,6 +15,7 @@ #include #include "io/address.hpp" +#include "io/notifier.hpp" #include "io/simulator/simulator_handle.hpp" #include "io/time.hpp" @@ -33,11 +34,13 @@ class SimulatorTransport { : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {} template - ResponseFuture Request(Address to_address, Address from_address, RequestT request, Duration timeout) { + ResponseFuture Request(Address to_address, Address from_address, RequestT request, + std::function notification, Duration timeout) { std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; return simulator_handle_->template SubmitRequest(to_address, from_address, std::move(request), - timeout, std::move(maybe_tick_simulator)); + timeout, std::move(maybe_tick_simulator), + std::move(notification)); } template diff --git a/src/io/transport.hpp b/src/io/transport.hpp index 4994cb436..2c2e79060 100644 --- a/src/io/transport.hpp +++ b/src/io/transport.hpp @@ -20,6 +20,7 @@ #include "io/errors.hpp" #include "io/future.hpp" #include "io/message_histogram_collector.hpp" +#include "io/notifier.hpp" #include "io/time.hpp" #include "utils/result.hpp" @@ -84,7 +85,9 @@ class Io { template ResponseFuture RequestWithTimeout(Address address, RequestT request, Duration timeout) { const Address from_address = address_; - return implementation_.template Request(address, from_address, request, timeout); + std::function fill_notifier = nullptr; + return implementation_.template Request(address, from_address, request, fill_notifier, + timeout); } /// Issue a request that times out after the default timeout. 
This tends @@ -93,7 +96,30 @@ class Io { ResponseFuture Request(Address to_address, RequestT request) { const Duration timeout = default_timeout_; const Address from_address = address_; - return implementation_.template Request(to_address, from_address, std::move(request), timeout); + std::function fill_notifier = nullptr; + return implementation_.template Request(to_address, from_address, std::move(request), + fill_notifier, timeout); + } + + /// Issue a request that will notify a Notifier when it is filled or times out. + template + ResponseFuture RequestWithNotification(Address to_address, RequestT request, Notifier notifier, + ReadinessToken readiness_token) { + const Duration timeout = default_timeout_; + const Address from_address = address_; + std::function fill_notifier = std::bind(&Notifier::Notify, notifier, readiness_token); + return implementation_.template Request(to_address, from_address, std::move(request), + fill_notifier, timeout); + } + + /// Issue a request that will notify a Notifier when it is filled or times out. + template + ResponseFuture RequestWithNotificationAndTimeout(Address to_address, RequestT request, Notifier notifier, + ReadinessToken readiness_token, Duration timeout) { + const Address from_address = address_; + std::function fill_notifier = std::bind(&Notifier::Notify, notifier, readiness_token); + return implementation_.template Request(to_address, from_address, std::move(request), + fill_notifier, timeout); } /// Wait for an explicit number of microseconds for a request of one of the diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 996272fdc..2d563ade0 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -31,6 +31,7 @@ #include "coordinator/shard_map.hpp" #include "io/address.hpp" #include "io/errors.hpp" +#include "io/notifier.hpp" #include "io/rsm/raft.hpp" #include "io/rsm/rsm_client.hpp" #include "io/rsm/shard_rsm.hpp" @@ -75,25 +76,11 @@ template struct ShardRequestState { memgraph::coordinator::Shard shard; TRequest request; - std::optional async_request_token; }; +// maps from ReadinessToken's internal size_t to the associated state template -struct ExecutionState { - using CompoundKey = io::rsm::ShardRsmKey; - using Shard = coordinator::Shard; - - // label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label - // on the request itself. - std::optional label; - // Transaction id to be filled by the RequestRouter implementation - coordinator::Hlc transaction_id; - // Initialized by RequestRouter implementation. This vector is filled with the shards that - // the RequestRouter impl will send requests to. When a request to a shard exhausts it, meaning that - // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes - // empty, it means that all of the requests have completed succefully. 
- std::vector> requests; -}; +using RunningRequests = std::unordered_map>; class RequestRouterInterface { public: @@ -238,26 +225,25 @@ class RequestRouter : public RequestRouterInterface { // TODO(kostasrim) Simplify return result std::vector ScanVertices(std::optional label) override { - ExecutionState state = {}; - state.label = label; - // create requests - InitializeExecutionState(state); + std::vector> unsent_requests = RequestsForScanVertices(label); + spdlog::error("created {} ScanVertices requests", unsent_requests.size()); // begin all requests in parallel - for (auto &request : state.requests) { + RunningRequests running_requests = {}; + running_requests.reserve(unsent_requests.size()); + for (size_t i = 0; i < unsent_requests.size(); i++) { + auto &request = unsent_requests[i]; + io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); - msgs::ReadRequests req = request.request; - - request.async_request_token = storage_client.SendAsyncReadRequest(request.request); + storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), request); } + spdlog::error("sent {} ScanVertices requests in parallel", running_requests.size()); // drive requests to completion - std::vector responses; - responses.reserve(state.requests.size()); - do { - DriveReadResponses(state, responses); - } while (!state.requests.empty()); + auto responses = DriveReadResponses(running_requests); + spdlog::error("got back {} ScanVertices responses after driving to completion", responses.size()); // convert responses into VertexAccessor objects to return std::vector accessors; @@ -272,62 +258,53 @@ class RequestRouter : public RequestRouterInterface { } std::vector CreateVertices(std::vector new_vertices) override { - ExecutionState state = {}; MG_ASSERT(!new_vertices.empty()); // create requests - InitializeExecutionState(state, new_vertices); + std::vector> unsent_requests = + RequestsForCreateVertices(new_vertices); // begin all requests in parallel - for (auto &request : state.requests) { - auto req_deep_copy = request.request; - - for (auto &new_vertex : req_deep_copy.new_vertices) { + RunningRequests running_requests = {}; + running_requests.reserve(unsent_requests.size()); + for (size_t i = 0; i < unsent_requests.size(); i++) { + auto &request = unsent_requests[i]; + io::ReadinessToken readiness_token{i}; + for (auto &new_vertex : request.request.new_vertices) { new_vertex.label_ids.erase(new_vertex.label_ids.begin()); } - auto &storage_client = GetStorageClientForShard(request.shard); - - msgs::WriteRequests req = req_deep_copy; - request.async_request_token = storage_client.SendAsyncWriteRequest(req); + storage_client.SendAsyncWriteRequest(request.request, notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), request); } // drive requests to completion - std::vector responses; - responses.reserve(state.requests.size()); - do { - DriveWriteResponses(state, responses); - } while (!state.requests.empty()); - - return responses; + return DriveWriteResponses(running_requests); } std::vector CreateExpand(std::vector new_edges) override { - ExecutionState state = {}; MG_ASSERT(!new_edges.empty()); // create requests - InitializeExecutionState(state, new_edges); + std::vector> unsent_requests = RequestsForCreateExpand(new_edges); // begin all requests in parallel - for (auto &request : state.requests) { + RunningRequests running_requests = {}; + 
running_requests.reserve(unsent_requests.size()); + for (size_t i = 0; i < unsent_requests.size(); i++) { + auto &request = unsent_requests[i]; + io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); msgs::WriteRequests req = request.request; - request.async_request_token = storage_client.SendAsyncWriteRequest(req); + storage_client.SendAsyncWriteRequest(req, notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), request); } // drive requests to completion - std::vector responses; - responses.reserve(state.requests.size()); - do { - DriveWriteResponses(state, responses); - } while (!state.requests.empty()); - - return responses; + return DriveWriteResponses(running_requests); } std::vector ExpandOne(msgs::ExpandOneRequest request) override { - ExecutionState state = {}; // TODO(kostasrim)Update to limit the batch size here // Expansions of the destination must be handled by the caller. For example // match (u:L1 { prop : 1 })-[:Friend]-(v:L1) @@ -335,21 +312,22 @@ class RequestRouter : public RequestRouterInterface { // must be fetched again with an ExpandOne(Edges.dst) // create requests - InitializeExecutionState(state, std::move(request)); + std::vector> unsent_requests = RequestsForExpandOne(request); // begin all requests in parallel - for (auto &request : state.requests) { + RunningRequests running_requests = {}; + running_requests.reserve(unsent_requests.size()); + for (size_t i = 0; i < unsent_requests.size(); i++) { + auto &request = unsent_requests[i]; + io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); msgs::ReadRequests req = request.request; - request.async_request_token = storage_client.SendAsyncReadRequest(req); + storage_client.SendAsyncReadRequest(req, notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), request); } // drive requests to completion - std::vector responses; - responses.reserve(state.requests.size()); - do { - DriveReadResponses(state, responses); - } while (!state.requests.empty()); + auto responses = DriveReadResponses(running_requests); // post-process responses std::vector result_rows; @@ -380,10 +358,8 @@ class RequestRouter : public RequestRouterInterface { } private: - void InitializeExecutionState(ExecutionState &state, - std::vector new_vertices) { - state.transaction_id = transaction_id_; - + std::vector> RequestsForCreateVertices( + const std::vector &new_vertices) { std::map per_shard_request_table; for (auto &new_vertex : new_vertices) { @@ -397,20 +373,21 @@ class RequestRouter : public RequestRouterInterface { per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex)); } + std::vector> requests = {}; + for (auto &[shard, request] : per_shard_request_table) { ShardRequestState shard_request_state{ .shard = shard, .request = request, - .async_request_token = std::nullopt, }; - state.requests.emplace_back(std::move(shard_request_state)); + requests.emplace_back(std::move(shard_request_state)); } + + return requests; } - void InitializeExecutionState(ExecutionState &state, - std::vector new_expands) { - state.transaction_id = transaction_id_; - + std::vector> RequestsForCreateExpand( + const std::vector &new_expands) { std::map per_shard_request_table; auto ensure_shard_exists_in_table = [&per_shard_request_table, transaction_id = transaction_id_](const Shard &shard) { @@ -435,27 +412,33 @@ class RequestRouter : public RequestRouterInterface { 
per_shard_request_table[shard_src_vertex].new_expands.push_back(std::move(new_expand)); } + std::vector> requests = {}; + for (auto &[shard, request] : per_shard_request_table) { ShardRequestState shard_request_state{ .shard = shard, .request = request, - .async_request_token = std::nullopt, }; - state.requests.emplace_back(std::move(shard_request_state)); + requests.emplace_back(std::move(shard_request_state)); } + + return requests; } - void InitializeExecutionState(ExecutionState &state) { + std::vector> RequestsForScanVertices( + const std::optional &label) { std::vector multi_shards; - state.transaction_id = transaction_id_; - if (!state.label) { - multi_shards = shards_map_.GetAllShards(); - } else { - const auto label_id = shards_map_.GetLabelId(*state.label); + if (label) { + const auto label_id = shards_map_.GetLabelId(*label); MG_ASSERT(label_id); MG_ASSERT(IsPrimaryLabel(*label_id)); - multi_shards = {shards_map_.GetShardsForLabel(*state.label)}; + multi_shards = {shards_map_.GetShardsForLabel(*label)}; + } else { + multi_shards = shards_map_.GetAllShards(); } + + std::vector> requests = {}; + for (auto &shards : multi_shards) { for (auto &[key, shard] : shards) { MG_ASSERT(!shard.empty()); @@ -467,22 +450,21 @@ class RequestRouter : public RequestRouterInterface { ShardRequestState shard_request_state{ .shard = shard, .request = std::move(request), - .async_request_token = std::nullopt, }; - state.requests.emplace_back(std::move(shard_request_state)); + requests.emplace_back(std::move(shard_request_state)); } } + + return requests; } - void InitializeExecutionState(ExecutionState &state, msgs::ExpandOneRequest request) { - state.transaction_id = transaction_id_; - + std::vector> RequestsForExpandOne(const msgs::ExpandOneRequest &request) { std::map per_shard_request_table; - auto top_level_rqst_template = request; + msgs::ExpandOneRequest top_level_rqst_template = request; top_level_rqst_template.transaction_id = transaction_id_; top_level_rqst_template.src_vertices.clear(); - state.requests.clear(); + for (auto &vertex : request.src_vertices) { auto shard = shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); @@ -492,15 +474,18 @@ class RequestRouter : public RequestRouterInterface { per_shard_request_table[shard].src_vertices.push_back(vertex); } + std::vector> requests = {}; + for (auto &[shard, request] : per_shard_request_table) { ShardRequestState shard_request_state{ .shard = shard, .request = request, - .async_request_token = std::nullopt, }; - state.requests.emplace_back(std::move(shard_request_state)); + requests.emplace_back(std::move(shard_request_state)); } + + return requests; } StorageClient &GetStorageClientForShard(Shard shard) { @@ -528,14 +513,18 @@ class RequestRouter : public RequestRouterInterface { } template - void DriveReadResponses(ExecutionState &state, std::vector &responses) { - for (auto &request : state.requests) { + std::vector DriveReadResponses(RunningRequests &running_requests) { + // Store responses in a map based on the corresponding request + // offset, so that they can be reassembled in the correct order + // even if they came back in randomized orders. 
+ std::map response_map; + + while (response_map.size() < running_requests.size()) { + auto ready = notifier_.Await(); + auto &request = running_requests.at(ready.GetId()); auto &storage_client = GetStorageClientForShard(request.shard); - auto poll_result = storage_client.AwaitAsyncReadRequest(request.async_request_token.value()); - while (!poll_result) { - poll_result = storage_client.AwaitAsyncReadRequest(request.async_request_token.value()); - } + auto poll_result = storage_client.PollAsyncReadRequest(ready); if (poll_result->HasError()) { throw std::runtime_error("RequestRouter Read request timed out"); @@ -547,20 +536,36 @@ class RequestRouter : public RequestRouterInterface { throw std::runtime_error("RequestRouter Read request did not succeed"); } - responses.push_back(std::move(response)); + // the readiness token has an ID based on the request vector offset + response_map.emplace(ready.GetId(), std::move(response)); } - state.requests.clear(); + + std::vector responses; + responses.reserve(running_requests.size()); + + int last = -1; + for (auto &&[offset, response] : response_map) { + MG_ASSERT(last + 1 == offset); + responses.emplace_back(std::forward(response)); + last = offset; + } + + return responses; } template - void DriveWriteResponses(ExecutionState &state, std::vector &responses) { - for (auto &request : state.requests) { + std::vector DriveWriteResponses(RunningRequests &running_requests) { + // Store responses in a map based on the corresponding request + // offset, so that they can be reassembled in the correct order + // even if they came back in randomized orders. + std::map response_map; + + while (response_map.size() < running_requests.size()) { + auto ready = notifier_.Await(); + auto &request = running_requests.at(ready.GetId()); auto &storage_client = GetStorageClientForShard(request.shard); - auto poll_result = storage_client.AwaitAsyncWriteRequest(request.async_request_token.value()); - while (!poll_result) { - poll_result = storage_client.AwaitAsyncWriteRequest(request.async_request_token.value()); - } + auto poll_result = storage_client.PollAsyncWriteRequest(ready); if (poll_result->HasError()) { throw std::runtime_error("RequestRouter Write request timed out"); @@ -572,9 +577,21 @@ class RequestRouter : public RequestRouterInterface { throw std::runtime_error("RequestRouter Write request did not succeed"); } - responses.push_back(std::move(response)); + // the readiness token has an ID based on the request vector offset + response_map.emplace(ready.GetId(), std::move(response)); } - state.requests.clear(); + + std::vector responses; + responses.reserve(running_requests.size()); + + int last = -1; + for (auto &&[offset, response] : response_map) { + MG_ASSERT(last + 1 == offset); + responses.emplace_back(std::forward(response)); + last = offset; + } + + return responses; } void SetUpNameIdMappers() { @@ -603,6 +620,7 @@ class RequestRouter : public RequestRouterInterface { RsmStorageClientManager storage_cli_manager_; io::Io io_; coordinator::Hlc transaction_id_; + io::Notifier notifier_ = {}; // TODO(kostasrim) Add batch prefetching }; } // namespace memgraph::query::v2 diff --git a/tests/unit/future.cpp b/tests/unit/future.cpp index 490e19bbc..866a74fce 100644 --- a/tests/unit/future.cpp +++ b/tests/unit/future.cpp @@ -28,13 +28,19 @@ void Wait(Future future_1, Promise promise_2) { TEST(Future, BasicLifecycle) { std::atomic_bool waiting = false; + std::atomic_bool filled = false; - std::function notifier = [&] { + std::function wait_notifier = [&] { 
waiting.store(true, std::memory_order_seq_cst); return false; }; - auto [future_1, promise_1] = FuturePromisePairWithNotifier(notifier); + std::function fill_notifier = [&] { + filled.store(true, std::memory_order_seq_cst); + return false; + }; + + auto [future_1, promise_1] = FuturePromisePairWithNotifications(wait_notifier, fill_notifier); auto [future_2, promise_2] = FuturePromisePair(); std::jthread t1(Wait, std::move(future_1), std::move(promise_2)); @@ -50,6 +56,8 @@ TEST(Future, BasicLifecycle) { t1.join(); t2.join(); + EXPECT_TRUE(filled.load(std::memory_order_acquire)); + std::string result_2 = std::move(future_2).Wait(); EXPECT_TRUE(result_2 == "it worked"); } diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp index 2be48fc77..cefa238ed 100644 --- a/tests/unit/high_density_shard_create_scan.cpp +++ b/tests/unit/high_density_shard_create_scan.cpp @@ -194,7 +194,8 @@ void ExecuteOp(query::v2::RequestRouter &request_router, std::se ScanAll scan_all) { auto results = request_router.ScanVertices("test_label"); - MG_ASSERT(results.size() == correctness_model.size()); + spdlog::error("got {} results, model size is {}", results.size(), correctness_model.size()); + EXPECT_EQ(results.size(), correctness_model.size()); for (const auto &vertex_accessor : results) { const auto properties = vertex_accessor.Properties(); diff --git a/tests/unit/query_v2_expression_evaluator.cpp b/tests/unit/query_v2_expression_evaluator.cpp index 5f77ed4e7..50f578bb2 100644 --- a/tests/unit/query_v2_expression_evaluator.cpp +++ b/tests/unit/query_v2_expression_evaluator.cpp @@ -84,13 +84,14 @@ class MockedRequestRouter : public RequestRouterInterface { void Commit() override {} std::vector ScanVertices(std::optional /* label */) override { return {}; } - std::vector CreateVertices(std::vector new_vertices) override { + std::vector CreateVertices( + std::vector /* new_vertices */) override { return {}; } - std::vector ExpandOne(ExpandOneRequest request) override { return {}; } + std::vector ExpandOne(ExpandOneRequest /* request */) override { return {}; } - std::vector CreateExpand(std::vector new_edges) override { return {}; } + std::vector CreateExpand(std::vector /* new_edges */) override { return {}; } const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { return properties_.IdToName(id.AsUint()); From 438b51970308d00b76fabf26927a62a694be537e Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 1 Dec 2022 16:26:41 +0000 Subject: [PATCH 02/12] Apply clang-tidy feedback --- src/io/notifier.hpp | 9 ++++++--- src/io/transport.hpp | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/io/notifier.hpp b/src/io/notifier.hpp index e6b073046..81d6507bb 100644 --- a/src/io/notifier.hpp +++ b/src/io/notifier.hpp @@ -11,6 +11,10 @@ #pragma once +#include +#include +#include + namespace memgraph::io { class ReadinessToken { @@ -30,7 +34,6 @@ class Inner { void Notify(ReadinessToken readiness_token) { { std::unique_lock lock(mu_); - spdlog::trace("Notifier notifying token {}", readiness_token.GetId()); ready_.emplace_back(readiness_token); } // mutex dropped @@ -61,9 +64,9 @@ class Notifier { Notifier &operator=(Notifier &&old) = default; ~Notifier() = default; - void Notify(ReadinessToken readiness_token) { inner_->Notify(readiness_token); } + void Notify(ReadinessToken readiness_token) const { inner_->Notify(readiness_token); } - ReadinessToken Await() { return inner_->Await(); } + ReadinessToken Await() const 
{ return inner_->Await(); } }; } // namespace memgraph::io diff --git a/src/io/transport.hpp b/src/io/transport.hpp index 2c2e79060..5dd7a9a39 100644 --- a/src/io/transport.hpp +++ b/src/io/transport.hpp @@ -107,7 +107,7 @@ class Io { ReadinessToken readiness_token) { const Duration timeout = default_timeout_; const Address from_address = address_; - std::function fill_notifier = std::bind(&Notifier::Notify, notifier, readiness_token); + std::function fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); }; return implementation_.template Request(to_address, from_address, std::move(request), fill_notifier, timeout); } @@ -117,7 +117,7 @@ class Io { ResponseFuture RequestWithNotificationAndTimeout(Address to_address, RequestT request, Notifier notifier, ReadinessToken readiness_token, Duration timeout) { const Address from_address = address_; - std::function fill_notifier = std::bind(&Notifier::Notify, notifier, readiness_token); + std::function fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); }; return implementation_.template Request(to_address, from_address, std::move(request), fill_notifier, timeout); } From 9a62503803f5e1667bd70a02454bdc5419eeb3ef Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 2 Dec 2022 18:04:38 +0000 Subject: [PATCH 03/12] Tick the simulator forward from Notify::Await in a similar way that Future::Wait does --- src/io/notifier.hpp | 22 ++++++++++- src/io/rsm/raft.hpp | 32 +++++++++++----- src/io/rsm/rsm_client.hpp | 1 - src/io/simulator/simulator.hpp | 6 +++ src/io/simulator/simulator_handle.hpp | 49 +++++++++++++----------- src/io/simulator/simulator_transport.hpp | 8 ++-- src/query/v2/request_router.hpp | 18 +++++++-- tests/simulation/request_router.cpp | 2 + tests/simulation/test_cluster.hpp | 2 + 9 files changed, 99 insertions(+), 41 deletions(-) diff --git a/src/io/notifier.hpp b/src/io/notifier.hpp index 81d6507bb..9e0dec7df 100644 --- a/src/io/notifier.hpp +++ b/src/io/notifier.hpp @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include @@ -29,6 +30,7 @@ class Inner { std::condition_variable cv_; std::mutex mu_; std::vector ready_; + std::optional> tick_simulator_; public: void Notify(ReadinessToken readiness_token) { @@ -44,13 +46,29 @@ class Inner { std::unique_lock lock(mu_); while (ready_.empty()) { - cv_.wait(lock); + if (tick_simulator_) [[unlikely]] { + // This avoids a deadlock in a similar way that + // Future::Wait will release its mutex while + // interacting with the simulator, due to + // the fact that the simulator may cause + // notifications that we are interested in. 
+ lock.unlock(); + std::invoke(tick_simulator_.value()); + lock.lock(); + } else { + cv_.wait(lock); + } } ReadinessToken ret = ready_.back(); ready_.pop_back(); return ret; } + + void InstallSimulatorTicker(std::function tick_simulator) { + std::unique_lock lock(mu_); + tick_simulator_ = tick_simulator; + } }; class Notifier { @@ -67,6 +85,8 @@ class Notifier { void Notify(ReadinessToken readiness_token) const { inner_->Notify(readiness_token); } ReadinessToken Await() const { return inner_->Await(); } + + void InstallSimulatorTicker(std::function tick_simulator) { inner_->InstallSimulatorTicker(tick_simulator); } }; } // namespace memgraph::io diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp index 4ec22ff87..07a51288d 100644 --- a/src/io/rsm/raft.hpp +++ b/src/io/rsm/raft.hpp @@ -91,33 +91,43 @@ struct ReadResponse { }; template -utils::TypeInfoRef TypeInfoFor(const ReadResponse> &read_response) { - return TypeInfoForVariant(read_response.read_return); +utils::TypeInfoRef TypeInfoFor(const ReadResponse> &response) { + return TypeInfoForVariant(response.read_return); } template -utils::TypeInfoRef TypeInfoFor(const ReadResponse & /* read_response */) { +utils::TypeInfoRef TypeInfoFor(const ReadResponse & /* response */) { return typeid(ReadReturn); } +template +utils::TypeInfoRef TypeInfoFor(const ReadRequest & /* request */) { + return typeid(ReadOperation); +} + +template +utils::TypeInfoRef TypeInfoFor(const ReadRequest> &request) { + return TypeInfoForVariant(request.operation); +} + template -utils::TypeInfoRef TypeInfoFor(const WriteResponse> &write_response) { - return TypeInfoForVariant(write_response.write_return); +utils::TypeInfoRef TypeInfoFor(const WriteResponse> &response) { + return TypeInfoForVariant(response.write_return); } template -utils::TypeInfoRef TypeInfoFor(const WriteResponse & /* write_response */) { +utils::TypeInfoRef TypeInfoFor(const WriteResponse & /* response */) { return typeid(WriteReturn); } template -utils::TypeInfoRef TypeInfoFor(const WriteRequest & /* write_request */) { +utils::TypeInfoRef TypeInfoFor(const WriteRequest & /* request */) { return typeid(WriteOperation); } template -utils::TypeInfoRef TypeInfoFor(const WriteRequest> &write_request) { - return TypeInfoForVariant(write_request.operation); +utils::TypeInfoRef TypeInfoFor(const WriteRequest> &request) { + return TypeInfoForVariant(request.operation); } /// AppendRequest is a raft-level message that the Leader @@ -846,7 +856,9 @@ class Raft { // Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState std::optional Handle(Leader & /* variable */, ReadRequest &&req, RequestId request_id, Address from_address) { - Log("handling ReadOperation"); + auto type_info = TypeInfoFor(req); + std::string demangled_name = boost::core::demangle(type_info.get().name()); + Log("handling ReadOperation<" + demangled_name + ">"); ReadOperation read_operation = req.operation; ReadResponseValue read_return = replicated_state_.Read(read_operation); diff --git a/src/io/rsm/rsm_client.hpp b/src/io/rsm/rsm_client.hpp index 1283ec3dc..b863e79a4 100644 --- a/src/io/rsm/rsm_client.hpp +++ b/src/io/rsm/rsm_client.hpp @@ -73,7 +73,6 @@ class RsmClient { if (response.retry_leader) { MG_ASSERT(!response.success, "retry_leader should never be set for successful responses"); leader_ = response.retry_leader.value(); - spdlog::error("client redirected to leader server {}", leader_.ToString()); spdlog::debug("client redirected to leader server {}", 
leader_.ToString()); } if (!response.success) { diff --git a/src/io/simulator/simulator.hpp b/src/io/simulator/simulator.hpp index 622c264b4..667570276 100644 --- a/src/io/simulator/simulator.hpp +++ b/src/io/simulator/simulator.hpp @@ -49,5 +49,11 @@ class Simulator { } SimulatorStats Stats() { return simulator_handle_->Stats(); } + + std::function GetSimulatorTickClosure() { + std::shared_ptr handle_copy = simulator_handle_; + std::function tick_closure = [handle_copy] { return handle_copy->MaybeTickSimulator(); }; + return tick_closure; + } }; }; // namespace memgraph::io::simulator diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 0cd6b77b0..3fd9b4965 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -22,6 +22,8 @@ #include #include +#include + #include "io/address.hpp" #include "io/errors.hpp" #include "io/message_conversion.hpp" @@ -107,8 +109,9 @@ class SimulatorHandle { ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, std::function &&maybe_tick_simulator, std::function &&fill_notifier) { - spdlog::trace("submitting request to {}", to_address.last_known_port); auto type_info = TypeInfoFor(request); + std::string demangled_name = boost::core::demangle(type_info.get().name()); + spdlog::trace("simulator sending request {} to {}", demangled_name, to_address); auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( // set notifier for when the Future::Wait is called @@ -116,34 +119,36 @@ class SimulatorHandle { // set notifier for when Promise::Fill is called std::forward>(fill_notifier)); - std::unique_lock lock(mu_); + { + std::unique_lock lock(mu_); - RequestId request_id = ++request_id_counter_; + RequestId request_id = ++request_id_counter_; - const Time deadline = cluster_wide_time_microseconds_ + timeout; + const Time deadline = cluster_wide_time_microseconds_ + timeout; - std::any message(request); - OpaqueMessage om{.to_address = to_address, - .from_address = from_address, - .request_id = request_id, - .message = std::move(message), - .type_info = type_info}; - in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); + std::any message(request); + OpaqueMessage om{.to_address = to_address, + .from_address = from_address, + .request_id = request_id, + .message = std::move(message), + .type_info = type_info}; + in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); - PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; - OpaquePromise opaque_promise(std::move(promise).ToUnique()); - DeadlineAndOpaquePromise dop{ - .requested_at = cluster_wide_time_microseconds_, - .deadline = deadline, - .promise = std::move(opaque_promise), - }; + PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; + OpaquePromise opaque_promise(std::move(promise).ToUnique()); + DeadlineAndOpaquePromise dop{ + .requested_at = cluster_wide_time_microseconds_, + .deadline = deadline, + .promise = std::move(opaque_promise), + }; - MG_ASSERT(!promises_.contains(promise_key)); + MG_ASSERT(!promises_.contains(promise_key)); - promises_.emplace(std::move(promise_key), std::move(dop)); + promises_.emplace(std::move(promise_key), std::move(dop)); - stats_.total_messages++; - stats_.total_requests++; + stats_.total_messages++; + stats_.total_requests++; + } // lock dropped here cv_.notify_all(); diff --git a/src/io/simulator/simulator_transport.hpp 
b/src/io/simulator/simulator_transport.hpp index 2107c34ca..758bc43b7 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -36,11 +36,11 @@ class SimulatorTransport { template ResponseFuture Request(Address to_address, Address from_address, RequestT request, std::function notification, Duration timeout) { - std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; + std::shared_ptr handle_copy = simulator_handle_; + std::function tick_simulator = [handle_copy] { return handle_copy->MaybeTickSimulator(); }; - return simulator_handle_->template SubmitRequest(to_address, from_address, std::move(request), - timeout, std::move(maybe_tick_simulator), - std::move(notification)); + return simulator_handle_->template SubmitRequest( + to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification)); } template diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 2d563ade0..1336addae 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -135,10 +135,16 @@ class RequestRouter : public RequestRouterInterface { ~RequestRouter() override {} + void InstallSimulatorTicker(std::function tick_simulator) { + notifier_.InstallSimulatorTicker(tick_simulator); + } + void StartTransaction() override { coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; CoordinatorWriteRequests write_req = req; + spdlog::trace("sending hlc request to start transaction"); auto write_res = coord_cli_.SendWriteRequest(write_req); + spdlog::trace("received hlc response to start transaction"); if (write_res.HasError()) { throw std::runtime_error("HLC request failed"); } @@ -157,7 +163,9 @@ class RequestRouter : public RequestRouterInterface { void Commit() override { coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; CoordinatorWriteRequests write_req = req; + spdlog::trace("sending hlc request before committing transaction"); auto write_res = coord_cli_.SendWriteRequest(write_req); + spdlog::trace("received hlc response before committing transaction"); if (write_res.HasError()) { throw std::runtime_error("HLC request for commit failed"); } @@ -227,7 +235,7 @@ class RequestRouter : public RequestRouterInterface { std::vector ScanVertices(std::optional label) override { // create requests std::vector> unsent_requests = RequestsForScanVertices(label); - spdlog::error("created {} ScanVertices requests", unsent_requests.size()); + spdlog::trace("created {} ScanVertices requests", unsent_requests.size()); // begin all requests in parallel RunningRequests running_requests = {}; @@ -239,11 +247,11 @@ class RequestRouter : public RequestRouterInterface { storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token); running_requests.emplace(readiness_token.GetId(), request); } - spdlog::error("sent {} ScanVertices requests in parallel", running_requests.size()); + spdlog::trace("sent {} ScanVertices requests in parallel", running_requests.size()); // drive requests to completion auto responses = DriveReadResponses(running_requests); - spdlog::error("got back {} ScanVertices responses after driving to completion", responses.size()); + spdlog::trace("got back {} ScanVertices responses after driving to completion", responses.size()); // convert responses into VertexAccessor objects to return std::vector accessors; @@ -263,6 +271,7 @@ class RequestRouter : public RequestRouterInterface 
{ // create requests std::vector> unsent_requests = RequestsForCreateVertices(new_vertices); + spdlog::trace("created {} CreateVertices requests", unsent_requests.size()); // begin all requests in parallel RunningRequests running_requests = {}; @@ -277,6 +286,7 @@ class RequestRouter : public RequestRouterInterface { storage_client.SendAsyncWriteRequest(request.request, notifier_, readiness_token); running_requests.emplace(readiness_token.GetId(), request); } + spdlog::trace("sent {} CreateVertices requests in parallel", running_requests.size()); // drive requests to completion return DriveWriteResponses(running_requests); @@ -519,8 +529,10 @@ class RequestRouter : public RequestRouterInterface { // even if they came back in randomized orders. std::map response_map; + spdlog::trace("waiting on readiness for token"); while (response_map.size() < running_requests.size()) { auto ready = notifier_.Await(); + spdlog::trace("got readiness for token {}", ready.GetId()); auto &request = running_requests.at(ready.GetId()); auto &storage_client = GetStorageClientForShard(request.shard); diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index 8187f138b..af8ec62ef 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -338,6 +338,8 @@ void DoTest() { CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); + std::function tick_simulator = simulator.GetSimulatorTickClosure(); + request_router.InstallSimulatorTicker(tick_simulator); request_router.StartTransaction(); TestScanVertices(request_router); diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index 1392a0632..3e14545a9 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -244,6 +244,8 @@ std::pair RunClusterSimulation(const WaitForShardsToInitialize(coordinator_client); query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); + std::function tick_simulator = simulator.GetSimulatorTickClosure(); + request_router.InstallSimulatorTicker(tick_simulator); request_router.StartTransaction(); From 6efe074313572b6ab1c0dd83b0970ff891ee33f4 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 5 Dec 2022 13:15:12 +0000 Subject: [PATCH 04/12] Update GetProperties to use the correct style of request driving in the RequestRouter --- src/query/v2/request_router.hpp | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 5f5d103cc..110aa5d96 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -358,25 +358,23 @@ class RequestRouter : public RequestRouterInterface { } std::vector GetProperties(msgs::GetPropertiesRequest requests) override { - ExecutionState state = {}; - InitializeExecutionState(state, std::move(requests)); - for (auto &request : state.requests) { + // create requests + std::vector unsent_requests = RequestsForGetProperties(requests); + + // begin all requests in parallel + RunningRequests running_requests = {}; + running_requests.reserve(unsent_requests.size()); + for (size_t i = 0; i < unsent_requests.size(); i++) { + auto &request = unsent_requests[i]; + io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); msgs::ReadRequests req = request.request; - 
request.async_request_token = storage_client.SendAsyncReadRequest(req); + storage_client.SendAsyncReadRequest(req, notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), request); } - std::vector responses; - do { - DriveReadResponses(state, responses); - } while (!state.requests.empty()); - - std::vector result; - for (auto &res : responses) { - std::move(res.result_row.begin(), res.result_row.end(), std::back_inserter(result)); - } - - return result; + // drive requests to completion + return DriveReadResponses(running_requests); } std::optional MaybeNameToProperty(const std::string &name) const override { @@ -522,7 +520,7 @@ class RequestRouter : public RequestRouterInterface { return requests; } - void InitializeExecutionState(ExecutionState &state, msgs::GetPropertiesRequest request) { + std::vector RequestsForGetProperties(const msgs::GetPropertiesRequest &request) { std::map per_shard_request_table; auto top_level_rqst_template = request; top_level_rqst_template.transaction_id = transaction_id_; From 1b458ebc410f93d85a7c9aa0c6c1d5a32af0f032 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 5 Dec 2022 13:26:44 +0000 Subject: [PATCH 05/12] Complete migration of GetProperties to new request style --- src/query/v2/request_router.hpp | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 110aa5d96..0d4bf27e1 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -359,7 +359,7 @@ class RequestRouter : public RequestRouterInterface { std::vector GetProperties(msgs::GetPropertiesRequest requests) override { // create requests - std::vector unsent_requests = RequestsForGetProperties(requests); + std::vector> unsent_requests = RequestsForGetProperties(requests); // begin all requests in parallel RunningRequests running_requests = {}; @@ -374,7 +374,16 @@ class RequestRouter : public RequestRouterInterface { } // drive requests to completion - return DriveReadResponses(running_requests); + auto responses = DriveReadResponses(running_requests); + + // post-process responses + std::vector result_rows; + + for (auto &&response : responses) { + std::move(response.result_row.begin(), response.result_row.end(), std::back_inserter(result_rows)); + } + + return result_rows; } std::optional MaybeNameToProperty(const std::string &name) const override { @@ -520,15 +529,14 @@ class RequestRouter : public RequestRouterInterface { return requests; } - std::vector RequestsForGetProperties(const msgs::GetPropertiesRequest &request) { + std::vector> RequestsForGetProperties( + const msgs::GetPropertiesRequest &request) { std::map per_shard_request_table; auto top_level_rqst_template = request; top_level_rqst_template.transaction_id = transaction_id_; top_level_rqst_template.vertex_ids.clear(); top_level_rqst_template.vertices_and_edges.clear(); - state.transaction_id = transaction_id_; - for (auto &vertex : request.vertex_ids) { auto shard = shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); @@ -547,15 +555,18 @@ class RequestRouter : public RequestRouterInterface { per_shard_request_table[shard].vertices_and_edges.emplace_back(std::move(vertex), maybe_edge); } + std::vector> requests; + for (auto &[shard, rqst] : per_shard_request_table) { ShardRequestState shard_request_state{ .shard = shard, .request = std::move(rqst), - .async_request_token = std::nullopt, }; - 
-      state.requests.emplace_back(std::move(shard_request_state));
+      requests.emplace_back(std::move(shard_request_state));
     }
+
+    return requests;
   }

   StorageClient &GetStorageClientForShard(Shard shard) {

From ca3f748325403771b4dbd02cbbda6551e88180f7 Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Mon, 5 Dec 2022 13:43:20 +0000
Subject: [PATCH 06/12] Apply clang-tidy feedback

---
 src/io/notifier.hpp             | 1 +
 src/query/v2/request_router.hpp | 7 ++++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/io/notifier.hpp b/src/io/notifier.hpp
index 9e0dec7df..e2b69e4ac 100644
--- a/src/io/notifier.hpp
+++ b/src/io/notifier.hpp
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include
 #include

 namespace memgraph::io {
diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp
index 0d4bf27e1..b38d4d005 100644
--- a/src/query/v2/request_router.hpp
+++ b/src/query/v2/request_router.hpp
@@ -359,7 +359,8 @@ class RequestRouter : public RequestRouterInterface {

   std::vector GetProperties(msgs::GetPropertiesRequest requests) override {
     // create requests
-    std::vector> unsent_requests = RequestsForGetProperties(requests);
+    std::vector> unsent_requests =
+        RequestsForGetProperties(std::move(requests));

     // begin all requests in parallel
     RunningRequests running_requests = {};
@@ -530,14 +531,14 @@ class RequestRouter : public RequestRouterInterface {
   }

   std::vector> RequestsForGetProperties(
-      const msgs::GetPropertiesRequest &request) {
+      msgs::GetPropertiesRequest &&request) {
     std::map per_shard_request_table;
     auto top_level_rqst_template = request;
     top_level_rqst_template.transaction_id = transaction_id_;
     top_level_rqst_template.vertex_ids.clear();
     top_level_rqst_template.vertices_and_edges.clear();

-    for (auto &vertex : request.vertex_ids) {
+    for (auto &&vertex : request.vertex_ids) {
       auto shard = shards_map_.GetShardForKey(vertex.first.id,
                                               storage::conversions::ConvertPropertyVector(vertex.second));
       if (!per_shard_request_table.contains(shard)) {
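
As context for the RequestsFor* helpers being reworked in the patches above, the per-shard partitioning they perform can be summarized by the following self-contained sketch: items are bucketed by the shard that owns them, and each bucket becomes one request that can later be sent in parallel. Shard, Item, Request and GetShardForKey below are illustrative stand-ins invented for this note, not the types from the shard map or the messages layer.

// Illustrative only: bucket items per shard, then emit one request per shard.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Shard = int;

struct Item {
  int64_t key;
  std::string payload;
};

struct Request {
  Shard shard;
  std::vector<Item> items;
};

// Stand-in for the shard map lookup (the role played by shards_map_.GetShardForKey).
Shard GetShardForKey(int64_t key) { return static_cast<Shard>(key % 3); }

std::vector<Request> RequestsForItems(std::vector<Item> items) {
  std::map<Shard, Request> per_shard_request_table;

  for (auto &&item : items) {
    const Shard shard = GetShardForKey(item.key);
    if (!per_shard_request_table.contains(shard)) {
      per_shard_request_table.insert({shard, Request{.shard = shard}});
    }
    per_shard_request_table[shard].items.emplace_back(std::move(item));
  }

  // One request per shard, ready to be sent in parallel.
  std::vector<Request> requests;
  requests.reserve(per_shard_request_table.size());
  for (auto &[shard, request] : per_shard_request_table) {
    requests.emplace_back(std::move(request));
  }
  return requests;
}

int main() {
  std::vector<Item> items = {{0, "a"}, {1, "b"}, {3, "c"}, {4, "d"}};
  for (const auto &request : RequestsForItems(std::move(items))) {
    std::cout << "shard " << request.shard << " gets " << request.items.size() << " item(s)\n";
  }
  return 0;
}
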
From 747b8a21cdd87d5006319711ccf6e3e881b6136e Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Mon, 5 Dec 2022 14:20:06 +0000
Subject: [PATCH 07/12] Fix bug with polling redirected requests

---
 src/query/v2/request_router.hpp     | 15 +++++++++++++--
 src/storage/v3/shard_rsm.cpp        |  2 +-
 tests/simulation/request_router.cpp |  7 ++++++-
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp
index b38d4d005..7e9831a70 100644
--- a/src/query/v2/request_router.hpp
+++ b/src/query/v2/request_router.hpp
@@ -45,6 +45,7 @@
 #include "utils/result.hpp"

 namespace memgraph::query::v2 {
+
 template
 class RsmStorageClientManager {
  public:
@@ -608,7 +609,12 @@ class RequestRouter : public RequestRouterInterface {
       auto &request = running_requests.at(ready.GetId());
       auto &storage_client = GetStorageClientForShard(request.shard);

-      auto poll_result = storage_client.PollAsyncReadRequest(ready);
+      std::optional> poll_result =
+          storage_client.PollAsyncReadRequest(ready);
+
+      if (!poll_result.has_value()) {
+        continue;
+      }

       if (poll_result->HasError()) {
         throw std::runtime_error("RequestRouter Read request timed out");
@@ -649,7 +655,12 @@ class RequestRouter : public RequestRouterInterface {
       auto &request = running_requests.at(ready.GetId());
       auto &storage_client = GetStorageClientForShard(request.shard);

-      auto poll_result = storage_client.PollAsyncWriteRequest(ready);
+      std::optional> poll_result =
+          storage_client.PollAsyncWriteRequest(ready);
+
+      if (!poll_result.has_value()) {
+        continue;
+      }

       if (poll_result->HasError()) {
         throw std::runtime_error("RequestRouter Write request timed out");
diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp
index 639ffd6d8..b919d217c 100644
--- a/src/storage/v3/shard_rsm.cpp
+++ b/src/storage/v3/shard_rsm.cpp
@@ -611,7 +611,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) {
     return limit;
   };

-  auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) {
+  auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) -> msgs::ReadResponses {
     msgs::GetPropertiesResponse response;
     const auto limit = get_limit(elements);
     for (size_t index = 0; index != limit; ++index) {
diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp
index bc5168483..0f712793f 100644
--- a/tests/simulation/request_router.cpp
+++ b/tests/simulation/request_router.cpp
@@ -18,6 +18,8 @@
 #include
 #include

+#include
+
 #include "common.hpp"
 #include "common/types.hpp"
 #include "coordinator/coordinator_client.hpp"
@@ -370,4 +372,7 @@ void DoTest() {
 }
 }  // namespace memgraph::query::v2::tests

-int main() { memgraph::query::v2::tests::DoTest(); }
+int main() {
+  spdlog::cfg::load_env_levels();
+  memgraph::query::v2::tests::DoTest();
+}

From 2a81ce5640f31d8c025d88584a2de7f14c8ae100 Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Mon, 5 Dec 2022 15:26:18 +0100
Subject: [PATCH 08/12] Update src/io/simulator/simulator.hpp

Co-authored-by: gvolfing <107616712+gvolfing@users.noreply.github.com>
---
 src/io/simulator/simulator.hpp | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/io/simulator/simulator.hpp b/src/io/simulator/simulator.hpp
index 667570276..8afc073af 100644
--- a/src/io/simulator/simulator.hpp
+++ b/src/io/simulator/simulator.hpp
@@ -51,8 +51,7 @@ class Simulator {
   SimulatorStats Stats() { return simulator_handle_->Stats(); }

   std::function GetSimulatorTickClosure() {
-    std::shared_ptr handle_copy = simulator_handle_;
-    std::function tick_closure = [handle_copy] { return handle_copy->MaybeTickSimulator(); };
+    std::function tick_closure = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); };
     return tick_closure;
   }
 };

From 25713405df9ad6df9320c46fc850579aba3f2537 Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Mon, 5 Dec 2022 15:26:29 +0100
Subject: [PATCH 09/12] Update src/io/simulator/simulator_transport.hpp

Co-authored-by: gvolfing <107616712+gvolfing@users.noreply.github.com>
---
 src/io/simulator/simulator_transport.hpp | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp
index 758bc43b7..038cfeb03 100644
--- a/src/io/simulator/simulator_transport.hpp
+++ b/src/io/simulator/simulator_transport.hpp
@@ -36,8 +36,7 @@ class SimulatorTransport {
   template
   ResponseFuture Request(Address to_address, Address from_address, RequestT request,
                          std::function notification, Duration timeout) {
-    std::shared_ptr handle_copy = simulator_handle_;
-    std::function tick_simulator = [handle_copy] { return handle_copy->MaybeTickSimulator(); };
+    std::function tick_simulator = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); };

     return simulator_handle_->template SubmitRequest(
         to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification));
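
Both simulator patches above replace a named shared_ptr copy with a lambda init-capture. A minimal, self-contained illustration of that idiom follows; SimulatorHandle here is a made-up stand-in, not the real class.

// Illustrative only: init-capturing a shared_ptr keeps the object alive inside the closure.
#include <functional>
#include <iostream>
#include <memory>

struct SimulatorHandle {
  bool MaybeTickSimulator() { return true; }
};

int main() {
  auto simulator_handle = std::make_shared<SimulatorHandle>();

  // [handle_copy = simulator_handle] copies the shared_ptr directly into the closure,
  // so no separately named copy is needed and the handle outlives the original pointer.
  std::function<bool()> tick_closure = [handle_copy = simulator_handle] { return handle_copy->MaybeTickSimulator(); };

  simulator_handle.reset();
  std::cout << std::boolalpha << tick_closure() << '\n';  // prints: true
  return 0;
}
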
From b288f06cb7e51b69ddaaccf690e9816ed4f5017a Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Tue, 6 Dec 2022 11:31:40 +0100
Subject: [PATCH 10/12] Update src/io/rsm/rsm_client.hpp

Co-authored-by: Jeremy B <97525434+42jeremy@users.noreply.github.com>
---
 src/io/rsm/rsm_client.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/io/rsm/rsm_client.hpp b/src/io/rsm/rsm_client.hpp
index b863e79a4..cf56b1151 100644
--- a/src/io/rsm/rsm_client.hpp
+++ b/src/io/rsm/rsm_client.hpp
@@ -94,7 +94,7 @@ class RsmClient {

   BasicResult SendWriteRequest(WriteRequestT req) {
     Notifier notifier;
-    ReadinessToken readiness_token{0};
+    const ReadinessToken readiness_token{0};
     SendAsyncWriteRequest(req, notifier, readiness_token);
     auto poll_result = AwaitAsyncWriteRequest(readiness_token);
     while (!poll_result) {

From 675c2fe24acb5c3fc1f1ffbfa9215c3819c6268e Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Tue, 6 Dec 2022 11:31:46 +0100
Subject: [PATCH 11/12] Update src/io/rsm/rsm_client.hpp

Co-authored-by: Jeremy B <97525434+42jeremy@users.noreply.github.com>
---
 src/io/rsm/rsm_client.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/io/rsm/rsm_client.hpp b/src/io/rsm/rsm_client.hpp
index cf56b1151..2b76b6399 100644
--- a/src/io/rsm/rsm_client.hpp
+++ b/src/io/rsm/rsm_client.hpp
@@ -105,7 +105,7 @@ class RsmClient {

   BasicResult SendReadRequest(ReadRequestT req) {
     Notifier notifier;
-    ReadinessToken readiness_token{0};
+    const ReadinessToken readiness_token{0};
     SendAsyncReadRequest(req, notifier, readiness_token);
     auto poll_result = AwaitAsyncReadRequest(readiness_token);
     while (!poll_result) {
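
The two RsmClient hunks above show blocking SendWriteRequest/SendReadRequest calls expressed on top of the asynchronous send/poll interface with a single readiness token. The sketch below is a rough, self-contained illustration of that sync-over-async shape; FakeAsyncClient and its methods are invented for this note and are not the real RsmClient API.

// Illustrative only: a blocking call built by sending one async request and polling until it completes.
#include <chrono>
#include <future>
#include <iostream>
#include <optional>
#include <string>
#include <thread>

class FakeAsyncClient {
 public:
  // "Sends" the request; the result becomes available through PollAsyncRequest.
  void SendAsyncRequest(const std::string &request) {
    pending_ = std::async(std::launch::async, [request] { return "echo: " + request; });
  }

  // Non-blocking poll: returns std::nullopt until the response is ready.
  std::optional<std::string> PollAsyncRequest() {
    if (!pending_.valid()) return std::nullopt;
    if (pending_.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) return std::nullopt;
    return pending_.get();
  }

  // Blocking wrapper built on top of the asynchronous interface.
  std::string SendRequest(const std::string &request) {
    SendAsyncRequest(request);
    std::optional<std::string> poll_result = PollAsyncRequest();
    while (!poll_result) {
      std::this_thread::yield();
      poll_result = PollAsyncRequest();
    }
    return *poll_result;
  }

 private:
  std::future<std::string> pending_;
};

int main() {
  FakeAsyncClient client;
  std::cout << client.SendRequest("ping") << '\n';  // prints: echo: ping
  return 0;
}
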
From 5d3d67cbd09faa5a13783910660cac7400c2a460 Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Tue, 6 Dec 2022 10:32:57 +0000
Subject: [PATCH 12/12] Rename unsent_requests to requests_to_be_sent in RequestRouter

---
 src/query/v2/request_router.hpp | 44 ++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp
index 7e9831a70..116e884c2 100644
--- a/src/query/v2/request_router.hpp
+++ b/src/query/v2/request_router.hpp
@@ -237,14 +237,14 @@ class RequestRouter : public RequestRouterInterface {
   // TODO(kostasrim) Simplify return result
   std::vector ScanVertices(std::optional label) override {
     // create requests
-    std::vector> unsent_requests = RequestsForScanVertices(label);
-    spdlog::trace("created {} ScanVertices requests", unsent_requests.size());
+    std::vector> requests_to_be_sent = RequestsForScanVertices(label);
+    spdlog::trace("created {} ScanVertices requests", requests_to_be_sent.size());

     // begin all requests in parallel
     RunningRequests running_requests = {};
-    running_requests.reserve(unsent_requests.size());
-    for (size_t i = 0; i < unsent_requests.size(); i++) {
-      auto &request = unsent_requests[i];
+    running_requests.reserve(requests_to_be_sent.size());
+    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
+      auto &request = requests_to_be_sent[i];
       io::ReadinessToken readiness_token{i};
       auto &storage_client = GetStorageClientForShard(request.shard);
       storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token);
@@ -272,15 +272,15 @@ class RequestRouter : public RequestRouterInterface {
     MG_ASSERT(!new_vertices.empty());

     // create requests
-    std::vector> unsent_requests =
+    std::vector> requests_to_be_sent =
         RequestsForCreateVertices(new_vertices);
-    spdlog::trace("created {} CreateVertices requests", unsent_requests.size());
+    spdlog::trace("created {} CreateVertices requests", requests_to_be_sent.size());

     // begin all requests in parallel
     RunningRequests running_requests = {};
-    running_requests.reserve(unsent_requests.size());
-    for (size_t i = 0; i < unsent_requests.size(); i++) {
-      auto &request = unsent_requests[i];
+    running_requests.reserve(requests_to_be_sent.size());
+    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
+      auto &request = requests_to_be_sent[i];
       io::ReadinessToken readiness_token{i};
       for (auto &new_vertex : request.request.new_vertices) {
         new_vertex.label_ids.erase(new_vertex.label_ids.begin());
@@ -299,13 +299,13 @@ class RequestRouter : public RequestRouterInterface {
     MG_ASSERT(!new_edges.empty());

     // create requests
-    std::vector> unsent_requests = RequestsForCreateExpand(new_edges);
+    std::vector> requests_to_be_sent = RequestsForCreateExpand(new_edges);

     // begin all requests in parallel
     RunningRequests running_requests = {};
-    running_requests.reserve(unsent_requests.size());
-    for (size_t i = 0; i < unsent_requests.size(); i++) {
-      auto &request = unsent_requests[i];
+    running_requests.reserve(requests_to_be_sent.size());
+    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
+      auto &request = requests_to_be_sent[i];
       io::ReadinessToken readiness_token{i};
       auto &storage_client = GetStorageClientForShard(request.shard);
       msgs::WriteRequests req = request.request;
@@ -325,13 +325,13 @@ class RequestRouter : public RequestRouterInterface {
     // must be fetched again with an ExpandOne(Edges.dst)

     // create requests
-    std::vector> unsent_requests = RequestsForExpandOne(request);
+    std::vector> requests_to_be_sent = RequestsForExpandOne(request);

     // begin all requests in parallel
     RunningRequests running_requests = {};
-    running_requests.reserve(unsent_requests.size());
-    for (size_t i = 0; i < unsent_requests.size(); i++) {
-      auto &request = unsent_requests[i];
+    running_requests.reserve(requests_to_be_sent.size());
+    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
+      auto &request = requests_to_be_sent[i];
       io::ReadinessToken readiness_token{i};
       auto &storage_client = GetStorageClientForShard(request.shard);
       msgs::ReadRequests req = request.request;
@@ -360,14 +360,14 @@ class RequestRouter : public RequestRouterInterface {

   std::vector GetProperties(msgs::GetPropertiesRequest requests) override {
     // create requests
-    std::vector> unsent_requests =
+    std::vector> requests_to_be_sent =
         RequestsForGetProperties(std::move(requests));

     // begin all requests in parallel
     RunningRequests running_requests = {};
-    running_requests.reserve(unsent_requests.size());
-    for (size_t i = 0; i < unsent_requests.size(); i++) {
-      auto &request = unsent_requests[i];
+    running_requests.reserve(requests_to_be_sent.size());
+    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
+      auto &request = requests_to_be_sent[i];
       io::ReadinessToken readiness_token{i};
       auto &storage_client = GetStorageClientForShard(request.shard);
       msgs::ReadRequests req = request.request;