Merge pull request #690 from memgraph/tyler_full_async_request_router
[project-pineapples <-] full async request router
This commit is contained in:
commit 59c94c90e6
@@ -35,10 +35,13 @@ class Shared {
  std::optional<T> item_;
  bool consumed_ = false;
  bool waiting_ = false;
  std::function<bool()> simulator_notifier_ = nullptr;
  bool filled_ = false;
  std::function<bool()> wait_notifier_ = nullptr;
  std::function<void()> fill_notifier_ = nullptr;

 public:
  explicit Shared(std::function<bool()> simulator_notifier) : simulator_notifier_(simulator_notifier) {}
  explicit Shared(std::function<bool()> wait_notifier, std::function<void()> fill_notifier)
      : wait_notifier_(wait_notifier), fill_notifier_(fill_notifier) {}
  Shared() = default;
  Shared(Shared &&) = delete;
  Shared &operator=(Shared &&) = delete;
@@ -64,7 +67,7 @@ class Shared {
    waiting_ = true;

    while (!item_) {
      if (simulator_notifier_) [[unlikely]] {
      if (wait_notifier_) [[unlikely]] {
        // We can't hold our own lock while notifying
        // the simulator because notifying the simulator
        // involves acquiring the simulator's mutex
@@ -76,7 +79,7 @@ class Shared {
        // so we have to get out of its way to avoid
        // a cyclical deadlock.
        lock.unlock();
        std::invoke(simulator_notifier_);
        std::invoke(wait_notifier_);
        lock.lock();
        if (item_) {
          // item may have been filled while we
@@ -115,11 +118,19 @@ class Shared {
      std::unique_lock<std::mutex> lock(mu_);

      MG_ASSERT(!consumed_, "Promise filled after it was already consumed!");
      MG_ASSERT(!item_, "Promise filled twice!");
      MG_ASSERT(!filled_, "Promise filled twice!");

      item_ = item;
      filled_ = true;
    }  // lock released before condition variable notification

    if (fill_notifier_) {
      spdlog::trace("calling fill notifier");
      std::invoke(fill_notifier_);
    } else {
      spdlog::trace("not calling fill notifier");
    }

    cv_.notify_all();
  }
@@ -251,8 +262,9 @@ std::pair<Future<T>, Promise<T>> FuturePromisePair() {
}

template <typename T>
std::pair<Future<T>, Promise<T>> FuturePromisePairWithNotifier(std::function<bool()> simulator_notifier) {
  std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>(simulator_notifier);
std::pair<Future<T>, Promise<T>> FuturePromisePairWithNotifications(std::function<bool()> wait_notifier,
                                                                    std::function<void()> fill_notifier) {
  std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>(wait_notifier, fill_notifier);

  Future<T> future = Future<T>(shared);
  Promise<T> promise = Promise<T>(shared);
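The hunks above replace the single simulator notifier with a wait/fill notifier pair. A minimal sketch of how the new pair could be driven, assuming only the `memgraph::io` API visible in this diff (the `Fill` call style is an assumption inferred from the asserts above):

```cpp
#include <functional>
#include <string>

#include "io/future.hpp"

void Sketch() {
  // wait_notifier runs while Future::Wait blocks; fill_notifier runs on Promise fill.
  std::function<bool()> wait_notifier = [] { return false; };  // e.g. tick a simulator
  std::function<void()> fill_notifier = [] { /* e.g. wake a Notifier or event loop */ };

  auto [future, promise] =
      memgraph::io::FuturePromisePairWithNotifications<std::string>(wait_notifier, fill_notifier);

  promise.Fill("ready");                         // triggers fill_notifier, then cv_.notify_all()
  std::string value = std::move(future).Wait();  // returns immediately; item_ is already set
}
```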
@@ -31,9 +31,10 @@ class LocalTransport {
      : local_transport_handle_(std::move(local_transport_handle)) {}

  template <Message RequestT, Message ResponseT>
  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
    return local_transport_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address,
                                                                                std::move(request), timeout);
  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request,
                                    std::function<void()> fill_notifier, Duration timeout) {
    return local_transport_handle_->template SubmitRequest<RequestT, ResponseT>(
        to_address, from_address, std::move(request), timeout, fill_notifier);
  }

  template <Message... Ms>
@@ -140,8 +140,12 @@ class LocalTransportHandle {

  template <Message RequestT, Message ResponseT>
  ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RequestT &&request,
                                          Duration timeout) {
    auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
                                          Duration timeout, std::function<void()> fill_notifier) {
    auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<ResponseT>>(
        // set null notifier for when the Future::Wait is called
        nullptr,
        // set notifier for when Promise::Fill is called
        std::forward<std::function<void()>>(fill_notifier));

    const bool port_matches = to_address.last_known_port == from_address.last_known_port;
    const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
src/io/notifier.hpp (new file, +93)
@@ -0,0 +1,93 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#pragma once

#include <condition_variable>
#include <functional>
#include <memory>  // for std::shared_ptr below
#include <mutex>
#include <optional>
#include <vector>

namespace memgraph::io {

class ReadinessToken {
  size_t id_;

 public:
  explicit ReadinessToken(size_t id) : id_(id) {}
  size_t GetId() const { return id_; }
};

class Inner {
  std::condition_variable cv_;
  std::mutex mu_;
  std::vector<ReadinessToken> ready_;
  std::optional<std::function<bool()>> tick_simulator_;

 public:
  void Notify(ReadinessToken readiness_token) {
    {
      std::unique_lock<std::mutex> lock(mu_);
      ready_.emplace_back(readiness_token);
    }  // mutex dropped

    cv_.notify_all();
  }

  ReadinessToken Await() {
    std::unique_lock<std::mutex> lock(mu_);

    while (ready_.empty()) {
      if (tick_simulator_) [[unlikely]] {
        // This avoids a deadlock in a similar way that
        // Future::Wait will release its mutex while
        // interacting with the simulator, due to
        // the fact that the simulator may cause
        // notifications that we are interested in.
        lock.unlock();
        std::invoke(tick_simulator_.value());
        lock.lock();
      } else {
        cv_.wait(lock);
      }
    }

    ReadinessToken ret = ready_.back();
    ready_.pop_back();
    return ret;
  }

  void InstallSimulatorTicker(std::function<bool()> tick_simulator) {
    std::unique_lock<std::mutex> lock(mu_);
    tick_simulator_ = tick_simulator;
  }
};

class Notifier {
  std::shared_ptr<Inner> inner_;

 public:
  Notifier() : inner_(std::make_shared<Inner>()) {}
  Notifier(const Notifier &) = default;
  Notifier &operator=(const Notifier &) = default;
  Notifier(Notifier &&old) = default;
  Notifier &operator=(Notifier &&old) = default;
  ~Notifier() = default;

  void Notify(ReadinessToken readiness_token) const { inner_->Notify(readiness_token); }

  ReadinessToken Await() const { return inner_->Await(); }

  void InstallSimulatorTicker(std::function<bool()> tick_simulator) { inner_->InstallSimulatorTicker(tick_simulator); }
};

}  // namespace memgraph::io
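A small usage sketch for the new Notifier (a hypothetical producer/consumer pairing, not taken from this diff):

```cpp
#include <thread>

#include "io/notifier.hpp"

int main() {
  memgraph::io::Notifier notifier;  // copies share one Inner via shared_ptr

  std::jthread producer([notifier] {
    notifier.Notify(memgraph::io::ReadinessToken{7});  // mark request 7 as ready
  });

  const memgraph::io::ReadinessToken ready = notifier.Await();  // blocks until Notify
  return ready.GetId() == 7 ? 0 : 1;
}
```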
@@ -91,33 +91,43 @@ struct ReadResponse {
};

template <class... ReadReturn>
utils::TypeInfoRef TypeInfoFor(const ReadResponse<std::variant<ReadReturn...>> &read_response) {
  return TypeInfoForVariant(read_response.read_return);
utils::TypeInfoRef TypeInfoFor(const ReadResponse<std::variant<ReadReturn...>> &response) {
  return TypeInfoForVariant(response.read_return);
}

template <class ReadReturn>
utils::TypeInfoRef TypeInfoFor(const ReadResponse<ReadReturn> & /* read_response */) {
utils::TypeInfoRef TypeInfoFor(const ReadResponse<ReadReturn> & /* response */) {
  return typeid(ReadReturn);
}

template <class ReadOperation>
utils::TypeInfoRef TypeInfoFor(const ReadRequest<ReadOperation> & /* request */) {
  return typeid(ReadOperation);
}

template <class... ReadOperations>
utils::TypeInfoRef TypeInfoFor(const ReadRequest<std::variant<ReadOperations...>> &request) {
  return TypeInfoForVariant(request.operation);
}

template <class... WriteReturn>
utils::TypeInfoRef TypeInfoFor(const WriteResponse<std::variant<WriteReturn...>> &write_response) {
  return TypeInfoForVariant(write_response.write_return);
utils::TypeInfoRef TypeInfoFor(const WriteResponse<std::variant<WriteReturn...>> &response) {
  return TypeInfoForVariant(response.write_return);
}

template <class WriteReturn>
utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* write_response */) {
utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* response */) {
  return typeid(WriteReturn);
}

template <class WriteOperation>
utils::TypeInfoRef TypeInfoFor(const WriteRequest<WriteOperation> & /* write_request */) {
utils::TypeInfoRef TypeInfoFor(const WriteRequest<WriteOperation> & /* request */) {
  return typeid(WriteOperation);
}

template <class... WriteOperations>
utils::TypeInfoRef TypeInfoFor(const WriteRequest<std::variant<WriteOperations...>> &write_request) {
  return TypeInfoForVariant(write_request.operation);
utils::TypeInfoRef TypeInfoFor(const WriteRequest<std::variant<WriteOperations...>> &request) {
  return TypeInfoForVariant(request.operation);
}

/// AppendRequest is a raft-level message that the Leader
@@ -846,7 +856,9 @@ class Raft {
  // Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
  std::optional<Role> Handle(Leader & /* variable */, ReadRequest<ReadOperation> &&req, RequestId request_id,
                             Address from_address) {
    Log("handling ReadOperation");
    auto type_info = TypeInfoFor(req);
    std::string demangled_name = boost::core::demangle(type_info.get().name());
    Log("handling ReadOperation<" + demangled_name + ">");
    ReadOperation read_operation = req.operation;

    ReadResponseValue read_return = replicated_state_.Read(read_operation);
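For readers unfamiliar with the demangling step used above, a standalone sketch (boost::core::demangle is the same call as in the diff; everything else is illustrative):

```cpp
#include <iostream>
#include <string>
#include <typeinfo>

#include <boost/core/demangle.hpp>

template <typename T>
void LogTypeName(const T &) {
  // typeid(T).name() is mangled on GCC/Clang (e.g. "i" for int);
  // boost::core::demangle recovers the human-readable spelling.
  std::cout << boost::core::demangle(typeid(T).name()) << '\n';
}

int main() { LogTypeName(std::string{"hello"}); }  // prints a readable basic_string name
```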
@@ -19,6 +19,7 @@

#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/notifier.hpp"
#include "io/rsm/raft.hpp"
#include "utils/result.hpp"

@@ -37,18 +38,11 @@ using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::utils::BasicResult;

class AsyncRequestToken {
  size_t id_;

 public:
  explicit AsyncRequestToken(size_t id) : id_(id) {}
  size_t GetId() const { return id_; }
};

template <typename RequestT, typename ResponseT>
struct AsyncRequest {
  Time start_time;
  RequestT request;
  Notifier notifier;
  ResponseFuture<ResponseT> future;
};

@@ -66,8 +60,6 @@ class RsmClient {
  std::unordered_map<size_t, AsyncRequest<ReadRequestT, ReadResponse<ReadResponseT>>> async_reads_;
  std::unordered_map<size_t, AsyncRequest<WriteRequestT, WriteResponse<WriteResponseT>>> async_writes_;

  size_t async_token_generator_ = 0;

  void SelectRandomLeader() {
    std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
    size_t addr_index = io_.Rand(addr_distrib);
@@ -101,61 +93,63 @@ class RsmClient {
  ~RsmClient() = default;

  BasicResult<TimedOut, WriteResponseT> SendWriteRequest(WriteRequestT req) {
    auto token = SendAsyncWriteRequest(req);
    auto poll_result = AwaitAsyncWriteRequest(token);
    Notifier notifier;
    const ReadinessToken readiness_token{0};
    SendAsyncWriteRequest(req, notifier, readiness_token);
    auto poll_result = AwaitAsyncWriteRequest(readiness_token);
    while (!poll_result) {
      poll_result = AwaitAsyncWriteRequest(token);
      poll_result = AwaitAsyncWriteRequest(readiness_token);
    }
    return poll_result.value();
  }

  BasicResult<TimedOut, ReadResponseT> SendReadRequest(ReadRequestT req) {
    auto token = SendAsyncReadRequest(req);
    auto poll_result = AwaitAsyncReadRequest(token);
    Notifier notifier;
    const ReadinessToken readiness_token{0};
    SendAsyncReadRequest(req, notifier, readiness_token);
    auto poll_result = AwaitAsyncReadRequest(readiness_token);
    while (!poll_result) {
      poll_result = AwaitAsyncReadRequest(token);
      poll_result = AwaitAsyncReadRequest(readiness_token);
    }
    return poll_result.value();
  }

  /// AsyncRead methods
  AsyncRequestToken SendAsyncReadRequest(const ReadRequestT &req) {
    size_t token = async_token_generator_++;

  void SendAsyncReadRequest(const ReadRequestT &req, Notifier notifier, ReadinessToken readiness_token) {
    ReadRequest<ReadRequestT> read_req = {.operation = req};

    AsyncRequest<ReadRequestT, ReadResponse<ReadResponseT>> async_request{
        .start_time = io_.Now(),
        .request = std::move(req),
        .future = io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req),
        .notifier = notifier,
        .future = io_.template RequestWithNotification<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(
            leader_, read_req, notifier, readiness_token),
    };

    async_reads_.emplace(token, std::move(async_request));

    return AsyncRequestToken{token};
    async_reads_.emplace(readiness_token.GetId(), std::move(async_request));
  }

  void ResendAsyncReadRequest(const AsyncRequestToken &token) {
    auto &async_request = async_reads_.at(token.GetId());
  void ResendAsyncReadRequest(const ReadinessToken &readiness_token) {
    auto &async_request = async_reads_.at(readiness_token.GetId());

    ReadRequest<ReadRequestT> read_req = {.operation = async_request.request};

    async_request.future =
        io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
    async_request.future = io_.template RequestWithNotification<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(
        leader_, read_req, async_request.notifier, readiness_token);
  }

  std::optional<BasicResult<TimedOut, ReadResponseT>> PollAsyncReadRequest(const AsyncRequestToken &token) {
    auto &async_request = async_reads_.at(token.GetId());
  std::optional<BasicResult<TimedOut, ReadResponseT>> PollAsyncReadRequest(const ReadinessToken &readiness_token) {
    auto &async_request = async_reads_.at(readiness_token.GetId());

    if (!async_request.future.IsReady()) {
      return std::nullopt;
    }

    return AwaitAsyncReadRequest();
    return AwaitAsyncReadRequest(readiness_token);
  }

  std::optional<BasicResult<TimedOut, ReadResponseT>> AwaitAsyncReadRequest(const AsyncRequestToken &token) {
    auto &async_request = async_reads_.at(token.GetId());
  std::optional<BasicResult<TimedOut, ReadResponseT>> AwaitAsyncReadRequest(const ReadinessToken &readiness_token) {
    auto &async_request = async_reads_.at(readiness_token.GetId());
    ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(async_request.future).Wait();

    const Duration overall_timeout = io_.GetDefaultTimeout();
@@ -165,7 +159,7 @@ class RsmClient {
    if (result_has_error && past_time_out) {
      // TODO static assert the exact type of error.
      spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
      async_reads_.erase(token.GetId());
      async_reads_.erase(readiness_token.GetId());
      return TimedOut{};
    }

@@ -176,7 +170,7 @@ class RsmClient {
      PossiblyRedirectLeader(read_get_response);

      if (read_get_response.success) {
        async_reads_.erase(token.GetId());
        async_reads_.erase(readiness_token.GetId());
        spdlog::debug("returning read_return for RSM request");
        return std::move(read_get_response.read_return);
      }
@@ -184,49 +178,48 @@ class RsmClient {
      SelectRandomLeader();
    }

    ResendAsyncReadRequest(token);
    ResendAsyncReadRequest(readiness_token);

    return std::nullopt;
  }

  /// AsyncWrite methods
  AsyncRequestToken SendAsyncWriteRequest(const WriteRequestT &req) {
    size_t token = async_token_generator_++;

  void SendAsyncWriteRequest(const WriteRequestT &req, Notifier notifier, ReadinessToken readiness_token) {
    WriteRequest<WriteRequestT> write_req = {.operation = req};

    AsyncRequest<WriteRequestT, WriteResponse<WriteResponseT>> async_request{
        .start_time = io_.Now(),
        .request = std::move(req),
        .future = io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, write_req),
        .notifier = notifier,
        .future = io_.template RequestWithNotification<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(
            leader_, write_req, notifier, readiness_token),
    };

    async_writes_.emplace(token, std::move(async_request));

    return AsyncRequestToken{token};
    async_writes_.emplace(readiness_token.GetId(), std::move(async_request));
  }

  void ResendAsyncWriteRequest(const AsyncRequestToken &token) {
    auto &async_request = async_writes_.at(token.GetId());
  void ResendAsyncWriteRequest(const ReadinessToken &readiness_token) {
    auto &async_request = async_writes_.at(readiness_token.GetId());

    WriteRequest<WriteRequestT> write_req = {.operation = async_request.request};

    async_request.future =
        io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, write_req);
        io_.template RequestWithNotification<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(
            leader_, write_req, async_request.notifier, readiness_token);
  }

  std::optional<BasicResult<TimedOut, WriteResponseT>> PollAsyncWriteRequest(const AsyncRequestToken &token) {
    auto &async_request = async_writes_.at(token.GetId());
  std::optional<BasicResult<TimedOut, WriteResponseT>> PollAsyncWriteRequest(const ReadinessToken &readiness_token) {
    auto &async_request = async_writes_.at(readiness_token.GetId());

    if (!async_request.future.IsReady()) {
      return std::nullopt;
    }

    return AwaitAsyncWriteRequest();
    return AwaitAsyncWriteRequest(readiness_token);
  }

  std::optional<BasicResult<TimedOut, WriteResponseT>> AwaitAsyncWriteRequest(const AsyncRequestToken &token) {
    auto &async_request = async_writes_.at(token.GetId());
  std::optional<BasicResult<TimedOut, WriteResponseT>> AwaitAsyncWriteRequest(const ReadinessToken &readiness_token) {
    auto &async_request = async_writes_.at(readiness_token.GetId());
    ResponseResult<WriteResponse<WriteResponseT>> get_response_result = std::move(async_request.future).Wait();

    const Duration overall_timeout = io_.GetDefaultTimeout();
@@ -236,7 +229,7 @@ class RsmClient {
    if (result_has_error && past_time_out) {
      // TODO static assert the exact type of error.
      spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
      async_writes_.erase(token.GetId());
      async_writes_.erase(readiness_token.GetId());
      return TimedOut{};
    }

@@ -248,14 +241,14 @@ class RsmClient {
      PossiblyRedirectLeader(write_get_response);

      if (write_get_response.success) {
        async_writes_.erase(token.GetId());
        async_writes_.erase(readiness_token.GetId());
        return std::move(write_get_response.write_return);
      }
    } else {
      SelectRandomLeader();
    }

    ResendAsyncWriteRequest(token);
    ResendAsyncWriteRequest(readiness_token);

    return std::nullopt;
  }
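The RsmClient now externalizes readiness: callers allocate the ReadinessToken and can share one Notifier across many in-flight requests. A sketch of the resulting calling pattern (the `client`, `requests`, and `Consume` names are hypothetical):

```cpp
// Issue N async reads that all report completion through one Notifier,
// then consume them in whatever order they become ready.
memgraph::io::Notifier notifier;

for (size_t i = 0; i < requests.size(); ++i) {
  client.SendAsyncReadRequest(requests[i], notifier, memgraph::io::ReadinessToken{i});
}

for (size_t remaining = requests.size(); remaining > 0;) {
  const memgraph::io::ReadinessToken ready = notifier.Await();  // blocks until any fill
  if (auto result = client.PollAsyncReadRequest(ready)) {       // non-blocking check
    Consume(std::move(result.value()));
    --remaining;
  }  // nullopt: not ready yet (or resent after a leader change); Await again
}
```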
@@ -49,5 +49,10 @@ class Simulator {
  }

  SimulatorStats Stats() { return simulator_handle_->Stats(); }

  std::function<bool()> GetSimulatorTickClosure() {
    std::function<bool()> tick_closure = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); };
    return tick_closure;
  }
};
};  // namespace memgraph::io::simulator
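Note that the tick closure captures a copy of the shared_ptr (`handle_copy = simulator_handle_`) rather than `this`, so it stays valid even if it outlives the Simulator that created it. The general pattern, as a generic sketch:

```cpp
#include <functional>
#include <memory>

struct Handle {
  bool Tick() { return false; }  // stand-in for MaybeTickSimulator
};

std::function<bool()> MakeTicker(std::shared_ptr<Handle> handle) {
  // Capturing the shared_ptr by value means the closure co-owns the Handle:
  // no dangling pointer if the factory object is destroyed first.
  return [handle_copy = std::move(handle)] { return handle_copy->Tick(); };
}
```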
@@ -22,6 +22,8 @@
#include <variant>
#include <vector>

#include <boost/core/demangle.hpp>

#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/message_conversion.hpp"
@@ -105,41 +107,48 @@ class SimulatorHandle {

  template <Message Request, Message Response>
  ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
                                         std::function<bool()> &&maybe_tick_simulator) {
    spdlog::trace("submitting request to {}", to_address.last_known_port);
                                         std::function<bool()> &&maybe_tick_simulator,
                                         std::function<void()> &&fill_notifier) {
    auto type_info = TypeInfoFor(request);
    std::string demangled_name = boost::core::demangle(type_info.get().name());
    spdlog::trace("simulator sending request {} to {}", demangled_name, to_address);

    auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
        std::forward<std::function<bool()>>(maybe_tick_simulator));
    auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<Response>>(
        // set notifier for when the Future::Wait is called
        std::forward<std::function<bool()>>(maybe_tick_simulator),
        // set notifier for when Promise::Fill is called
        std::forward<std::function<void()>>(fill_notifier));

    std::unique_lock<std::mutex> lock(mu_);
    {
      std::unique_lock<std::mutex> lock(mu_);

    RequestId request_id = ++request_id_counter_;
      RequestId request_id = ++request_id_counter_;

    const Time deadline = cluster_wide_time_microseconds_ + timeout;
      const Time deadline = cluster_wide_time_microseconds_ + timeout;

    std::any message(request);
    OpaqueMessage om{.to_address = to_address,
                     .from_address = from_address,
                     .request_id = request_id,
                     .message = std::move(message),
                     .type_info = type_info};
    in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
      std::any message(request);
      OpaqueMessage om{.to_address = to_address,
                       .from_address = from_address,
                       .request_id = request_id,
                       .message = std::move(message),
                       .type_info = type_info};
      in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));

    PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
    OpaquePromise opaque_promise(std::move(promise).ToUnique());
    DeadlineAndOpaquePromise dop{
        .requested_at = cluster_wide_time_microseconds_,
        .deadline = deadline,
        .promise = std::move(opaque_promise),
    };
      PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
      OpaquePromise opaque_promise(std::move(promise).ToUnique());
      DeadlineAndOpaquePromise dop{
          .requested_at = cluster_wide_time_microseconds_,
          .deadline = deadline,
          .promise = std::move(opaque_promise),
      };

    MG_ASSERT(!promises_.contains(promise_key));
      MG_ASSERT(!promises_.contains(promise_key));

    promises_.emplace(std::move(promise_key), std::move(dop));
      promises_.emplace(std::move(promise_key), std::move(dop));

    stats_.total_messages++;
    stats_.total_requests++;
      stats_.total_messages++;
      stats_.total_requests++;
    }  // lock dropped here

    cv_.notify_all();
@@ -15,6 +15,7 @@
#include <utility>

#include "io/address.hpp"
#include "io/notifier.hpp"
#include "io/simulator/simulator_handle.hpp"
#include "io/time.hpp"

@@ -33,11 +34,12 @@ class SimulatorTransport {
      : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}

  template <Message RequestT, Message ResponseT>
  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
    std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request,
                                    std::function<void()> notification, Duration timeout) {
    std::function<bool()> tick_simulator = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); };

    return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
                                                                          timeout, std::move(maybe_tick_simulator));
    return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(
        to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification));
  }

  template <Message... Ms>
@@ -20,6 +20,7 @@
#include "io/errors.hpp"
#include "io/future.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/notifier.hpp"
#include "io/time.hpp"
#include "utils/result.hpp"

@@ -84,7 +85,9 @@ class Io {
  template <Message RequestT, Message ResponseT>
  ResponseFuture<ResponseT> RequestWithTimeout(Address address, RequestT request, Duration timeout) {
    const Address from_address = address_;
    return implementation_.template Request<RequestT, ResponseT>(address, from_address, request, timeout);
    std::function<void()> fill_notifier = nullptr;
    return implementation_.template Request<RequestT, ResponseT>(address, from_address, request, fill_notifier,
                                                                 timeout);
  }

  /// Issue a request that times out after the default timeout. This tends
@@ -93,7 +96,30 @@ class Io {
  ResponseFuture<ResponseT> Request(Address to_address, RequestT request) {
    const Duration timeout = default_timeout_;
    const Address from_address = address_;
    return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, std::move(request), timeout);
    std::function<void()> fill_notifier = nullptr;
    return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, std::move(request),
                                                                 fill_notifier, timeout);
  }

  /// Issue a request that will notify a Notifier when it is filled or times out.
  template <Message RequestT, Message ResponseT>
  ResponseFuture<ResponseT> RequestWithNotification(Address to_address, RequestT request, Notifier notifier,
                                                    ReadinessToken readiness_token) {
    const Duration timeout = default_timeout_;
    const Address from_address = address_;
    std::function<void()> fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); };
    return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, std::move(request),
                                                                 fill_notifier, timeout);
  }

  /// Issue a request that will notify a Notifier when it is filled or times out.
  template <Message RequestT, Message ResponseT>
  ResponseFuture<ResponseT> RequestWithNotificationAndTimeout(Address to_address, RequestT request, Notifier notifier,
                                                              ReadinessToken readiness_token, Duration timeout) {
    const Address from_address = address_;
    std::function<void()> fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); };
    return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, std::move(request),
                                                                 fill_notifier, timeout);
  }

  /// Wait for an explicit number of microseconds for a request of one of the
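The glue between the transport layer and the Notifier is just a copyable closure: the Notifier and ReadinessToken are captured by value, so the fill notifier can be invoked from whichever thread fills the promise. As an isolated sketch of that construction:

```cpp
#include <functional>

#include "io/notifier.hpp"

std::function<void()> MakeFillNotifier(memgraph::io::Notifier notifier,
                                       memgraph::io::ReadinessToken readiness_token) {
  // Both captures are value copies; Notifier shares its state via shared_ptr,
  // so invoking this callback later from another thread is safe.
  return [notifier, readiness_token]() { notifier.Notify(readiness_token); };
}
```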
@@ -32,6 +32,7 @@
#include "coordinator/shard_map.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/notifier.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
@@ -44,6 +45,7 @@
#include "utils/result.hpp"

namespace memgraph::query::v2 {

template <typename TStorageClient>
class RsmStorageClientManager {
 public:
@@ -76,25 +78,11 @@ template <typename TRequest>
struct ShardRequestState {
  memgraph::coordinator::Shard shard;
  TRequest request;
  std::optional<io::rsm::AsyncRequestToken> async_request_token;
};

// maps from ReadinessToken's internal size_t to the associated state
template <typename TRequest>
struct ExecutionState {
  using CompoundKey = io::rsm::ShardRsmKey;
  using Shard = coordinator::Shard;

  // label is optional because some operators can create/remove etc. vertices. These kinds of requests contain the label
  // on the request itself.
  std::optional<std::string> label;
  // Transaction id to be filled by the RequestRouter implementation
  coordinator::Hlc transaction_id;
  // Initialized by the RequestRouter implementation. This vector is filled with the shards that
  // the RequestRouter impl will send requests to. When a request to a shard exhausts it, meaning that
  // it pulled all the requested data from the given Shard, it will be removed from the vector. When the vector becomes
  // empty, it means that all of the requests have completed successfully.
  std::vector<ShardRequestState<TRequest>> requests;
};
using RunningRequests = std::unordered_map<size_t, ShardRequestState<TRequest>>;

class RequestRouterInterface {
 public:
@@ -150,10 +138,16 @@ class RequestRouter : public RequestRouterInterface {

  ~RequestRouter() override {}

  void InstallSimulatorTicker(std::function<bool()> tick_simulator) {
    notifier_.InstallSimulatorTicker(tick_simulator);
  }

  void StartTransaction() override {
    coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
    CoordinatorWriteRequests write_req = req;
    spdlog::trace("sending hlc request to start transaction");
    auto write_res = coord_cli_.SendWriteRequest(write_req);
    spdlog::trace("received hlc response to start transaction");
    if (write_res.HasError()) {
      throw std::runtime_error("HLC request failed");
    }
@@ -172,7 +166,9 @@ class RequestRouter : public RequestRouterInterface {
  void Commit() override {
    coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
    CoordinatorWriteRequests write_req = req;
    spdlog::trace("sending hlc request before committing transaction");
    auto write_res = coord_cli_.SendWriteRequest(write_req);
    spdlog::trace("received hlc response before committing transaction");
    if (write_res.HasError()) {
      throw std::runtime_error("HLC request for commit failed");
    }
@@ -240,26 +236,25 @@ class RequestRouter : public RequestRouterInterface {

  // TODO(kostasrim) Simplify return result
  std::vector<VertexAccessor> ScanVertices(std::optional<std::string> label) override {
    ExecutionState<msgs::ScanVerticesRequest> state = {};
    state.label = label;

    // create requests
    InitializeExecutionState(state);
    std::vector<ShardRequestState<msgs::ScanVerticesRequest>> requests_to_be_sent = RequestsForScanVertices(label);
    spdlog::trace("created {} ScanVertices requests", requests_to_be_sent.size());

    // begin all requests in parallel
    for (auto &request : state.requests) {
    RunningRequests<msgs::ScanVerticesRequest> running_requests = {};
    running_requests.reserve(requests_to_be_sent.size());
    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
      auto &request = requests_to_be_sent[i];
      io::ReadinessToken readiness_token{i};
      auto &storage_client = GetStorageClientForShard(request.shard);
      msgs::ReadRequests req = request.request;

      request.async_request_token = storage_client.SendAsyncReadRequest(request.request);
      storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token);
      running_requests.emplace(readiness_token.GetId(), request);
    }
    spdlog::trace("sent {} ScanVertices requests in parallel", running_requests.size());

    // drive requests to completion
    std::vector<msgs::ScanVerticesResponse> responses;
    responses.reserve(state.requests.size());
    do {
      DriveReadResponses(state, responses);
    } while (!state.requests.empty());
    auto responses = DriveReadResponses<msgs::ScanVerticesRequest, msgs::ScanVerticesResponse>(running_requests);
    spdlog::trace("got back {} ScanVertices responses after driving to completion", responses.size());

    // convert responses into VertexAccessor objects to return
    std::vector<VertexAccessor> accessors;
@@ -274,62 +269,55 @@ class RequestRouter : public RequestRouterInterface {
  }

  std::vector<msgs::CreateVerticesResponse> CreateVertices(std::vector<msgs::NewVertex> new_vertices) override {
    ExecutionState<msgs::CreateVerticesRequest> state = {};
    MG_ASSERT(!new_vertices.empty());

    // create requests
    InitializeExecutionState(state, new_vertices);
    std::vector<ShardRequestState<msgs::CreateVerticesRequest>> requests_to_be_sent =
        RequestsForCreateVertices(new_vertices);
    spdlog::trace("created {} CreateVertices requests", requests_to_be_sent.size());

    // begin all requests in parallel
    for (auto &request : state.requests) {
      auto req_deep_copy = request.request;

      for (auto &new_vertex : req_deep_copy.new_vertices) {
    RunningRequests<msgs::CreateVerticesRequest> running_requests = {};
    running_requests.reserve(requests_to_be_sent.size());
    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
      auto &request = requests_to_be_sent[i];
      io::ReadinessToken readiness_token{i};
      for (auto &new_vertex : request.request.new_vertices) {
        new_vertex.label_ids.erase(new_vertex.label_ids.begin());
      }

      auto &storage_client = GetStorageClientForShard(request.shard);

      msgs::WriteRequests req = req_deep_copy;
      request.async_request_token = storage_client.SendAsyncWriteRequest(req);
      storage_client.SendAsyncWriteRequest(request.request, notifier_, readiness_token);
      running_requests.emplace(readiness_token.GetId(), request);
    }
    spdlog::trace("sent {} CreateVertices requests in parallel", running_requests.size());

    // drive requests to completion
    std::vector<msgs::CreateVerticesResponse> responses;
    responses.reserve(state.requests.size());
    do {
      DriveWriteResponses(state, responses);
    } while (!state.requests.empty());

    return responses;
    return DriveWriteResponses<msgs::CreateVerticesRequest, msgs::CreateVerticesResponse>(running_requests);
  }

  std::vector<msgs::CreateExpandResponse> CreateExpand(std::vector<msgs::NewExpand> new_edges) override {
    ExecutionState<msgs::CreateExpandRequest> state = {};
    MG_ASSERT(!new_edges.empty());

    // create requests
    InitializeExecutionState(state, new_edges);
    std::vector<ShardRequestState<msgs::CreateExpandRequest>> requests_to_be_sent = RequestsForCreateExpand(new_edges);

    // begin all requests in parallel
    for (auto &request : state.requests) {
    RunningRequests<msgs::CreateExpandRequest> running_requests = {};
    running_requests.reserve(requests_to_be_sent.size());
    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
      auto &request = requests_to_be_sent[i];
      io::ReadinessToken readiness_token{i};
      auto &storage_client = GetStorageClientForShard(request.shard);
      msgs::WriteRequests req = request.request;
      request.async_request_token = storage_client.SendAsyncWriteRequest(req);
      storage_client.SendAsyncWriteRequest(req, notifier_, readiness_token);
      running_requests.emplace(readiness_token.GetId(), request);
    }

    // drive requests to completion
    std::vector<msgs::CreateExpandResponse> responses;
    responses.reserve(state.requests.size());
    do {
      DriveWriteResponses(state, responses);
    } while (!state.requests.empty());

    return responses;
    return DriveWriteResponses<msgs::CreateExpandRequest, msgs::CreateExpandResponse>(running_requests);
  }

  std::vector<msgs::ExpandOneResultRow> ExpandOne(msgs::ExpandOneRequest request) override {
    ExecutionState<msgs::ExpandOneRequest> state = {};
    // TODO(kostasrim) Update to limit the batch size here
    // Expansions of the destination must be handled by the caller. For example
    // match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
@@ -337,21 +325,22 @@ class RequestRouter : public RequestRouterInterface {
    // must be fetched again with an ExpandOne(Edges.dst)

    // create requests
    InitializeExecutionState(state, std::move(request));
    std::vector<ShardRequestState<msgs::ExpandOneRequest>> requests_to_be_sent = RequestsForExpandOne(request);

    // begin all requests in parallel
    for (auto &request : state.requests) {
    RunningRequests<msgs::ExpandOneRequest> running_requests = {};
    running_requests.reserve(requests_to_be_sent.size());
    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
      auto &request = requests_to_be_sent[i];
      io::ReadinessToken readiness_token{i};
      auto &storage_client = GetStorageClientForShard(request.shard);
      msgs::ReadRequests req = request.request;
      request.async_request_token = storage_client.SendAsyncReadRequest(req);
      storage_client.SendAsyncReadRequest(req, notifier_, readiness_token);
      running_requests.emplace(readiness_token.GetId(), request);
    }

    // drive requests to completion
    std::vector<msgs::ExpandOneResponse> responses;
    responses.reserve(state.requests.size());
    do {
      DriveReadResponses(state, responses);
    } while (!state.requests.empty());
    auto responses = DriveReadResponses<msgs::ExpandOneRequest, msgs::ExpandOneResponse>(running_requests);

    // post-process responses
    std::vector<msgs::ExpandOneResultRow> result_rows;
@@ -370,25 +359,33 @@ class RequestRouter : public RequestRouterInterface {
  }

  std::vector<msgs::GetPropertiesResultRow> GetProperties(msgs::GetPropertiesRequest requests) override {
    ExecutionState<msgs::GetPropertiesRequest> state = {};
    InitializeExecutionState(state, std::move(requests));
    for (auto &request : state.requests) {
    // create requests
    std::vector<ShardRequestState<msgs::GetPropertiesRequest>> requests_to_be_sent =
        RequestsForGetProperties(std::move(requests));

    // begin all requests in parallel
    RunningRequests<msgs::GetPropertiesRequest> running_requests = {};
    running_requests.reserve(requests_to_be_sent.size());
    for (size_t i = 0; i < requests_to_be_sent.size(); i++) {
      auto &request = requests_to_be_sent[i];
      io::ReadinessToken readiness_token{i};
      auto &storage_client = GetStorageClientForShard(request.shard);
      msgs::ReadRequests req = request.request;
      request.async_request_token = storage_client.SendAsyncReadRequest(req);
      storage_client.SendAsyncReadRequest(req, notifier_, readiness_token);
      running_requests.emplace(readiness_token.GetId(), request);
    }

    std::vector<msgs::GetPropertiesResponse> responses;
    do {
      DriveReadResponses(state, responses);
    } while (!state.requests.empty());
    // drive requests to completion
    auto responses = DriveReadResponses<msgs::GetPropertiesRequest, msgs::GetPropertiesResponse>(running_requests);

    std::vector<msgs::GetPropertiesResultRow> result;
    for (auto &res : responses) {
      std::move(res.result_row.begin(), res.result_row.end(), std::back_inserter(result));
    // post-process responses
    std::vector<msgs::GetPropertiesResultRow> result_rows;

    for (auto &&response : responses) {
      std::move(response.result_row.begin(), response.result_row.end(), std::back_inserter(result_rows));
    }

    return result;
    return result_rows;
  }

  std::optional<storage::v3::PropertyId> MaybeNameToProperty(const std::string &name) const override {
@@ -404,10 +401,8 @@ class RequestRouter : public RequestRouterInterface {
  }

 private:
  void InitializeExecutionState(ExecutionState<msgs::CreateVerticesRequest> &state,
                                std::vector<msgs::NewVertex> new_vertices) {
    state.transaction_id = transaction_id_;

  std::vector<ShardRequestState<msgs::CreateVerticesRequest>> RequestsForCreateVertices(
      const std::vector<msgs::NewVertex> &new_vertices) {
    std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;

    for (auto &new_vertex : new_vertices) {
@@ -421,20 +416,21 @@ class RequestRouter : public RequestRouterInterface {
      per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex));
    }

    std::vector<ShardRequestState<msgs::CreateVerticesRequest>> requests = {};

    for (auto &[shard, request] : per_shard_request_table) {
      ShardRequestState<msgs::CreateVerticesRequest> shard_request_state{
          .shard = shard,
          .request = request,
          .async_request_token = std::nullopt,
      };
      state.requests.emplace_back(std::move(shard_request_state));
      requests.emplace_back(std::move(shard_request_state));
    }

    return requests;
  }

  void InitializeExecutionState(ExecutionState<msgs::CreateExpandRequest> &state,
                                std::vector<msgs::NewExpand> new_expands) {
    state.transaction_id = transaction_id_;

  std::vector<ShardRequestState<msgs::CreateExpandRequest>> RequestsForCreateExpand(
      const std::vector<msgs::NewExpand> &new_expands) {
    std::map<Shard, msgs::CreateExpandRequest> per_shard_request_table;
    auto ensure_shard_exists_in_table = [&per_shard_request_table,
                                         transaction_id = transaction_id_](const Shard &shard) {
@@ -459,27 +455,33 @@ class RequestRouter : public RequestRouterInterface {
      per_shard_request_table[shard_src_vertex].new_expands.push_back(std::move(new_expand));
    }

    std::vector<ShardRequestState<msgs::CreateExpandRequest>> requests = {};

    for (auto &[shard, request] : per_shard_request_table) {
      ShardRequestState<msgs::CreateExpandRequest> shard_request_state{
          .shard = shard,
          .request = request,
          .async_request_token = std::nullopt,
      };
      state.requests.emplace_back(std::move(shard_request_state));
      requests.emplace_back(std::move(shard_request_state));
    }

    return requests;
  }

  void InitializeExecutionState(ExecutionState<msgs::ScanVerticesRequest> &state) {
  std::vector<ShardRequestState<msgs::ScanVerticesRequest>> RequestsForScanVertices(
      const std::optional<std::string> &label) {
    std::vector<coordinator::Shards> multi_shards;
    state.transaction_id = transaction_id_;
    if (!state.label) {
      multi_shards = shards_map_.GetAllShards();
    } else {
      const auto label_id = shards_map_.GetLabelId(*state.label);
    if (label) {
      const auto label_id = shards_map_.GetLabelId(*label);
      MG_ASSERT(label_id);
      MG_ASSERT(IsPrimaryLabel(*label_id));
      multi_shards = {shards_map_.GetShardsForLabel(*state.label)};
      multi_shards = {shards_map_.GetShardsForLabel(*label)};
    } else {
      multi_shards = shards_map_.GetAllShards();
    }

    std::vector<ShardRequestState<msgs::ScanVerticesRequest>> requests = {};

    for (auto &shards : multi_shards) {
      for (auto &[key, shard] : shards) {
        MG_ASSERT(!shard.empty());
@@ -491,22 +493,21 @@ class RequestRouter : public RequestRouterInterface {
        ShardRequestState<msgs::ScanVerticesRequest> shard_request_state{
            .shard = shard,
            .request = std::move(request),
            .async_request_token = std::nullopt,
        };

        state.requests.emplace_back(std::move(shard_request_state));
        requests.emplace_back(std::move(shard_request_state));
      }
    }

    return requests;
  }

  void InitializeExecutionState(ExecutionState<msgs::ExpandOneRequest> &state, msgs::ExpandOneRequest request) {
    state.transaction_id = transaction_id_;

  std::vector<ShardRequestState<msgs::ExpandOneRequest>> RequestsForExpandOne(const msgs::ExpandOneRequest &request) {
    std::map<Shard, msgs::ExpandOneRequest> per_shard_request_table;
    auto top_level_rqst_template = request;
    msgs::ExpandOneRequest top_level_rqst_template = request;
    top_level_rqst_template.transaction_id = transaction_id_;
    top_level_rqst_template.src_vertices.clear();
    state.requests.clear();

    for (auto &vertex : request.src_vertices) {
      auto shard =
          shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
@@ -516,27 +517,29 @@ class RequestRouter : public RequestRouterInterface {
      per_shard_request_table[shard].src_vertices.push_back(vertex);
    }

    std::vector<ShardRequestState<msgs::ExpandOneRequest>> requests = {};

    for (auto &[shard, request] : per_shard_request_table) {
      ShardRequestState<msgs::ExpandOneRequest> shard_request_state{
          .shard = shard,
          .request = request,
          .async_request_token = std::nullopt,
      };

      state.requests.emplace_back(std::move(shard_request_state));
      requests.emplace_back(std::move(shard_request_state));
    }

    return requests;
  }

  void InitializeExecutionState(ExecutionState<msgs::GetPropertiesRequest> &state, msgs::GetPropertiesRequest request) {
  std::vector<ShardRequestState<msgs::GetPropertiesRequest>> RequestsForGetProperties(
      msgs::GetPropertiesRequest &&request) {
    std::map<Shard, msgs::GetPropertiesRequest> per_shard_request_table;
    auto top_level_rqst_template = request;
    top_level_rqst_template.transaction_id = transaction_id_;
    top_level_rqst_template.vertex_ids.clear();
    top_level_rqst_template.vertices_and_edges.clear();

    state.transaction_id = transaction_id_;

    for (auto &vertex : request.vertex_ids) {
    for (auto &&vertex : request.vertex_ids) {
      auto shard =
          shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
      if (!per_shard_request_table.contains(shard)) {
@@ -554,15 +557,18 @@ class RequestRouter : public RequestRouterInterface {
      per_shard_request_table[shard].vertices_and_edges.emplace_back(std::move(vertex), maybe_edge);
    }

    std::vector<ShardRequestState<msgs::GetPropertiesRequest>> requests;

    for (auto &[shard, rqst] : per_shard_request_table) {
      ShardRequestState<msgs::GetPropertiesRequest> shard_request_state{
          .shard = shard,
          .request = std::move(rqst),
          .async_request_token = std::nullopt,
      };

      state.requests.emplace_back(std::move(shard_request_state));
      requests.emplace_back(std::move(shard_request_state));
    }

    return requests;
  }

  StorageClient &GetStorageClientForShard(Shard shard) {
@@ -590,13 +596,24 @@ class RequestRouter : public RequestRouterInterface {
  }

  template <typename RequestT, typename ResponseT>
  void DriveReadResponses(ExecutionState<RequestT> &state, std::vector<ResponseT> &responses) {
    for (auto &request : state.requests) {
  std::vector<ResponseT> DriveReadResponses(RunningRequests<RequestT> &running_requests) {
    // Store responses in a map based on the corresponding request
    // offset, so that they can be reassembled in the correct order
    // even if they came back in randomized orders.
    std::map<size_t, ResponseT> response_map;

    spdlog::trace("waiting on readiness for token");
    while (response_map.size() < running_requests.size()) {
      auto ready = notifier_.Await();
      spdlog::trace("got readiness for token {}", ready.GetId());
      auto &request = running_requests.at(ready.GetId());
      auto &storage_client = GetStorageClientForShard(request.shard);

      auto poll_result = storage_client.AwaitAsyncReadRequest(request.async_request_token.value());
      while (!poll_result) {
        poll_result = storage_client.AwaitAsyncReadRequest(request.async_request_token.value());
      std::optional<utils::BasicResult<io::TimedOut, msgs::ReadResponses>> poll_result =
          storage_client.PollAsyncReadRequest(ready);

      if (!poll_result.has_value()) {
        continue;
      }

      if (poll_result->HasError()) {
@@ -609,19 +626,40 @@ class RequestRouter : public RequestRouterInterface {
        throw std::runtime_error("RequestRouter Read request did not succeed");
      }

      responses.push_back(std::move(response));
      // the readiness token has an ID based on the request vector offset
      response_map.emplace(ready.GetId(), std::move(response));
    }
    state.requests.clear();

    std::vector<ResponseT> responses;
    responses.reserve(running_requests.size());

    int last = -1;
    for (auto &&[offset, response] : response_map) {
      MG_ASSERT(last + 1 == offset);
      responses.emplace_back(std::forward<ResponseT>(response));
      last = offset;
    }

    return responses;
  }

  template <typename RequestT, typename ResponseT>
  void DriveWriteResponses(ExecutionState<RequestT> &state, std::vector<ResponseT> &responses) {
    for (auto &request : state.requests) {
  std::vector<ResponseT> DriveWriteResponses(RunningRequests<RequestT> &running_requests) {
    // Store responses in a map based on the corresponding request
    // offset, so that they can be reassembled in the correct order
    // even if they came back in randomized orders.
    std::map<size_t, ResponseT> response_map;

    while (response_map.size() < running_requests.size()) {
      auto ready = notifier_.Await();
      auto &request = running_requests.at(ready.GetId());
      auto &storage_client = GetStorageClientForShard(request.shard);

      auto poll_result = storage_client.AwaitAsyncWriteRequest(request.async_request_token.value());
      while (!poll_result) {
        poll_result = storage_client.AwaitAsyncWriteRequest(request.async_request_token.value());
      std::optional<utils::BasicResult<io::TimedOut, msgs::WriteResponses>> poll_result =
          storage_client.PollAsyncWriteRequest(ready);

      if (!poll_result.has_value()) {
        continue;
      }

      if (poll_result->HasError()) {
@@ -634,9 +672,21 @@ class RequestRouter : public RequestRouterInterface {
        throw std::runtime_error("RequestRouter Write request did not succeed");
      }

      responses.push_back(std::move(response));
      // the readiness token has an ID based on the request vector offset
      response_map.emplace(ready.GetId(), std::move(response));
    }
    state.requests.clear();

    std::vector<ResponseT> responses;
    responses.reserve(running_requests.size());

    int last = -1;
    for (auto &&[offset, response] : response_map) {
      MG_ASSERT(last + 1 == offset);
      responses.emplace_back(std::forward<ResponseT>(response));
      last = offset;
    }

    return responses;
  }

  void SetUpNameIdMappers() {
@@ -665,6 +715,7 @@ class RequestRouter : public RequestRouterInterface {
  RsmStorageClientManager<StorageClient> storage_cli_manager_;
  io::Io<TTransport> io_;
  coordinator::Hlc transaction_id_;
  io::Notifier notifier_ = {};
  // TODO(kostasrim) Add batch prefetching
};
}  // namespace memgraph::query::v2
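DriveReadResponses collects completions in whatever order the Notifier reports them, then rebuilds the original request order by keying a std::map on the token offset. The reassembly idea in isolation (a generic sketch, not the project code):

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Responses arrive tagged with the offset of the request that produced them;
// iterating the std::map afterwards yields them sorted by offset again.
std::vector<std::string> Reassemble(std::map<size_t, std::string> &&response_map) {
  std::vector<std::string> ordered;
  ordered.reserve(response_map.size());
  for (auto &&[offset, response] : response_map) {
    ordered.emplace_back(std::move(response));
  }
  return ordered;
}
```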
@@ -611,7 +611,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) {
    return limit;
  };

  auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) {
  auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) -> msgs::ReadResponses {
    msgs::GetPropertiesResponse response;
    const auto limit = get_limit(elements);
    for (size_t index = 0; index != limit; ++index) {
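The added trailing return type makes the lambda itself convert its GetPropertiesResponse into the msgs::ReadResponses variant at each return. The same idea in a generic sketch (std::variant stands in for ReadResponses):

```cpp
#include <string>
#include <variant>

using Response = std::variant<int, std::string>;

int main() {
  // Without "-> Response", return-type deduction would see two different
  // types and fail to compile; the trailing type converts both to the variant.
  auto collect = [](bool as_text) -> Response {
    if (as_text) return std::string{"forty-two"};
    return 42;
  };
  Response r = collect(true);
  return r.index() == 1 ? 0 : 1;
}
```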
@@ -18,6 +18,8 @@
#include <thread>
#include <vector>

#include <spdlog/cfg/env.h>

#include "common.hpp"
#include "common/types.hpp"
#include "coordinator/coordinator_client.hpp"
@@ -346,6 +348,8 @@ void DoTest() {
  CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);

  query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
  std::function<bool()> tick_simulator = simulator.GetSimulatorTickClosure();
  request_router.InstallSimulatorTicker(tick_simulator);

  request_router.StartTransaction();
  TestScanVertices(request_router);
@@ -368,4 +372,7 @@ void DoTest() {
}
}  // namespace memgraph::query::v2::tests

int main() { memgraph::query::v2::tests::DoTest(); }
int main() {
  spdlog::cfg::load_env_levels();
  memgraph::query::v2::tests::DoTest();
}
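With spdlog::cfg::load_env_levels() in main, log verbosity becomes a run-time choice through the SPDLOG_LEVEL environment variable (the binary name below is illustrative):

```cpp
// SPDLOG_LEVEL=trace ./simulation_test    // everything, incl. the readiness traces
// SPDLOG_LEVEL=info  ./simulation_test    // quieter run
// SPDLOG_LEVEL="info,mylogger=debug" ./simulation_test   // per-logger override
```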
@@ -244,6 +244,8 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const
  WaitForShardsToInitialize(coordinator_client);

  query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
  std::function<bool()> tick_simulator = simulator.GetSimulatorTickClosure();
  request_router.InstallSimulatorTicker(tick_simulator);

  request_router.StartTransaction();

@@ -28,13 +28,19 @@ void Wait(Future<std::string> future_1, Promise<std::string> promise_2) {

TEST(Future, BasicLifecycle) {
  std::atomic_bool waiting = false;
  std::atomic_bool filled = false;

  std::function<bool()> notifier = [&] {
  std::function<bool()> wait_notifier = [&] {
    waiting.store(true, std::memory_order_seq_cst);
    return false;
  };

  auto [future_1, promise_1] = FuturePromisePairWithNotifier<std::string>(notifier);
  std::function<bool()> fill_notifier = [&] {
    filled.store(true, std::memory_order_seq_cst);
    return false;
  };

  auto [future_1, promise_1] = FuturePromisePairWithNotifications<std::string>(wait_notifier, fill_notifier);
  auto [future_2, promise_2] = FuturePromisePair<std::string>();

  std::jthread t1(Wait, std::move(future_1), std::move(promise_2));
@@ -50,6 +56,8 @@ TEST(Future, BasicLifecycle) {
  t1.join();
  t2.join();

  EXPECT_TRUE(filled.load(std::memory_order_acquire));

  std::string result_2 = std::move(future_2).Wait();
  EXPECT_TRUE(result_2 == "it worked");
}
@@ -194,7 +194,8 @@ void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::se
               ScanAll scan_all) {
  auto results = request_router.ScanVertices("test_label");

  MG_ASSERT(results.size() == correctness_model.size());
  spdlog::error("got {} results, model size is {}", results.size(), correctness_model.size());
  EXPECT_EQ(results.size(), correctness_model.size());

  for (const auto &vertex_accessor : results) {
    const auto properties = vertex_accessor.Properties();
@@ -86,13 +86,14 @@ class MockedRequestRouter : public RequestRouterInterface {
  void Commit() override {}
  std::vector<VertexAccessor> ScanVertices(std::optional<std::string> /* label */) override { return {}; }

  std::vector<CreateVerticesResponse> CreateVertices(std::vector<memgraph::msgs::NewVertex> new_vertices) override {
  std::vector<CreateVerticesResponse> CreateVertices(
      std::vector<memgraph::msgs::NewVertex> /* new_vertices */) override {
    return {};
  }

  std::vector<ExpandOneResultRow> ExpandOne(ExpandOneRequest request) override { return {}; }
  std::vector<ExpandOneResultRow> ExpandOne(ExpandOneRequest /* request */) override { return {}; }

  std::vector<CreateExpandResponse> CreateExpand(std::vector<NewExpand> new_edges) override { return {}; }
  std::vector<CreateExpandResponse> CreateExpand(std::vector<NewExpand> /* new_edges */) override { return {}; }

  std::vector<GetPropertiesResultRow> GetProperties(GetPropertiesRequest rqst) override { return {}; }