Enforce move in transport layer

This commit is contained in:
János Benjamin Antal 2023-03-01 16:39:32 +01:00
parent 13795c8993
commit 53cd5592e0
13 changed files with 73 additions and 53 deletions

View File

@ -31,7 +31,7 @@ class LocalTransport {
: local_transport_handle_(std::move(local_transport_handle)) {}
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT &&request,
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RValueRef<RequestT> request,
std::function<void()> fill_notifier, Duration timeout) {
return local_transport_handle_->template SubmitRequest<ResponseT, RequestT>(
to_address, from_address, std::move(request), timeout, fill_notifier);
@ -43,8 +43,8 @@ class LocalTransport {
}
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
return local_transport_handle_->template Send(to_address, from_address, request_id, std::move(message));
void Send(Address to_address, Address from_address, RequestId request_id, RValueRef<M> message) {
return local_transport_handle_->template Send<M>(to_address, from_address, request_id, std::move(message));
}
Time Now() const { return local_transport_handle_->Now(); }

View File

@ -104,7 +104,7 @@ class LocalTransportHandle {
}
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
void Send(Address to_address, Address from_address, RequestId request_id, RValueRef<M> message) {
auto type_info = TypeInfoFor(message);
std::any message_any(std::move(message));
@ -139,13 +139,13 @@ class LocalTransportHandle {
}
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RequestT &&request,
ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RValueRef<RequestT> request,
Duration timeout, std::function<void()> fill_notifier) {
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<ResponseT>>(
// set null notifier for when the Future::Wait is called
nullptr,
// set notifier for when Promise::Fill is called
std::forward<std::function<void()>>(fill_notifier));
std::move(fill_notifier));
const bool port_matches = to_address.last_known_port == from_address.last_known_port;
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
@ -168,7 +168,7 @@ class LocalTransportHandle {
promises_.emplace(std::move(promise_key), std::move(dop));
} // lock dropped
Send(to_address, from_address, request_id, std::move(request));
Send<RequestT>(to_address, from_address, request_id, std::move(request));
return std::move(future);
}

View File

