Add local transport (#512)
* Create LocalTransport Io provider for sending messages to components on the same machine * Move src/io/simulation/message_conversion.hpp to src/io/message_conversion.hpp for use in other Io providers
This commit is contained in:
parent
14c9e68456
commit
a40403e3ce
@ -16,6 +16,7 @@
|
||||
#include <fmt/format.h>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_generators.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
|
||||
namespace memgraph::io {
|
||||
@ -37,6 +38,12 @@ struct Address {
|
||||
return ret;
|
||||
}
|
||||
|
||||
static Address UniqueLocalAddress() {
|
||||
return Address{
|
||||
.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
|
||||
};
|
||||
}
|
||||
|
||||
friend bool operator==(const Address &lhs, const Address &rhs) = default;
|
||||
|
||||
/// unique_id is most dominant for ordering, then last_known_ip, then last_known_port
|
||||
|
35
src/io/local_transport/local_system.hpp
Normal file
35
src/io/local_transport/local_system.hpp
Normal file
@ -0,0 +1,35 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <random>
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/local_transport/local_transport.hpp"
|
||||
#include "io/local_transport/local_transport_handle.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::local_transport {
|
||||
|
||||
class LocalSystem {
|
||||
std::shared_ptr<LocalTransportHandle> local_transport_handle_ = std::make_shared<LocalTransportHandle>();
|
||||
|
||||
public:
|
||||
Io<LocalTransport> Register(Address address) {
|
||||
LocalTransport local_transport(local_transport_handle_, address);
|
||||
return Io{local_transport, address};
|
||||
}
|
||||
|
||||
void ShutDown() { local_transport_handle_->ShutDown(); }
|
||||
};
|
||||
|
||||
} // namespace memgraph::io::local_transport
|
67
src/io/local_transport/local_transport.hpp
Normal file
67
src/io/local_transport/local_transport.hpp
Normal file
@ -0,0 +1,67 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <random>
|
||||
#include <utility>
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/local_transport/local_transport_handle.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::local_transport {
|
||||
|
||||
class LocalTransport {
|
||||
std::shared_ptr<LocalTransportHandle> local_transport_handle_;
|
||||
const Address address_;
|
||||
|
||||
public:
|
||||
LocalTransport(std::shared_ptr<LocalTransportHandle> local_transport_handle, Address address)
|
||||
: local_transport_handle_(std::move(local_transport_handle)), address_(address) {}
|
||||
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address to_address, RequestId request_id, Request request, Duration timeout) {
|
||||
auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<Response>>();
|
||||
|
||||
Address from_address = address_;
|
||||
|
||||
local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
|
||||
std::move(promise));
|
||||
|
||||
return std::move(future);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
|
||||
Address from_address = address_;
|
||||
return local_transport_handle_->template Receive<Ms...>(timeout);
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
|
||||
return local_transport_handle_->template Send<M>(to_address, from_address, request_id, std::forward<M>(message));
|
||||
}
|
||||
|
||||
Time Now() const { return local_transport_handle_->Now(); }
|
||||
|
||||
bool ShouldShutDown() const { return local_transport_handle_->ShouldShutDown(); }
|
||||
|
||||
template <class D = std::poisson_distribution<>, class Return = uint64_t>
|
||||
Return Rand(D distrib) {
|
||||
std::random_device rng;
|
||||
return distrib(rng);
|
||||
}
|
||||
};
|
||||
}; // namespace memgraph::io::local_transport
|
139
src/io/local_transport/local_transport_handle.hpp
Normal file
139
src/io/local_transport/local_transport_handle.hpp
Normal file
@ -0,0 +1,139 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
|
||||
#include "io/errors.hpp"
|
||||
#include "io/message_conversion.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::local_transport {
|
||||
|
||||
class LocalTransportHandle {
|
||||
mutable std::mutex mu_{};
|
||||
mutable std::condition_variable cv_;
|
||||
bool should_shut_down_ = false;
|
||||
|
||||
// the responses to requests that are being waited on
|
||||
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
|
||||
|
||||
// messages that are sent to servers that may later receive them
|
||||
std::vector<OpaqueMessage> can_receive_;
|
||||
|
||||
public:
|
||||
void ShutDown() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
should_shut_down_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
bool ShouldShutDown() const {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return should_shut_down_;
|
||||
}
|
||||
|
||||
static Time Now() {
|
||||
auto nano_time = std::chrono::system_clock::now();
|
||||
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
|
||||
std::unique_lock lock(mu_);
|
||||
|
||||
Time before = Now();
|
||||
|
||||
while (can_receive_.empty()) {
|
||||
Time now = Now();
|
||||
|
||||
// protection against non-monotonic timesources
|
||||
auto maxed_now = std::max(now, before);
|
||||
auto elapsed = maxed_now - before;
|
||||
|
||||
if (timeout < elapsed) {
|
||||
return TimedOut{};
|
||||
}
|
||||
|
||||
Duration relative_timeout = timeout - elapsed;
|
||||
|
||||
std::cv_status cv_status_value = cv_.wait_for(lock, relative_timeout);
|
||||
|
||||
if (cv_status_value == std::cv_status::timeout) {
|
||||
return TimedOut{};
|
||||
}
|
||||
}
|
||||
|
||||
auto current_message = std::move(can_receive_.back());
|
||||
can_receive_.pop_back();
|
||||
|
||||
auto m_opt = std::move(current_message).Take<Ms...>();
|
||||
|
||||
return std::move(m_opt).value();
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
|
||||
std::any message_any(std::forward<M>(message));
|
||||
OpaqueMessage opaque_message{
|
||||
.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
|
||||
|
||||
PromiseKey promise_key{.requester_address = to_address,
|
||||
.request_id = opaque_message.request_id,
|
||||
.replier_address = opaque_message.from_address};
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
if (promises_.contains(promise_key)) {
|
||||
// complete waiting promise if it's there
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
|
||||
dop.promise.Fill(std::move(opaque_message));
|
||||
} else {
|
||||
can_receive_.emplace_back(std::move(opaque_message));
|
||||
}
|
||||
} // lock dropped
|
||||
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
template <Message Request, Message Response>
|
||||
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
|
||||
Duration timeout, ResponsePromise<Response> promise) {
|
||||
const bool port_matches = to_address.last_known_port == from_address.last_known_port;
|
||||
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
|
||||
|
||||
MG_ASSERT(port_matches && ip_matches);
|
||||
const Time deadline = Now() + timeout;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
PromiseKey promise_key{
|
||||
.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
|
||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
|
||||
promises_.emplace(std::move(promise_key), std::move(dop));
|
||||
} // lock dropped
|
||||
|
||||
Send(to_address, from_address, request_id, std::forward<Request>(request));
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace memgraph::io::local_transport
|
@ -13,13 +13,35 @@
|
||||
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
namespace memgraph::io {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Message;
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct PromiseKey {
|
||||
Address requester_address;
|
||||
uint64_t request_id;
|
||||
// TODO(tyler) possibly remove replier_address from promise key
|
||||
// once we want to support DSR.
|
||||
Address replier_address;
|
||||
|
||||
public:
|
||||
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
|
||||
if (lhs.requester_address != rhs.requester_address) {
|
||||
return lhs.requester_address < rhs.requester_address;
|
||||
}
|
||||
|
||||
if (lhs.request_id != rhs.request_id) {
|
||||
return lhs.request_id < rhs.request_id;
|
||||
}
|
||||
|
||||
return lhs.replier_address < rhs.replier_address;
|
||||
}
|
||||
};
|
||||
|
||||
struct OpaqueMessage {
|
||||
Address to_address;
|
||||
Address from_address;
|
||||
uint64_t request_id;
|
||||
std::any message;
|
||||
@ -65,6 +87,7 @@ struct OpaqueMessage {
|
||||
return RequestEnvelope<Ms...>{
|
||||
.message = std::move(*m_opt),
|
||||
.request_id = request_id,
|
||||
.to_address = to_address,
|
||||
.from_address = from_address,
|
||||
};
|
||||
}
|
||||
@ -99,6 +122,7 @@ class OpaquePromiseTrait : public OpaquePromiseTraitBase {
|
||||
T message = std::any_cast<T>(std::move(opaque_message.message));
|
||||
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
|
||||
.request_id = opaque_message.request_id,
|
||||
.to_address = opaque_message.to_address,
|
||||
.from_address = opaque_message.from_address};
|
||||
auto promise = static_cast<ResponsePromise<T> *>(ptr);
|
||||
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
|
||||
@ -174,4 +198,9 @@ class OpaquePromise {
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace memgraph::io::simulator
|
||||
struct DeadlineAndOpaquePromise {
|
||||
Time deadline;
|
||||
OpaquePromise promise;
|
||||
};
|
||||
|
||||
} // namespace memgraph::io
|
@ -24,7 +24,7 @@
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/simulator/message_conversion.hpp"
|
||||
#include "io/message_conversion.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
#include "io/simulator/simulator_stats.hpp"
|
||||
#include "io/time.hpp"
|
||||
@ -32,35 +32,6 @@
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct PromiseKey {
|
||||
Address requester_address;
|
||||
uint64_t request_id;
|
||||
// TODO(tyler) possibly remove replier_address from promise key
|
||||
// once we want to support DSR.
|
||||
Address replier_address;
|
||||
|
||||
public:
|
||||
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
|
||||
if (lhs.requester_address != rhs.requester_address) {
|
||||
return lhs.requester_address < rhs.requester_address;
|
||||
}
|
||||
|
||||
if (lhs.request_id != rhs.request_id) {
|
||||
return lhs.request_id < rhs.request_id;
|
||||
}
|
||||
|
||||
return lhs.replier_address < rhs.replier_address;
|
||||
}
|
||||
};
|
||||
|
||||
struct DeadlineAndOpaquePromise {
|
||||
Time deadline;
|
||||
OpaquePromise promise;
|
||||
};
|
||||
|
||||
class SimulatorHandle {
|
||||
mutable std::mutex mu_{};
|
||||
mutable std::condition_variable cv_;
|
||||
@ -122,14 +93,17 @@ class SimulatorHandle {
|
||||
bool ShouldShutDown() const;
|
||||
|
||||
template <Message Request, Message Response>
|
||||
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request, Duration timeout,
|
||||
ResponsePromise<Response> &&promise) {
|
||||
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
|
||||
Duration timeout, ResponsePromise<Response> &&promise) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
|
||||
std::any message(request);
|
||||
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message)};
|
||||
OpaqueMessage om{.to_address = to_address,
|
||||
.from_address = from_address,
|
||||
.request_id = request_id,
|
||||
.message = std::move(message)};
|
||||
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
|
||||
|
||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
|
||||
@ -182,10 +156,13 @@ class SimulatorHandle {
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
void Send(Address to_address, Address from_address, uint64_t request_id, M message) {
|
||||
void Send(Address to_address, Address from_address, RequestId request_id, M message) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
std::any message_any(std::move(message));
|
||||
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
|
||||
OpaqueMessage om{.to_address = to_address,
|
||||
.from_address = from_address,
|
||||
.request_id = request_id,
|
||||
.message = std::move(message_any)};
|
||||
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
|
||||
|
||||
stats_.total_messages++;
|
||||
|
@ -49,8 +49,8 @@ class SimulatorTransport {
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
void Send(Address address, uint64_t request_id, M message) {
|
||||
return simulator_handle_->template Send<M>(address, address_, request_id, message);
|
||||
void Send(Address to_address, Address from_address, uint64_t request_id, M message) {
|
||||
return simulator_handle_->template Send<M>(to_address, from_address, request_id, message);
|
||||
}
|
||||
|
||||
Time Now() const { return simulator_handle_->Now(); }
|
||||
|
@ -16,6 +16,6 @@
|
||||
namespace memgraph::io {
|
||||
|
||||
using Duration = std::chrono::microseconds;
|
||||
using Time = std::chrono::time_point<std::chrono::local_t, Duration>;
|
||||
using Time = std::chrono::time_point<std::chrono::system_clock, Duration>;
|
||||
|
||||
} // namespace memgraph::io
|
||||
|
@ -32,10 +32,13 @@ using memgraph::utils::BasicResult;
|
||||
template <typename T>
|
||||
concept Message = std::same_as<T, std::decay_t<T>>;
|
||||
|
||||
using RequestId = uint64_t;
|
||||
|
||||
template <Message M>
|
||||
struct ResponseEnvelope {
|
||||
M message;
|
||||
uint64_t request_id;
|
||||
RequestId request_id;
|
||||
Address to_address;
|
||||
Address from_address;
|
||||
};
|
||||
|
||||
@ -51,7 +54,8 @@ using ResponsePromise = memgraph::io::Promise<ResponseResult<M>>;
|
||||
template <Message... Ms>
|
||||
struct RequestEnvelope {
|
||||
std::variant<Ms...> message;
|
||||
uint64_t request_id;
|
||||
RequestId request_id;
|
||||
Address to_address;
|
||||
Address from_address;
|
||||
};
|
||||
|
||||
@ -62,7 +66,7 @@ template <typename I>
|
||||
class Io {
|
||||
I implementation_;
|
||||
Address address_;
|
||||
uint64_t request_id_counter_ = 0;
|
||||
RequestId request_id_counter_ = 0;
|
||||
Duration default_timeout_ = std::chrono::microseconds{50000};
|
||||
|
||||
public:
|
||||
@ -75,7 +79,7 @@ class Io {
|
||||
/// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
|
||||
const uint64_t request_id = ++request_id_counter_;
|
||||
const RequestId request_id = ++request_id_counter_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, request, timeout);
|
||||
}
|
||||
|
||||
@ -83,7 +87,7 @@ class Io {
|
||||
/// to be used by clients.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address address, Request request) {
|
||||
const uint64_t request_id = ++request_id_counter_;
|
||||
const RequestId request_id = ++request_id_counter_;
|
||||
const Duration timeout = default_timeout_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
|
||||
}
|
||||
@ -107,8 +111,9 @@ class Io {
|
||||
/// responses are not necessarily expected, and for servers to respond to requests.
|
||||
/// If you need reliable delivery, this must be built on-top. TCP is not enough for most use cases.
|
||||
template <Message M>
|
||||
void Send(Address address, uint64_t request_id, M message) {
|
||||
return implementation_.template Send<M>(address, request_id, std::move(message));
|
||||
void Send(Address to_address, RequestId request_id, M message) {
|
||||
Address from_address = address_;
|
||||
return implementation_.template Send<M>(to_address, from_address, request_id, std::move(message));
|
||||
}
|
||||
|
||||
/// The current system time. This time source should be preferred over any other,
|
||||
|
@ -399,4 +399,8 @@ target_link_libraries(${test_prefix}websocket mg-communication Boost::headers)
|
||||
|
||||
# Test future
|
||||
add_unit_test(future.cpp)
|
||||
target_link_libraries(${test_prefix}future mg-io)
|
||||
target_link_libraries(${test_prefix}future mg-io)
|
||||
|
||||
# Test Local Transport
|
||||
add_unit_test(local_transport.cpp)
|
||||
target_link_libraries(${test_prefix}local_transport mg-io)
|
||||
|
86
tests/unit/local_transport.cpp
Normal file
86
tests/unit/local_transport.cpp
Normal file
@ -0,0 +1,86 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "io/local_transport/local_system.hpp"
|
||||
#include "io/local_transport/local_transport.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::tests {
|
||||
|
||||
using memgraph::io::local_transport::LocalSystem;
|
||||
using memgraph::io::local_transport::LocalTransport;
|
||||
|
||||
struct CounterRequest {
|
||||
uint64_t proposal;
|
||||
};
|
||||
|
||||
struct CounterResponse {
|
||||
uint64_t highest_seen;
|
||||
};
|
||||
|
||||
void RunServer(Io<LocalTransport> io) {
|
||||
uint64_t highest_seen = 0;
|
||||
|
||||
while (!io.ShouldShutDown()) {
|
||||
spdlog::info("[SERVER] Is receiving...");
|
||||
auto request_result = io.Receive<CounterRequest>();
|
||||
if (request_result.HasError()) {
|
||||
spdlog::info("[SERVER] timed out, continue");
|
||||
continue;
|
||||
}
|
||||
auto request_envelope = request_result.GetValue();
|
||||
ASSERT_TRUE(std::holds_alternative<CounterRequest>(request_envelope.message));
|
||||
auto req = std::get<CounterRequest>(request_envelope.message);
|
||||
|
||||
highest_seen = std::max(highest_seen, req.proposal);
|
||||
auto srv_res = CounterResponse{highest_seen};
|
||||
|
||||
io.Send(request_envelope.from_address, request_envelope.request_id, srv_res);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(LocalTransport, BasicRequest) {
|
||||
LocalSystem local_system;
|
||||
|
||||
// rely on uuid to be unique on default Address
|
||||
auto cli_addr = Address::UniqueLocalAddress();
|
||||
auto srv_addr = Address::UniqueLocalAddress();
|
||||
|
||||
Io<LocalTransport> cli_io = local_system.Register(cli_addr);
|
||||
Io<LocalTransport> srv_io = local_system.Register(srv_addr);
|
||||
|
||||
auto srv_thread = std::jthread(RunServer, std::move(srv_io));
|
||||
|
||||
for (int i = 1; i < 3; ++i) {
|
||||
// send request
|
||||
CounterRequest cli_req;
|
||||
auto value = 1; // i;
|
||||
cli_req.proposal = value;
|
||||
spdlog::info("[CLIENT] sending request");
|
||||
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
|
||||
spdlog::info("[CLIENT] waiting on future");
|
||||
|
||||
auto res_rez = std::move(res_f).Wait();
|
||||
spdlog::info("[CLIENT] future returned");
|
||||
MG_ASSERT(!res_rez.HasError());
|
||||
spdlog::info("[CLIENT] Got a valid response");
|
||||
auto env = res_rez.GetValue();
|
||||
MG_ASSERT(env.message.highest_seen == value);
|
||||
}
|
||||
|
||||
local_system.ShutDown();
|
||||
}
|
||||
} // namespace memgraph::io::tests
|
Loading…
Reference in New Issue
Block a user