Tick the simulator forward from Notify::Await in a similar way that Future::Wait does
This commit is contained in:
parent
438b519703
commit
9a62503803
@ -12,6 +12,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <functional>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -29,6 +30,7 @@ class Inner {
|
|||||||
std::condition_variable cv_;
|
std::condition_variable cv_;
|
||||||
std::mutex mu_;
|
std::mutex mu_;
|
||||||
std::vector<ReadinessToken> ready_;
|
std::vector<ReadinessToken> ready_;
|
||||||
|
std::optional<std::function<bool()>> tick_simulator_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void Notify(ReadinessToken readiness_token) {
|
void Notify(ReadinessToken readiness_token) {
|
||||||
@ -44,13 +46,29 @@ class Inner {
|
|||||||
std::unique_lock<std::mutex> lock(mu_);
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
|
|
||||||
while (ready_.empty()) {
|
while (ready_.empty()) {
|
||||||
cv_.wait(lock);
|
if (tick_simulator_) [[unlikely]] {
|
||||||
|
// This avoids a deadlock in a similar way that
|
||||||
|
// Future::Wait will release its mutex while
|
||||||
|
// interacting with the simulator, due to
|
||||||
|
// the fact that the simulator may cause
|
||||||
|
// notifications that we are interested in.
|
||||||
|
lock.unlock();
|
||||||
|
std::invoke(tick_simulator_.value());
|
||||||
|
lock.lock();
|
||||||
|
} else {
|
||||||
|
cv_.wait(lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReadinessToken ret = ready_.back();
|
ReadinessToken ret = ready_.back();
|
||||||
ready_.pop_back();
|
ready_.pop_back();
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void InstallSimulatorTicker(std::function<bool()> tick_simulator) {
|
||||||
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
|
tick_simulator_ = tick_simulator;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
class Notifier {
|
class Notifier {
|
||||||
@ -67,6 +85,8 @@ class Notifier {
|
|||||||
void Notify(ReadinessToken readiness_token) const { inner_->Notify(readiness_token); }
|
void Notify(ReadinessToken readiness_token) const { inner_->Notify(readiness_token); }
|
||||||
|
|
||||||
ReadinessToken Await() const { return inner_->Await(); }
|
ReadinessToken Await() const { return inner_->Await(); }
|
||||||
|
|
||||||
|
void InstallSimulatorTicker(std::function<bool()> tick_simulator) { inner_->InstallSimulatorTicker(tick_simulator); }
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace memgraph::io
|
} // namespace memgraph::io
|
||||||
|
@ -91,33 +91,43 @@ struct ReadResponse {
|
|||||||
};
|
};
|
||||||
|
|
||||||
template <class... ReadReturn>
|
template <class... ReadReturn>
|
||||||
utils::TypeInfoRef TypeInfoFor(const ReadResponse<std::variant<ReadReturn...>> &read_response) {
|
utils::TypeInfoRef TypeInfoFor(const ReadResponse<std::variant<ReadReturn...>> &response) {
|
||||||
return TypeInfoForVariant(read_response.read_return);
|
return TypeInfoForVariant(response.read_return);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class ReadReturn>
|
template <class ReadReturn>
|
||||||
utils::TypeInfoRef TypeInfoFor(const ReadResponse<ReadReturn> & /* read_response */) {
|
utils::TypeInfoRef TypeInfoFor(const ReadResponse<ReadReturn> & /* response */) {
|
||||||
return typeid(ReadReturn);
|
return typeid(ReadReturn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <class ReadOperation>
|
||||||
|
utils::TypeInfoRef TypeInfoFor(const ReadRequest<ReadOperation> & /* request */) {
|
||||||
|
return typeid(ReadOperation);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class... ReadOperations>
|
||||||
|
utils::TypeInfoRef TypeInfoFor(const ReadRequest<std::variant<ReadOperations...>> &request) {
|
||||||
|
return TypeInfoForVariant(request.operation);
|
||||||
|
}
|
||||||
|
|
||||||
template <class... WriteReturn>
|
template <class... WriteReturn>
|
||||||
utils::TypeInfoRef TypeInfoFor(const WriteResponse<std::variant<WriteReturn...>> &write_response) {
|
utils::TypeInfoRef TypeInfoFor(const WriteResponse<std::variant<WriteReturn...>> &response) {
|
||||||
return TypeInfoForVariant(write_response.write_return);
|
return TypeInfoForVariant(response.write_return);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class WriteReturn>
|
template <class WriteReturn>
|
||||||
utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* write_response */) {
|
utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* response */) {
|
||||||
return typeid(WriteReturn);
|
return typeid(WriteReturn);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class WriteOperation>
|
template <class WriteOperation>
|
||||||
utils::TypeInfoRef TypeInfoFor(const WriteRequest<WriteOperation> & /* write_request */) {
|
utils::TypeInfoRef TypeInfoFor(const WriteRequest<WriteOperation> & /* request */) {
|
||||||
return typeid(WriteOperation);
|
return typeid(WriteOperation);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class... WriteOperations>
|
template <class... WriteOperations>
|
||||||
utils::TypeInfoRef TypeInfoFor(const WriteRequest<std::variant<WriteOperations...>> &write_request) {
|
utils::TypeInfoRef TypeInfoFor(const WriteRequest<std::variant<WriteOperations...>> &request) {
|
||||||
return TypeInfoForVariant(write_request.operation);
|
return TypeInfoForVariant(request.operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// AppendRequest is a raft-level message that the Leader
|
/// AppendRequest is a raft-level message that the Leader
|
||||||
@ -846,7 +856,9 @@ class Raft {
|
|||||||
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
|
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
|
||||||
std::optional<Role> Handle(Leader & /* variable */, ReadRequest<ReadOperation> &&req, RequestId request_id,
|
std::optional<Role> Handle(Leader & /* variable */, ReadRequest<ReadOperation> &&req, RequestId request_id,
|
||||||
Address from_address) {
|
Address from_address) {
|
||||||
Log("handling ReadOperation");
|
auto type_info = TypeInfoFor(req);
|
||||||
|
std::string demangled_name = boost::core::demangle(type_info.get().name());
|
||||||
|
Log("handling ReadOperation<" + demangled_name + ">");
|
||||||
ReadOperation read_operation = req.operation;
|
ReadOperation read_operation = req.operation;
|
||||||
|
|
||||||
ReadResponseValue read_return = replicated_state_.Read(read_operation);
|
ReadResponseValue read_return = replicated_state_.Read(read_operation);
|
||||||
|
@ -73,7 +73,6 @@ class RsmClient {
|
|||||||
if (response.retry_leader) {
|
if (response.retry_leader) {
|
||||||
MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
|
MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
|
||||||
leader_ = response.retry_leader.value();
|
leader_ = response.retry_leader.value();
|
||||||
spdlog::error("client redirected to leader server {}", leader_.ToString());
|
|
||||||
spdlog::debug("client redirected to leader server {}", leader_.ToString());
|
spdlog::debug("client redirected to leader server {}", leader_.ToString());
|
||||||
}
|
}
|
||||||
if (!response.success) {
|
if (!response.success) {
|
||||||
|
@ -49,5 +49,11 @@ class Simulator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SimulatorStats Stats() { return simulator_handle_->Stats(); }
|
SimulatorStats Stats() { return simulator_handle_->Stats(); }
|
||||||
|
|
||||||
|
std::function<bool()> GetSimulatorTickClosure() {
|
||||||
|
std::shared_ptr<SimulatorHandle> handle_copy = simulator_handle_;
|
||||||
|
std::function<bool()> tick_closure = [handle_copy] { return handle_copy->MaybeTickSimulator(); };
|
||||||
|
return tick_closure;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}; // namespace memgraph::io::simulator
|
}; // namespace memgraph::io::simulator
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
#include <variant>
|
#include <variant>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include <boost/core/demangle.hpp>
|
||||||
|
|
||||||
#include "io/address.hpp"
|
#include "io/address.hpp"
|
||||||
#include "io/errors.hpp"
|
#include "io/errors.hpp"
|
||||||
#include "io/message_conversion.hpp"
|
#include "io/message_conversion.hpp"
|
||||||
@ -107,8 +109,9 @@ class SimulatorHandle {
|
|||||||
ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
|
ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
|
||||||
std::function<bool()> &&maybe_tick_simulator,
|
std::function<bool()> &&maybe_tick_simulator,
|
||||||
std::function<void()> &&fill_notifier) {
|
std::function<void()> &&fill_notifier) {
|
||||||
spdlog::trace("submitting request to {}", to_address.last_known_port);
|
|
||||||
auto type_info = TypeInfoFor(request);
|
auto type_info = TypeInfoFor(request);
|
||||||
|
std::string demangled_name = boost::core::demangle(type_info.get().name());
|
||||||
|
spdlog::trace("simulator sending request {} to {}", demangled_name, to_address);
|
||||||
|
|
||||||
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<Response>>(
|
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<Response>>(
|
||||||
// set notifier for when the Future::Wait is called
|
// set notifier for when the Future::Wait is called
|
||||||
@ -116,34 +119,36 @@ class SimulatorHandle {
|
|||||||
// set notifier for when Promise::Fill is called
|
// set notifier for when Promise::Fill is called
|
||||||
std::forward<std::function<void()>>(fill_notifier));
|
std::forward<std::function<void()>>(fill_notifier));
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(mu_);
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
|
|
||||||
RequestId request_id = ++request_id_counter_;
|
RequestId request_id = ++request_id_counter_;
|
||||||
|
|
||||||
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||||
|
|
||||||
std::any message(request);
|
std::any message(request);
|
||||||
OpaqueMessage om{.to_address = to_address,
|
OpaqueMessage om{.to_address = to_address,
|
||||||
.from_address = from_address,
|
.from_address = from_address,
|
||||||
.request_id = request_id,
|
.request_id = request_id,
|
||||||
.message = std::move(message),
|
.message = std::move(message),
|
||||||
.type_info = type_info};
|
.type_info = type_info};
|
||||||
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
|
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
|
||||||
|
|
||||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
|
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
|
||||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||||
DeadlineAndOpaquePromise dop{
|
DeadlineAndOpaquePromise dop{
|
||||||
.requested_at = cluster_wide_time_microseconds_,
|
.requested_at = cluster_wide_time_microseconds_,
|
||||||
.deadline = deadline,
|
.deadline = deadline,
|
||||||
.promise = std::move(opaque_promise),
|
.promise = std::move(opaque_promise),
|
||||||
};
|
};
|
||||||
|
|
||||||
MG_ASSERT(!promises_.contains(promise_key));
|
MG_ASSERT(!promises_.contains(promise_key));
|
||||||
|
|
||||||
promises_.emplace(std::move(promise_key), std::move(dop));
|
promises_.emplace(std::move(promise_key), std::move(dop));
|
||||||
|
|
||||||
stats_.total_messages++;
|
stats_.total_messages++;
|
||||||
stats_.total_requests++;
|
stats_.total_requests++;
|
||||||
|
} // lock dropped here
|
||||||
|
|
||||||
cv_.notify_all();
|
cv_.notify_all();
|
||||||
|
|
||||||
|
@ -36,11 +36,11 @@ class SimulatorTransport {
|
|||||||
template <Message RequestT, Message ResponseT>
|
template <Message RequestT, Message ResponseT>
|
||||||
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request,
|
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request,
|
||||||
std::function<void()> notification, Duration timeout) {
|
std::function<void()> notification, Duration timeout) {
|
||||||
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
|
std::shared_ptr<SimulatorHandle> handle_copy = simulator_handle_;
|
||||||
|
std::function<bool()> tick_simulator = [handle_copy] { return handle_copy->MaybeTickSimulator(); };
|
||||||
|
|
||||||
return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
|
return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(
|
||||||
timeout, std::move(maybe_tick_simulator),
|
to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification));
|
||||||
std::move(notification));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <Message... Ms>
|
template <Message... Ms>
|
||||||
|
@ -135,10 +135,16 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
|
|
||||||
~RequestRouter() override {}
|
~RequestRouter() override {}
|
||||||
|
|
||||||
|
void InstallSimulatorTicker(std::function<bool()> tick_simulator) {
|
||||||
|
notifier_.InstallSimulatorTicker(tick_simulator);
|
||||||
|
}
|
||||||
|
|
||||||
void StartTransaction() override {
|
void StartTransaction() override {
|
||||||
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
|
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
|
||||||
CoordinatorWriteRequests write_req = req;
|
CoordinatorWriteRequests write_req = req;
|
||||||
|
spdlog::trace("sending hlc request to start transaction");
|
||||||
auto write_res = coord_cli_.SendWriteRequest(write_req);
|
auto write_res = coord_cli_.SendWriteRequest(write_req);
|
||||||
|
spdlog::trace("received hlc response to start transaction");
|
||||||
if (write_res.HasError()) {
|
if (write_res.HasError()) {
|
||||||
throw std::runtime_error("HLC request failed");
|
throw std::runtime_error("HLC request failed");
|
||||||
}
|
}
|
||||||
@ -157,7 +163,9 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
void Commit() override {
|
void Commit() override {
|
||||||
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
|
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
|
||||||
CoordinatorWriteRequests write_req = req;
|
CoordinatorWriteRequests write_req = req;
|
||||||
|
spdlog::trace("sending hlc request before committing transaction");
|
||||||
auto write_res = coord_cli_.SendWriteRequest(write_req);
|
auto write_res = coord_cli_.SendWriteRequest(write_req);
|
||||||
|
spdlog::trace("received hlc response before committing transaction");
|
||||||
if (write_res.HasError()) {
|
if (write_res.HasError()) {
|
||||||
throw std::runtime_error("HLC request for commit failed");
|
throw std::runtime_error("HLC request for commit failed");
|
||||||
}
|
}
|
||||||
@ -227,7 +235,7 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
std::vector<VertexAccessor> ScanVertices(std::optional<std::string> label) override {
|
std::vector<VertexAccessor> ScanVertices(std::optional<std::string> label) override {
|
||||||
// create requests
|
// create requests
|
||||||
std::vector<ShardRequestState<msgs::ScanVerticesRequest>> unsent_requests = RequestsForScanVertices(label);
|
std::vector<ShardRequestState<msgs::ScanVerticesRequest>> unsent_requests = RequestsForScanVertices(label);
|
||||||
spdlog::error("created {} ScanVertices requests", unsent_requests.size());
|
spdlog::trace("created {} ScanVertices requests", unsent_requests.size());
|
||||||
|
|
||||||
// begin all requests in parallel
|
// begin all requests in parallel
|
||||||
RunningRequests<msgs::ScanVerticesRequest> running_requests = {};
|
RunningRequests<msgs::ScanVerticesRequest> running_requests = {};
|
||||||
@ -239,11 +247,11 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token);
|
storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token);
|
||||||
running_requests.emplace(readiness_token.GetId(), request);
|
running_requests.emplace(readiness_token.GetId(), request);
|
||||||
}
|
}
|
||||||
spdlog::error("sent {} ScanVertices requests in parallel", running_requests.size());
|
spdlog::trace("sent {} ScanVertices requests in parallel", running_requests.size());
|
||||||
|
|
||||||
// drive requests to completion
|
// drive requests to completion
|
||||||
auto responses = DriveReadResponses<msgs::ScanVerticesRequest, msgs::ScanVerticesResponse>(running_requests);
|
auto responses = DriveReadResponses<msgs::ScanVerticesRequest, msgs::ScanVerticesResponse>(running_requests);
|
||||||
spdlog::error("got back {} ScanVertices responses after driving to completion", responses.size());
|
spdlog::trace("got back {} ScanVertices responses after driving to completion", responses.size());
|
||||||
|
|
||||||
// convert responses into VertexAccessor objects to return
|
// convert responses into VertexAccessor objects to return
|
||||||
std::vector<VertexAccessor> accessors;
|
std::vector<VertexAccessor> accessors;
|
||||||
@ -263,6 +271,7 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
// create requests
|
// create requests
|
||||||
std::vector<ShardRequestState<msgs::CreateVerticesRequest>> unsent_requests =
|
std::vector<ShardRequestState<msgs::CreateVerticesRequest>> unsent_requests =
|
||||||
RequestsForCreateVertices(new_vertices);
|
RequestsForCreateVertices(new_vertices);
|
||||||
|
spdlog::trace("created {} CreateVertices requests", unsent_requests.size());
|
||||||
|
|
||||||
// begin all requests in parallel
|
// begin all requests in parallel
|
||||||
RunningRequests<msgs::CreateVerticesRequest> running_requests = {};
|
RunningRequests<msgs::CreateVerticesRequest> running_requests = {};
|
||||||
@ -277,6 +286,7 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
storage_client.SendAsyncWriteRequest(request.request, notifier_, readiness_token);
|
storage_client.SendAsyncWriteRequest(request.request, notifier_, readiness_token);
|
||||||
running_requests.emplace(readiness_token.GetId(), request);
|
running_requests.emplace(readiness_token.GetId(), request);
|
||||||
}
|
}
|
||||||
|
spdlog::trace("sent {} CreateVertices requests in parallel", running_requests.size());
|
||||||
|
|
||||||
// drive requests to completion
|
// drive requests to completion
|
||||||
return DriveWriteResponses<msgs::CreateVerticesRequest, msgs::CreateVerticesResponse>(running_requests);
|
return DriveWriteResponses<msgs::CreateVerticesRequest, msgs::CreateVerticesResponse>(running_requests);
|
||||||
@ -519,8 +529,10 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
// even if they came back in randomized orders.
|
// even if they came back in randomized orders.
|
||||||
std::map<size_t, ResponseT> response_map;
|
std::map<size_t, ResponseT> response_map;
|
||||||
|
|
||||||
|
spdlog::trace("waiting on readiness for token");
|
||||||
while (response_map.size() < running_requests.size()) {
|
while (response_map.size() < running_requests.size()) {
|
||||||
auto ready = notifier_.Await();
|
auto ready = notifier_.Await();
|
||||||
|
spdlog::trace("got readiness for token {}", ready.GetId());
|
||||||
auto &request = running_requests.at(ready.GetId());
|
auto &request = running_requests.at(ready.GetId());
|
||||||
auto &storage_client = GetStorageClientForShard(request.shard);
|
auto &storage_client = GetStorageClientForShard(request.shard);
|
||||||
|
|
||||||
|
@ -338,6 +338,8 @@ void DoTest() {
|
|||||||
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
|
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
|
||||||
|
|
||||||
query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
|
query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
|
||||||
|
std::function<bool()> tick_simulator = simulator.GetSimulatorTickClosure();
|
||||||
|
request_router.InstallSimulatorTicker(tick_simulator);
|
||||||
|
|
||||||
request_router.StartTransaction();
|
request_router.StartTransaction();
|
||||||
TestScanVertices(request_router);
|
TestScanVertices(request_router);
|
||||||
|
@ -244,6 +244,8 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const
|
|||||||
WaitForShardsToInitialize(coordinator_client);
|
WaitForShardsToInitialize(coordinator_client);
|
||||||
|
|
||||||
query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
|
query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
|
||||||
|
std::function<bool()> tick_simulator = simulator.GetSimulatorTickClosure();
|
||||||
|
request_router.InstallSimulatorTicker(tick_simulator);
|
||||||
|
|
||||||
request_router.StartTransaction();
|
request_router.StartTransaction();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user