@ -19,6 +19,7 @@
#include <map>
#include <set>
#include <thread>
#include <type_traits>
#include <vector>
#include <boost/core/demangle.hpp>
@ -624,13 +625,12 @@ class Raft {
MG_ASSERT(std::max(req.term, state_.term) == req.term);
}
VoteResponse res{
.term = std::max(req.term, state_.term),
.committed_log_size = state_.committed_log_size,
.vote_granted = new_leader,
};
io_.Send(from_address, request_id, std::move(res));
io_.Send(from_address, request_id,
VoteResponse{
.term = std::max(req.term, state_.term),
.committed_log_size = state_.committed_log_size,
.vote_granted = new_leader,
});
if (new_leader) {
// become a follower
@ -718,6 +718,10 @@ class Raft {
.log_size = state_.log.size(),
};
static_assert(std::is_trivially_copyable_v<AppendResponse>,
"This function copies this message, therefore it is important to be trivially copyable. Otherwise it "
"should be moved");
if constexpr (std::is_same<ALL, Leader>()) {
MG_ASSERT(req.term != state_.term, "Multiple leaders are acting under the term ", req.term);
}
@ -808,7 +812,7 @@ class Raft {
Log("returning log_size of ", res.log_size);
io_.Send(from_address, request_id, std::move(res));
io_.Send(from_address, request_id, AppendResponse{res});
return std::nullopt;
}

View File

@ -122,8 +122,8 @@ class RsmClient {
.start_time = io_.Now(),
.request = std::move(req),
.notifier = notifier,
.future = io_.template RequestWithNotification<ReadResponse<ReadResponseT>>(leader_, std::move(read_req),
notifier, readiness_token),
.future = io_.template RequestWithNotification<ReadResponse<ReadResponseT>, ReadRequest<ReadRequestT>>(
leader_, std::move(read_req), notifier, readiness_token),
};
async_reads_.emplace(readiness_token.GetId(), std::move(async_request));
@ -134,7 +134,7 @@ class RsmClient {
ReadRequest<ReadRequestT> read_req = {.operation = async_request.request};
async_request.future = io_.template RequestWithNotification<ReadResponse<ReadResponseT>>(
async_request.future = io_.template RequestWithNotification<ReadResponse<ReadResponseT>, ReadRequest<ReadRequestT>>(
leader_, std::move(read_req), async_request.notifier, readiness_token);
}
@ -191,8 +191,8 @@ class RsmClient {
.start_time = io_.Now(),
.request = std::move(req),
.notifier = notifier,
.future = io_.template RequestWithNotification<WriteResponse<WriteResponseT>>(leader_, std::move(write_req),
notifier, readiness_token),
.future = io_.template RequestWithNotification<WriteResponse<WriteResponseT>, WriteRequest<WriteRequestT>>(
leader_, std::move(write_req), notifier, readiness_token),
};
async_writes_.emplace(readiness_token.GetId(), std::move(async_request));
@ -203,8 +203,9 @@ class RsmClient {
WriteRequest<WriteRequestT> write_req = {.operation = async_request.request};
async_request.future = io_.template RequestWithNotification<WriteResponse<WriteResponseT>>(
leader_, std::move(write_req), async_request.notifier, readiness_token);
async_request.future =
io_.template RequestWithNotification<WriteResponse<WriteResponseT>, WriteRequest<WriteRequestT>>(
leader_, std::move(write_req), async_request.notifier, readiness_token);
}
std::optional<BasicResult<TimedOut, WriteResponseT>> PollAsyncWriteRequest(const ReadinessToken &readiness_token) {

View File

@ -105,19 +105,19 @@ class SimulatorHandle {
bool ShouldShutDown() const;
template <Message Response, Message Request>
ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
std::function<bool()> &&maybe_tick_simulator,
std::function<void()> &&fill_notifier) {
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RValueRef<RequestT> request,
Duration timeout, std::function<bool()> &&maybe_tick_simulator,
std::function<void()> &&fill_notifier) {
auto type_info = TypeInfoFor(request);
std::string demangled_name = boost::core::demangle(type_info.get().name());
spdlog::trace("simulator sending request {} to {}", demangled_name, to_address);
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<Response>>(
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<ResponseT>>(
// set notifier for when the Future::Wait is called
std::forward<std::function<bool()>>(maybe_tick_simulator),
std::move(maybe_tick_simulator),
// set notifier for when Promise::Fill is called
std::forward<std::function<void()>>(fill_notifier));
std::move(fill_notifier));
{
std::unique_lock<std::mutex> lock(mu_);
@ -194,7 +194,7 @@ class SimulatorHandle {
}
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
void Send(Address to_address, Address from_address, RequestId request_id, RValueRef<M> message) {
spdlog::trace("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port);
auto type_info = TypeInfoFor(message);
{

View File

@ -34,14 +34,14 @@ class SimulatorTransport {
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT &&request,
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RValueRef<RequestT> request,
std::function<void()> notification, Duration timeout) {
std::function<bool()> tick_simulator = [handle_copy = simulator_handle_] {
return handle_copy->MaybeTickSimulator();
};
return simulator_handle_->template SubmitRequest<ResponseT>(to_address, from_address, std::move(request), timeout,
std::move(tick_simulator), std::move(notification));
return simulator_handle_->template SubmitRequest<ResponseT, RequestT>(
to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification));
}
template <Message... Ms>
@ -50,8 +50,8 @@ class SimulatorTransport {
}
template <Message M>
void Send(Address to_address, Address from_address, uint64_t request_id, M &&message) {
return simulator_handle_->template Send(to_address, from_address, request_id, std::move(message));
void Send(Address to_address, Address from_address, uint64_t request_id, RValueRef<M> message) {
return simulator_handle_->template Send<M>(to_address, from_address, request_id, std::move(message));
}
Time Now() const { return simulator_handle_->Now(); }

View File

@ -22,6 +22,7 @@
#include "io/message_histogram_collector.hpp"
#include "io/notifier.hpp"
#include "io/time.hpp"
#include "utils/concepts.hpp"
#include "utils/result.hpp"
namespace memgraph::io {
@ -34,6 +35,14 @@ using memgraph::utils::BasicResult;
template <typename T>
concept Message = std::movable<T> && std::copyable<T>;
template <utils::Object T>
struct RValueRefEnforcer {
using Type = T &&;
};
template <typename T>
using RValueRef = typename RValueRefEnforcer<T>::Type;
using RequestId = uint64_t;
template <Message M>
@ -83,33 +92,33 @@ class Io {
/// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> RequestWithTimeout(Address address, RequestT &&request, Duration timeout) {
ResponseFuture<ResponseT> RequestWithTimeout(Address address, RValueRef<RequestT> request, Duration timeout) {
const Address from_address = address_;
std::function<void()> fill_notifier = nullptr;
return implementation_.template Request<ResponseT>(address, from_address, std::forward<RequestT>(request),
fill_notifier, timeout);
return implementation_.template Request<ResponseT, RequestT>(address, from_address, std::move(request),
fill_notifier, timeout);
}
/// Issue a request that times out after the default timeout. This tends
/// to be used by clients.
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> Request(Address to_address, RequestT &&request) {
ResponseFuture<ResponseT> Request(Address to_address, RValueRef<RequestT> request) {
const Duration timeout = default_timeout_;
const Address from_address = address_;
std::function<void()> fill_notifier = nullptr;
return implementation_.template Request<ResponseT>(to_address, from_address, std::forward<RequestT>(request),
fill_notifier, timeout);
return implementation_.template Request<ResponseT, RequestT>(to_address, from_address, std::move(request),
fill_notifier, timeout);
}
/// Issue a request that will notify a Notifier when it is filled or times out.
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> RequestWithNotification(Address to_address, RequestT &&request, Notifier notifier,
ResponseFuture<ResponseT> RequestWithNotification(Address to_address, RValueRef<RequestT> request, Notifier notifier,
ReadinessToken readiness_token) {
const Duration timeout = default_timeout_;
const Address from_address = address_;
std::function<void()> fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); };
return implementation_.template Request<ResponseT>(to_address, from_address, std::forward<RequestT>(request),
fill_notifier, timeout);
return implementation_.template Request<ResponseT, RequestT>(to_address, from_address, std::move(request),
fill_notifier, timeout);
}
/// Issue a request that will notify a Notifier when it is filled or times out.
@ -143,7 +152,7 @@ class Io {
template <Message M>
void Send(Address to_address, RequestId request_id, M &&message) {
Address from_address = address_;
return implementation_.template Send(to_address, from_address, request_id, std::forward<M>(message));
return implementation_.template Send<M>(to_address, from_address, request_id, std::forward<M>(message));
}
/// The current system time. This time source should be preferred over any other,

View File

@ -729,8 +729,9 @@ class RequestRouter : public RequestRouterInterface {
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww;
ww.operation = std::move(requests);
auto resp = io_.template Request<io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(
coordinator_address, std::move(ww))
auto resp = io_.template Request<io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>,
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>>(coordinator_address,
std::move(ww))
.Wait();
if (resp.HasValue()) {
const auto alloc_edge_id_reps =

View File

@ -249,8 +249,9 @@ class ShardManager {
ww.operation = cwr;
spdlog::info("SM sending heartbeat to coordinator {}", coordinator_leader_.ToString());
heartbeat_res_.emplace(
std::move(io_.template Request<WriteResponse<CoordinatorWriteResponses>>(coordinator_leader_, std::move(ww))));
heartbeat_res_.emplace(std::move(
io_.template Request<WriteResponse<CoordinatorWriteResponses>, WriteRequest<CoordinatorWriteRequests>>(
coordinator_leader_, std::move(ww))));
spdlog::info("SM sent heartbeat");
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -12,6 +12,7 @@
#pragma once
#include <concepts>
#include <iterator>
#include <type_traits>
namespace memgraph::utils {
template <typename T, typename... Args>
@ -34,4 +35,7 @@ template <typename T>
concept Dereferenceable = requires(T t) {
{ *t } -> CanReference;
};
template <typename T>
concept Object = std::is_object_v<T>;
} // namespace memgraph::utils

View File

@ -76,7 +76,7 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunWorkload(SimulatorConfig
CounterRequest cli_req;
cli_req.proposal = i;
spdlog::info("[CLIENT] calling Request");
auto res_f = cli_io.Request<CounterResponse>(srv_addr, std::move(cli_req));
auto res_f = cli_io.Request<CounterResponse, CounterRequest>(srv_addr, std::move(cli_req));
spdlog::info("[CLIENT] calling Wait");
auto res_rez = std::move(res_f).Wait();
spdlog::info("[CLIENT] Wait returned");

View File

@ -78,7 +78,7 @@ int main() {
auto req = ScanVerticesRequest{2, std::nullopt};
auto res_f = cli_io.Request<VerticesResponse>(srv_addr, std::move(req));
auto res_f = cli_io.Request<VerticesResponse, ScanVerticesRequest>(srv_addr, std::move(req));
auto res_rez = std::move(res_f).Wait();
simulator.ShutDown();
return 0;

View File

@ -70,7 +70,7 @@ TEST(LocalTransport, BasicRequest) {
auto value = 1; // i;
cli_req.proposal = value;
spdlog::info("[CLIENT] sending request");
auto res_f = cli_io.Request<CounterResponse>(srv_addr, std::move(cli_req));
auto res_f = cli_io.Request<CounterResponse, CounterRequest>(srv_addr, std::move(cli_req));
spdlog::info("[CLIENT] waiting on future");
auto res_rez = std::move(res_f).Wait();