From 9a62503803f5e1667bd70a02454bdc5419eeb3ef Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 2 Dec 2022 18:04:38 +0000 Subject: [PATCH] Tick the simulator forward from Notify::Await in a similar way that Future::Wait does --- src/io/notifier.hpp | 22 ++++++++++- src/io/rsm/raft.hpp | 32 +++++++++++----- src/io/rsm/rsm_client.hpp | 1 - src/io/simulator/simulator.hpp | 6 +++ src/io/simulator/simulator_handle.hpp | 49 +++++++++++++----------- src/io/simulator/simulator_transport.hpp | 8 ++-- src/query/v2/request_router.hpp | 18 +++++++-- tests/simulation/request_router.cpp | 2 + tests/simulation/test_cluster.hpp | 2 + 9 files changed, 99 insertions(+), 41 deletions(-) diff --git a/src/io/notifier.hpp b/src/io/notifier.hpp index 81d6507bb..9e0dec7df 100644 --- a/src/io/notifier.hpp +++ b/src/io/notifier.hpp @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include @@ -29,6 +30,7 @@ class Inner { std::condition_variable cv_; std::mutex mu_; std::vector ready_; + std::optional> tick_simulator_; public: void Notify(ReadinessToken readiness_token) { @@ -44,13 +46,29 @@ class Inner { std::unique_lock lock(mu_); while (ready_.empty()) { - cv_.wait(lock); + if (tick_simulator_) [[unlikely]] { + // This avoids a deadlock in a similar way that + // Future::Wait will release its mutex while + // interacting with the simulator, due to + // the fact that the simulator may cause + // notifications that we are interested in. + lock.unlock(); + std::invoke(tick_simulator_.value()); + lock.lock(); + } else { + cv_.wait(lock); + } } ReadinessToken ret = ready_.back(); ready_.pop_back(); return ret; } + + void InstallSimulatorTicker(std::function tick_simulator) { + std::unique_lock lock(mu_); + tick_simulator_ = tick_simulator; + } }; class Notifier { @@ -67,6 +85,8 @@ class Notifier { void Notify(ReadinessToken readiness_token) const { inner_->Notify(readiness_token); } ReadinessToken Await() const { return inner_->Await(); } + + void InstallSimulatorTicker(std::function tick_simulator) { inner_->InstallSimulatorTicker(tick_simulator); } }; } // namespace memgraph::io diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp index 4ec22ff87..07a51288d 100644 --- a/src/io/rsm/raft.hpp +++ b/src/io/rsm/raft.hpp @@ -91,33 +91,43 @@ struct ReadResponse { }; template -utils::TypeInfoRef TypeInfoFor(const ReadResponse> &read_response) { - return TypeInfoForVariant(read_response.read_return); +utils::TypeInfoRef TypeInfoFor(const ReadResponse> &response) { + return TypeInfoForVariant(response.read_return); } template -utils::TypeInfoRef TypeInfoFor(const ReadResponse & /* read_response */) { +utils::TypeInfoRef TypeInfoFor(const ReadResponse & /* response */) { return typeid(ReadReturn); } +template +utils::TypeInfoRef TypeInfoFor(const ReadRequest & /* request */) { + return typeid(ReadOperation); +} + +template +utils::TypeInfoRef TypeInfoFor(const ReadRequest> &request) { + return TypeInfoForVariant(request.operation); +} + template -utils::TypeInfoRef TypeInfoFor(const WriteResponse> &write_response) { - return TypeInfoForVariant(write_response.write_return); +utils::TypeInfoRef TypeInfoFor(const WriteResponse> &response) { + return TypeInfoForVariant(response.write_return); } template -utils::TypeInfoRef TypeInfoFor(const WriteResponse & /* write_response */) { +utils::TypeInfoRef TypeInfoFor(const WriteResponse & /* response */) { return typeid(WriteReturn); } template -utils::TypeInfoRef TypeInfoFor(const WriteRequest & /* write_request */) { +utils::TypeInfoRef TypeInfoFor(const WriteRequest & /* request */) { return typeid(WriteOperation); } template -utils::TypeInfoRef TypeInfoFor(const WriteRequest> &write_request) { - return TypeInfoForVariant(write_request.operation); +utils::TypeInfoRef TypeInfoFor(const WriteRequest> &request) { + return TypeInfoForVariant(request.operation); } /// AppendRequest is a raft-level message that the Leader @@ -846,7 +856,9 @@ class Raft { // Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState std::optional Handle(Leader & /* variable */, ReadRequest &&req, RequestId request_id, Address from_address) { - Log("handling ReadOperation"); + auto type_info = TypeInfoFor(req); + std::string demangled_name = boost::core::demangle(type_info.get().name()); + Log("handling ReadOperation<" + demangled_name + ">"); ReadOperation read_operation = req.operation; ReadResponseValue read_return = replicated_state_.Read(read_operation); diff --git a/src/io/rsm/rsm_client.hpp b/src/io/rsm/rsm_client.hpp index 1283ec3dc..b863e79a4 100644 --- a/src/io/rsm/rsm_client.hpp +++ b/src/io/rsm/rsm_client.hpp @@ -73,7 +73,6 @@ class RsmClient { if (response.retry_leader) { MG_ASSERT(!response.success, "retry_leader should never be set for successful responses"); leader_ = response.retry_leader.value(); - spdlog::error("client redirected to leader server {}", leader_.ToString()); spdlog::debug("client redirected to leader server {}", leader_.ToString()); } if (!response.success) { diff --git a/src/io/simulator/simulator.hpp b/src/io/simulator/simulator.hpp index 622c264b4..667570276 100644 --- a/src/io/simulator/simulator.hpp +++ b/src/io/simulator/simulator.hpp @@ -49,5 +49,11 @@ class Simulator { } SimulatorStats Stats() { return simulator_handle_->Stats(); } + + std::function GetSimulatorTickClosure() { + std::shared_ptr handle_copy = simulator_handle_; + std::function tick_closure = [handle_copy] { return handle_copy->MaybeTickSimulator(); }; + return tick_closure; + } }; }; // namespace memgraph::io::simulator diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 0cd6b77b0..3fd9b4965 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -22,6 +22,8 @@ #include #include +#include + #include "io/address.hpp" #include "io/errors.hpp" #include "io/message_conversion.hpp" @@ -107,8 +109,9 @@ class SimulatorHandle { ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, std::function &&maybe_tick_simulator, std::function &&fill_notifier) { - spdlog::trace("submitting request to {}", to_address.last_known_port); auto type_info = TypeInfoFor(request); + std::string demangled_name = boost::core::demangle(type_info.get().name()); + spdlog::trace("simulator sending request {} to {}", demangled_name, to_address); auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( // set notifier for when the Future::Wait is called @@ -116,34 +119,36 @@ class SimulatorHandle { // set notifier for when Promise::Fill is called std::forward>(fill_notifier)); - std::unique_lock lock(mu_); + { + std::unique_lock lock(mu_); - RequestId request_id = ++request_id_counter_; + RequestId request_id = ++request_id_counter_; - const Time deadline = cluster_wide_time_microseconds_ + timeout; + const Time deadline = cluster_wide_time_microseconds_ + timeout; - std::any message(request); - OpaqueMessage om{.to_address = to_address, - .from_address = from_address, - .request_id = request_id, - .message = std::move(message), - .type_info = type_info}; - in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); + std::any message(request); + OpaqueMessage om{.to_address = to_address, + .from_address = from_address, + .request_id = request_id, + .message = std::move(message), + .type_info = type_info}; + in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); - PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; - OpaquePromise opaque_promise(std::move(promise).ToUnique()); - DeadlineAndOpaquePromise dop{ - .requested_at = cluster_wide_time_microseconds_, - .deadline = deadline, - .promise = std::move(opaque_promise), - }; + PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; + OpaquePromise opaque_promise(std::move(promise).ToUnique()); + DeadlineAndOpaquePromise dop{ + .requested_at = cluster_wide_time_microseconds_, + .deadline = deadline, + .promise = std::move(opaque_promise), + }; - MG_ASSERT(!promises_.contains(promise_key)); + MG_ASSERT(!promises_.contains(promise_key)); - promises_.emplace(std::move(promise_key), std::move(dop)); + promises_.emplace(std::move(promise_key), std::move(dop)); - stats_.total_messages++; - stats_.total_requests++; + stats_.total_messages++; + stats_.total_requests++; + } // lock dropped here cv_.notify_all(); diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 2107c34ca..758bc43b7 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -36,11 +36,11 @@ class SimulatorTransport { template ResponseFuture Request(Address to_address, Address from_address, RequestT request, std::function notification, Duration timeout) { - std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; + std::shared_ptr handle_copy = simulator_handle_; + std::function tick_simulator = [handle_copy] { return handle_copy->MaybeTickSimulator(); }; - return simulator_handle_->template SubmitRequest(to_address, from_address, std::move(request), - timeout, std::move(maybe_tick_simulator), - std::move(notification)); + return simulator_handle_->template SubmitRequest( + to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification)); } template diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 2d563ade0..1336addae 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -135,10 +135,16 @@ class RequestRouter : public RequestRouterInterface { ~RequestRouter() override {} + void InstallSimulatorTicker(std::function tick_simulator) { + notifier_.InstallSimulatorTicker(tick_simulator); + } + void StartTransaction() override { coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; CoordinatorWriteRequests write_req = req; + spdlog::trace("sending hlc request to start transaction"); auto write_res = coord_cli_.SendWriteRequest(write_req); + spdlog::trace("received hlc response to start transaction"); if (write_res.HasError()) { throw std::runtime_error("HLC request failed"); } @@ -157,7 +163,9 @@ class RequestRouter : public RequestRouterInterface { void Commit() override { coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; CoordinatorWriteRequests write_req = req; + spdlog::trace("sending hlc request before committing transaction"); auto write_res = coord_cli_.SendWriteRequest(write_req); + spdlog::trace("received hlc response before committing transaction"); if (write_res.HasError()) { throw std::runtime_error("HLC request for commit failed"); } @@ -227,7 +235,7 @@ class RequestRouter : public RequestRouterInterface { std::vector ScanVertices(std::optional label) override { // create requests std::vector> unsent_requests = RequestsForScanVertices(label); - spdlog::error("created {} ScanVertices requests", unsent_requests.size()); + spdlog::trace("created {} ScanVertices requests", unsent_requests.size()); // begin all requests in parallel RunningRequests running_requests = {}; @@ -239,11 +247,11 @@ class RequestRouter : public RequestRouterInterface { storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token); running_requests.emplace(readiness_token.GetId(), request); } - spdlog::error("sent {} ScanVertices requests in parallel", running_requests.size()); + spdlog::trace("sent {} ScanVertices requests in parallel", running_requests.size()); // drive requests to completion auto responses = DriveReadResponses(running_requests); - spdlog::error("got back {} ScanVertices responses after driving to completion", responses.size()); + spdlog::trace("got back {} ScanVertices responses after driving to completion", responses.size()); // convert responses into VertexAccessor objects to return std::vector accessors; @@ -263,6 +271,7 @@ class RequestRouter : public RequestRouterInterface { // create requests std::vector> unsent_requests = RequestsForCreateVertices(new_vertices); + spdlog::trace("created {} CreateVertices requests", unsent_requests.size()); // begin all requests in parallel RunningRequests running_requests = {}; @@ -277,6 +286,7 @@ class RequestRouter : public RequestRouterInterface { storage_client.SendAsyncWriteRequest(request.request, notifier_, readiness_token); running_requests.emplace(readiness_token.GetId(), request); } + spdlog::trace("sent {} CreateVertices requests in parallel", running_requests.size()); // drive requests to completion return DriveWriteResponses(running_requests); @@ -519,8 +529,10 @@ class RequestRouter : public RequestRouterInterface { // even if they came back in randomized orders. std::map response_map; + spdlog::trace("waiting on readiness for token"); while (response_map.size() < running_requests.size()) { auto ready = notifier_.Await(); + spdlog::trace("got readiness for token {}", ready.GetId()); auto &request = running_requests.at(ready.GetId()); auto &storage_client = GetStorageClientForShard(request.shard); diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index 8187f138b..af8ec62ef 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -338,6 +338,8 @@ void DoTest() { CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); + std::function tick_simulator = simulator.GetSimulatorTickClosure(); + request_router.InstallSimulatorTicker(tick_simulator); request_router.StartTransaction(); TestScanVertices(request_router); diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index 1392a0632..3e14545a9 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -244,6 +244,8 @@ std::pair RunClusterSimulation(const WaitForShardsToInitialize(coordinator_client); query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); + std::function tick_simulator = simulator.GetSimulatorTickClosure(); + request_router.InstallSimulatorTicker(tick_simulator); request_router.StartTransaction();