Transport prototype (#466)

This commit is contained in:
Tyler Neely 2022-08-12 08:24:32 +02:00 committed by GitHub
parent 5012824e05
commit 14c9e68456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1551 additions and 1 deletions

View File

@ -171,7 +171,7 @@ jobs:
# Run leftover CTest tests (all except unit and benchmark tests).
cd build
ctest -E "(memgraph__unit|memgraph__benchmark)" --output-on-failure
ctest -E "(memgraph__unit|memgraph__benchmark|memgraph__simulation)" --output-on-failure
- name: Run drivers tests
run: |
@ -262,6 +262,15 @@ jobs:
cd build
ctest -R memgraph__unit --output-on-failure -j$THREADS
- name: Run simulation tests
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run unit tests.
cd build
ctest -R memgraph__simulation --output-on-failure -j$THREADS
- name: Run e2e tests
run: |
# TODO(gitbuda): Setup mgclient and pymgclient properly.

View File

@ -5,6 +5,7 @@ add_subdirectory(lisp)
add_subdirectory(utils)
add_subdirectory(requests)
add_subdirectory(io)
add_subdirectory(io/simulator)
add_subdirectory(kvstore)
add_subdirectory(telemetry)
add_subdirectory(communication)

60
src/io/address.hpp Normal file
View File

@ -0,0 +1,60 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <compare>
#include <fmt/format.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
namespace memgraph::io {
struct Address {
// It's important for all participants to have a
// unique identifier - IP and port alone are not
// enough, and may change over the lifecycle of
// the nodes. Particularly storage nodes may change
// their IP addresses over time, and the system
// should gracefully update its information
// about them.
boost::uuids::uuid unique_id;
boost::asio::ip::address last_known_ip;
uint16_t last_known_port;
static Address TestAddress(uint16_t port) {
Address ret;
ret.last_known_port = port;
return ret;
}
friend bool operator==(const Address &lhs, const Address &rhs) = default;
/// unique_id is most dominant for ordering, then last_known_ip, then last_known_port
friend bool operator<(const Address &lhs, const Address &rhs) {
if (lhs.unique_id != rhs.unique_id) {
return lhs.unique_id < rhs.unique_id;
}
if (lhs.last_known_ip != rhs.last_known_ip) {
return lhs.last_known_ip < rhs.last_known_ip;
}
return lhs.last_known_port < rhs.last_known_port;
}
std::string ToString() const {
return fmt::format("Address {{ unique_id: {}, last_known_ip: {}, last_known_port: {} }}",
boost::uuids::to_string(unique_id), last_known_ip.to_string(), last_known_port);
}
};
}; // namespace memgraph::io

26
src/io/errors.hpp Normal file
View File

@ -0,0 +1,26 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
namespace memgraph::io {
// Signifies that a retriable operation was unable to
// complete after a configured number of retries.
struct RetriesExhausted {};
// Signifies that a request was unable to receive a response
// within some configured timeout duration. It is important
// to remember that in distributed systems, a timeout does
// not signify that a request was not received or processed.
// It may be the case that the request was fully processed
// but that the response was not received.
struct TimedOut {};
}; // namespace memgraph::io

262
src/io/future.hpp Normal file
View File

@ -0,0 +1,262 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include "io/errors.hpp"
#include "utils/logging.hpp"
namespace memgraph::io {
// Shared is in an anonymous namespace, and the only way to
// construct a Promise or Future is to pass a Shared in. This
// ensures that Promises and Futures can only be constructed
// in this translation unit.
namespace details {
template <typename T>
class Shared {
mutable std::condition_variable cv_;
mutable std::mutex mu_;
std::optional<T> item_;
bool consumed_ = false;
bool waiting_ = false;
std::function<bool()> simulator_notifier_ = nullptr;
public:
explicit Shared(std::function<bool()> simulator_notifier) : simulator_notifier_(simulator_notifier) {}
Shared() = default;
Shared(Shared &&) = delete;
Shared &operator=(Shared &&) = delete;
Shared(const Shared &) = delete;
Shared &operator=(const Shared &) = delete;
~Shared() = default;
/// Takes the item out of our optional item_ and returns it.
T Take() {
MG_ASSERT(item_, "Take called without item_ being present");
MG_ASSERT(!consumed_, "Take called on already-consumed Future");
T ret = std::move(item_).value();
item_.reset();
consumed_ = true;
return ret;
}
T Wait() {
std::unique_lock<std::mutex> lock(mu_);
waiting_ = true;
while (!item_) {
bool simulator_progressed = false;
if (simulator_notifier_) [[unlikely]] {
// We can't hold our own lock while notifying
// the simulator because notifying the simulator
// involves acquiring the simulator's mutex
// to guarantee that our notification linearizes
// with the simulator's condition variable.
// However, the simulator may acquire our
// mutex to check if we are being awaited,
// while determining system quiescence,
// so we have to get out of its way to avoid
// a cyclical deadlock.
lock.unlock();
simulator_progressed = std::invoke(simulator_notifier_);
lock.lock();
if (item_) {
// item may have been filled while we
// had dropped our mutex while notifying
// the simulator of our waiting_ status.
break;
}
}
if (!simulator_progressed) [[likely]] {
cv_.wait(lock);
}
MG_ASSERT(!consumed_, "Future consumed twice!");
}
waiting_ = false;
return Take();
}
bool IsReady() const {
std::unique_lock<std::mutex> lock(mu_);
return item_;
}
std::optional<T> TryGet() {
std::unique_lock<std::mutex> lock(mu_);
if (item_) {
return Take();
}
return std::nullopt;
}
void Fill(T item) {
{
std::unique_lock<std::mutex> lock(mu_);
MG_ASSERT(!consumed_, "Promise filled after it was already consumed!");
MG_ASSERT(!item_, "Promise filled twice!");
item_ = item;
} // lock released before condition variable notification
cv_.notify_all();
}
bool IsAwaited() const {
std::unique_lock<std::mutex> lock(mu_);
return waiting_;
}
};
} // namespace details
template <typename T>
class Future {
bool consumed_or_moved_ = false;
std::shared_ptr<details::Shared<T>> shared_;
public:
explicit Future(std::shared_ptr<details::Shared<T>> shared) : shared_(shared) {}
Future() = delete;
Future(Future &&old) noexcept {
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
shared_ = std::move(old.shared_);
consumed_or_moved_ = old.consumed_or_moved_;
old.consumed_or_moved_ = true;
}
Future &operator=(Future &&old) noexcept {
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
shared_ = std::move(old.shared_);
old.consumed_or_moved_ = true;
}
Future(const Future &) = delete;
Future &operator=(const Future &) = delete;
~Future() = default;
/// Returns true if the Future is ready to
/// be consumed using TryGet or Wait (prefer Wait
/// if you know it's ready, because it doesn't
/// return an optional.
bool IsReady() {
MG_ASSERT(!consumed_or_moved_, "Called IsReady after Future already consumed!");
return shared_->IsReady();
}
/// Non-blocking method that returns the inner
/// item if it's already ready, or std::nullopt
/// if it is not ready yet.
std::optional<T> TryGet() {
MG_ASSERT(!consumed_or_moved_, "Called TryGet after Future already consumed!");
std::optional<T> ret = shared_->TryGet();
if (ret) {
consumed_or_moved_ = true;
}
return ret;
}
/// Block on the corresponding promise to be filled,
/// returning the inner item when ready.
T Wait() && {
MG_ASSERT(!consumed_or_moved_, "Future should only be consumed with Wait once!");
T ret = shared_->Wait();
consumed_or_moved_ = true;
return ret;
}
/// Marks this Future as canceled.
void Cancel() {
MG_ASSERT(!consumed_or_moved_, "Future::Cancel called on a future that was already moved or consumed!");
consumed_or_moved_ = true;
}
};
template <typename T>
class Promise {
std::shared_ptr<details::Shared<T>> shared_;
bool filled_or_moved_ = false;
public:
explicit Promise(std::shared_ptr<details::Shared<T>> shared) : shared_(shared) {}
Promise() = delete;
Promise(Promise &&old) noexcept {
MG_ASSERT(!old.filled_or_moved_, "Promise moved from after already being moved from or filled.");
shared_ = std::move(old.shared_);
old.filled_or_moved_ = true;
}
Promise &operator=(Promise &&old) noexcept {
MG_ASSERT(!old.filled_or_moved_, "Promise moved from after already being moved from or filled.");
shared_ = std::move(old.shared_);
old.filled_or_moved_ = true;
}
Promise(const Promise &) = delete;
Promise &operator=(const Promise &) = delete;
~Promise() { MG_ASSERT(filled_or_moved_, "Promise destroyed before its associated Future was filled!"); }
// Fill the expected item into the Future.
void Fill(T item) {
MG_ASSERT(!filled_or_moved_, "Promise::Fill called on a promise that is already filled or moved!");
shared_->Fill(item);
filled_or_moved_ = true;
}
bool IsAwaited() { return shared_->IsAwaited(); }
/// Moves this Promise into a unique_ptr.
std::unique_ptr<Promise<T>> ToUnique() && {
std::unique_ptr<Promise<T>> up = std::make_unique<Promise<T>>(std::move(shared_));
filled_or_moved_ = true;
return up;
}
};
template <typename T>
std::pair<Future<T>, Promise<T>> FuturePromisePair() {
std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>();
Future<T> future = Future<T>(shared);
Promise<T> promise = Promise<T>(shared);
return std::make_pair(std::move(future), std::move(promise));
}
template <typename T>
std::pair<Future<T>, Promise<T>> FuturePromisePairWithNotifier(std::function<bool()> simulator_notifier) {
std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>(simulator_notifier);
Future<T> future = Future<T>(shared);
Promise<T> promise = Promise<T>(shared);
return std::make_pair(std::move(future), std::move(promise));
}
}; // namespace memgraph::io

View File

@ -0,0 +1,8 @@
set(io_simulator_sources
simulator_handle.cpp)
find_package(fmt REQUIRED)
find_package(Threads REQUIRED)
add_library(mg-io-simulator STATIC ${io_simulator_sources})
target_link_libraries(mg-io-simulator stdc++fs Threads::Threads fmt::fmt mg-utils)

View File

@ -0,0 +1,177 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "io/transport.hpp"
namespace memgraph::io::simulator {
using memgraph::io::Duration;
using memgraph::io::Message;
using memgraph::io::Time;
struct OpaqueMessage {
Address from_address;
uint64_t request_id;
std::any message;
/// Recursively tries to match a specific type from the outer
/// variant's parameter pack against the type of the std::any,
/// and if it matches, make it concrete and return it. Otherwise,
/// move on and compare the any with the next type from the
/// parameter pack.
///
/// Return is the full std::variant<Ts...> type that holds the
/// full parameter pack without interfering with recursive
/// narrowing expansion.
template <typename Return, Message Head, Message... Rest>
std::optional<Return> Unpack(std::any &&a) {
if (typeid(Head) == a.type()) {
Head concrete = std::any_cast<Head>(std::move(a));
return concrete;
}
if constexpr (sizeof...(Rest) > 0) {
return Unpack<Return, Rest...>(std::move(a));
} else {
return std::nullopt;
}
}
/// High level "user-facing" conversion function that lets
/// people interested in conversion only supply a single
/// parameter pack for the types that they want to compare
/// with the any and potentially include in the returned
/// variant.
template <Message... Ms>
requires(sizeof...(Ms) > 0) std::optional<std::variant<Ms...>> VariantFromAny(std::any &&a) {
return Unpack<std::variant<Ms...>, Ms...>(std::move(a));
}
template <Message... Ms>
requires(sizeof...(Ms) > 0) std::optional<RequestEnvelope<Ms...>> Take() && {
std::optional<std::variant<Ms...>> m_opt = VariantFromAny<Ms...>(std::move(message));
if (m_opt) {
return RequestEnvelope<Ms...>{
.message = std::move(*m_opt),
.request_id = request_id,
.from_address = from_address,
};
}
return std::nullopt;
}
};
class OpaquePromiseTraitBase {
public:
virtual const std::type_info *TypeInfo() const = 0;
virtual bool IsAwaited(void *ptr) const = 0;
virtual void Fill(void *ptr, OpaqueMessage &&) const = 0;
virtual void TimeOut(void *ptr) const = 0;
virtual ~OpaquePromiseTraitBase() = default;
OpaquePromiseTraitBase() = default;
OpaquePromiseTraitBase(const OpaquePromiseTraitBase &) = delete;
OpaquePromiseTraitBase &operator=(const OpaquePromiseTraitBase &) = delete;
OpaquePromiseTraitBase(OpaquePromiseTraitBase &&old) = delete;
OpaquePromiseTraitBase &operator=(OpaquePromiseTraitBase &&) = delete;
};
template <typename T>
class OpaquePromiseTrait : public OpaquePromiseTraitBase {
public:
const std::type_info *TypeInfo() const override { return &typeid(T); };
bool IsAwaited(void *ptr) const override { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); };
void Fill(void *ptr, OpaqueMessage &&opaque_message) const override {
T message = std::any_cast<T>(std::move(opaque_message.message));
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
.request_id = opaque_message.request_id,
.from_address = opaque_message.from_address};
auto promise = static_cast<ResponsePromise<T> *>(ptr);
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
unique_promise->Fill(std::move(response_envelope));
};
void TimeOut(void *ptr) const override {
auto promise = static_cast<ResponsePromise<T> *>(ptr);
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
ResponseResult<T> result = TimedOut{};
unique_promise->Fill(std::move(result));
}
};
class OpaquePromise {
void *ptr_;
std::unique_ptr<OpaquePromiseTraitBase> trait_;
public:
OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) {
MG_ASSERT(old.ptr_ != nullptr);
old.ptr_ = nullptr;
}
OpaquePromise &operator=(OpaquePromise &&old) noexcept {
MG_ASSERT(ptr_ == nullptr);
MG_ASSERT(old.ptr_ != nullptr);
MG_ASSERT(this != &old);
ptr_ = old.ptr_;
trait_ = std::move(old.trait_);
old.ptr_ = nullptr;
return *this;
}
OpaquePromise(const OpaquePromise &) = delete;
OpaquePromise &operator=(const OpaquePromise &) = delete;
template <typename T>
std::unique_ptr<ResponsePromise<T>> Take() && {
MG_ASSERT(typeid(T) == *trait_->TypeInfo());
MG_ASSERT(ptr_ != nullptr);
auto ptr = static_cast<ResponsePromise<T> *>(ptr_);
ptr_ = nullptr;
return std::unique_ptr<T>(ptr);
}
template <typename T>
explicit OpaquePromise(std::unique_ptr<ResponsePromise<T>> promise)
: ptr_(static_cast<void *>(promise.release())), trait_(std::make_unique<OpaquePromiseTrait<T>>()) {}
bool IsAwaited() {
MG_ASSERT(ptr_ != nullptr);
return trait_->IsAwaited(ptr_);
}
void TimeOut() {
MG_ASSERT(ptr_ != nullptr);
trait_->TimeOut(ptr_);
ptr_ = nullptr;
}
void Fill(OpaqueMessage &&opaque_message) {
MG_ASSERT(ptr_ != nullptr);
trait_->Fill(ptr_, std::move(opaque_message));
ptr_ = nullptr;
}
~OpaquePromise() {
MG_ASSERT(ptr_ == nullptr, "OpaquePromise destroyed without being explicitly timed out or filled");
}
};
} // namespace memgraph::io::simulator

View File

@ -0,0 +1,45 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <memory>
#include <random>
#include "io/address.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_handle.hpp"
#include "io/simulator/simulator_transport.hpp"
namespace memgraph::io::simulator {
class Simulator {
std::mt19937 rng_;
std::shared_ptr<SimulatorHandle> simulator_handle_;
public:
explicit Simulator(SimulatorConfig config)
: rng_(std::mt19937{config.rng_seed}), simulator_handle_{std::make_shared<SimulatorHandle>(config)} {}
void ShutDown() { simulator_handle_->ShutDown(); }
Io<SimulatorTransport> Register(Address address) {
std::uniform_int_distribution<uint64_t> seed_distrib;
uint64_t seed = seed_distrib(rng_);
return Io{SimulatorTransport{simulator_handle_, address, seed}, address};
}
void IncrementServerCountAndWaitForQuiescentState(Address address) {
simulator_handle_->IncrementServerCountAndWaitForQuiescentState(address);
}
SimulatorStats Stats() { return simulator_handle_->Stats(); }
};
}; // namespace memgraph::io::simulator

View File

@ -0,0 +1,30 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include "io/time.hpp"
namespace memgraph::io::simulator {
using memgraph::io::Time;
struct SimulatorConfig {
uint8_t drop_percent = 0;
bool perform_timeouts = false;
bool scramble_messages = true;
uint64_t rng_seed = 0;
Time start_time = Time::min();
Time abort_time = Time::max();
};
}; // namespace memgraph::io::simulator

View File

@ -0,0 +1,154 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "io/simulator/simulator_handle.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_stats.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
namespace memgraph::io::simulator {
using memgraph::io::Duration;
using memgraph::io::Time;
void SimulatorHandle::ShutDown() {
std::unique_lock<std::mutex> lock(mu_);
should_shut_down_ = true;
cv_.notify_all();
}
bool SimulatorHandle::ShouldShutDown() const {
std::unique_lock<std::mutex> lock(mu_);
return should_shut_down_;
}
void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address address) {
std::unique_lock<std::mutex> lock(mu_);
server_addresses_.insert(address);
while (true) {
const size_t blocked_servers = BlockedServers();
const bool all_servers_blocked = blocked_servers == server_addresses_.size();
if (all_servers_blocked) {
return;
}
cv_.wait(lock);
}
}
size_t SimulatorHandle::BlockedServers() {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
if (opaque_promise.promise.IsAwaited() && server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
}
return blocked_servers;
}
bool SimulatorHandle::MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
const size_t blocked_servers = BlockedServers();
if (blocked_servers < server_addresses_.size()) {
// we only need to advance the simulator when all
// servers have reached a quiescent state, blocked
// on their own futures or receive methods.
return false;
}
stats_.simulator_ticks++;
cv_.notify_all();
TimeoutPromisesPastDeadline();
if (in_flight_.empty()) {
// return early here because there are no messages to schedule
// We tick the clock forward when all servers are blocked but
// there are no in-flight messages to schedule delivery of.
std::poisson_distribution<> time_distrib(50);
Duration clock_advance = std::chrono::microseconds{time_distrib(rng_)};
cluster_wide_time_microseconds_ += clock_advance;
MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time,
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
"in an expected amount of time.");
return true;
}
if (config_.scramble_messages) {
// scramble messages
std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1);
const size_t swap_index = swap_distrib(rng_);
std::swap(in_flight_[swap_index], in_flight_.back());
}
auto [to_address, opaque_message] = std::move(in_flight_.back());
in_flight_.pop_back();
std::uniform_int_distribution<int> drop_distrib(0, 99);
const int drop_threshold = drop_distrib(rng_);
const bool should_drop = drop_threshold < config_.drop_percent;
if (should_drop) {
stats_.dropped_messages++;
}
PromiseKey promise_key{.requester_address = to_address,
.request_id = opaque_message.request_id,
.replier_address = opaque_message.from_address};
if (promises_.contains(promise_key)) {
// complete waiting promise if it's there
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
promises_.erase(promise_key);
const bool normal_timeout = config_.perform_timeouts && (dop.deadline < cluster_wide_time_microseconds_);
if (should_drop || normal_timeout) {
stats_.timed_out_requests++;
dop.promise.TimeOut();
} else {
stats_.total_responses++;
dop.promise.Fill(std::move(opaque_message));
}
} else if (should_drop) {
// don't add it anywhere, let it drop
} else {
// add to can_receive_ if not
const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address, std::vector<OpaqueMessage>());
om_vec->second.emplace_back(std::move(opaque_message));
}
return true;
}
Time SimulatorHandle::Now() const {
std::unique_lock<std::mutex> lock(mu_);
return cluster_wide_time_microseconds_;
}
SimulatorStats SimulatorHandle::Stats() {
std::unique_lock<std::mutex> lock(mu_);
return stats_;
}
} // namespace memgraph::io::simulator

View File

@ -0,0 +1,206 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <any>
#include <compare>
#include <iostream>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <utility>
#include <variant>
#include <vector>
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/simulator/message_conversion.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_stats.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
namespace memgraph::io::simulator {
using memgraph::io::Duration;
using memgraph::io::Time;
struct PromiseKey {
Address requester_address;
uint64_t request_id;
// TODO(tyler) possibly remove replier_address from promise key
// once we want to support DSR.
Address replier_address;
public:
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
if (lhs.requester_address != rhs.requester_address) {
return lhs.requester_address < rhs.requester_address;
}
if (lhs.request_id != rhs.request_id) {
return lhs.request_id < rhs.request_id;
}
return lhs.replier_address < rhs.replier_address;
}
};
struct DeadlineAndOpaquePromise {
Time deadline;
OpaquePromise promise;
};
class SimulatorHandle {
mutable std::mutex mu_{};
mutable std::condition_variable cv_;
// messages that have not yet been scheduled or dropped
std::vector<std::pair<Address, OpaqueMessage>> in_flight_;
// the responses to requests that are being waited on
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
Time cluster_wide_time_microseconds_;
bool should_shut_down_ = false;
SimulatorStats stats_;
size_t blocked_on_receive_ = 0;
std::set<Address> server_addresses_;
std::mt19937 rng_;
SimulatorConfig config_;
/// Returns the number of servers currently blocked on Receive, plus
/// the servers that are blocked on Futures that were created through
/// SimulatorTransport::Request.
///
/// TODO(tyler) investigate whether avoiding consideration of Futures
/// increases determinism.
size_t BlockedServers();
void TimeoutPromisesPastDeadline() {
const Time now = cluster_wide_time_microseconds_;
for (auto &[promise_key, dop] : promises_) {
if (dop.deadline < now) {
spdlog::debug("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(),
promise_key.replier_address.ToString());
std::move(dop).promise.TimeOut();
promises_.erase(promise_key);
stats_.timed_out_requests++;
}
}
}
public:
explicit SimulatorHandle(SimulatorConfig config)
: cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {}
void IncrementServerCountAndWaitForQuiescentState(Address address);
/// This method causes most of the interesting simulation logic to happen, wrt network behavior.
/// It checks to see if all background "server" threads are blocked on new messages, and if so,
/// it will decide whether to drop, reorder, or deliver in-flight messages based on the SimulatorConfig
/// that was used to create the Simulator.
bool MaybeTickSimulator();
void ShutDown();
bool ShouldShutDown() const;
template <Message Request, Message Response>
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request, Duration timeout,
ResponsePromise<Response> &&promise) {
std::unique_lock<std::mutex> lock(mu_);
const Time deadline = cluster_wide_time_microseconds_ + timeout;
std::any message(request);
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message)};
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
OpaquePromise opaque_promise(std::move(promise).ToUnique());
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
promises_.emplace(std::move(promise_key), std::move(dop));
stats_.total_messages++;
stats_.total_requests++;
cv_.notify_all();
}
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
std::unique_lock<std::mutex> lock(mu_);
blocked_on_receive_ += 1;
const Time deadline = cluster_wide_time_microseconds_ + timeout;
while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) {
if (can_receive_.contains(receiver)) {
std::vector<OpaqueMessage> &can_rx = can_receive_.at(receiver);
if (!can_rx.empty()) {
OpaqueMessage message = std::move(can_rx.back());
can_rx.pop_back();
// TODO(tyler) search for item in can_receive_ that matches the desired types, rather
// than asserting that the last item in can_rx matches.
auto m_opt = std::move(message).Take<Ms...>();
blocked_on_receive_ -= 1;
return std::move(m_opt).value();
}
}
lock.unlock();
bool made_progress = MaybeTickSimulator();
lock.lock();
if (!should_shut_down_ && !made_progress) {
cv_.wait(lock);
}
}
blocked_on_receive_ -= 1;
return TimedOut{};
}
template <Message M>
void Send(Address to_address, Address from_address, uint64_t request_id, M message) {
std::unique_lock<std::mutex> lock(mu_);
std::any message_any(std::move(message));
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages++;
cv_.notify_all();
}
Time Now() const;
template <class D = std::poisson_distribution<>, class Return = uint64_t>
Return Rand(D distrib) {
std::unique_lock<std::mutex> lock(mu_);
return distrib(rng_);
}
SimulatorStats Stats();
};
}; // namespace memgraph::io::simulator

View File

@ -0,0 +1,25 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::io::simulator {
struct SimulatorStats {
uint64_t total_messages = 0;
uint64_t dropped_messages = 0;
uint64_t timed_out_requests = 0;
uint64_t total_requests = 0;
uint64_t total_responses = 0;
uint64_t simulator_ticks = 0;
};
}; // namespace memgraph::io::simulator

View File

@ -0,0 +1,65 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <memory>
#include <utility>
#include "io/address.hpp"
#include "io/simulator/simulator_handle.hpp"
#include "io/time.hpp"
namespace memgraph::io::simulator {
using memgraph::io::Duration;
using memgraph::io::Time;
class SimulatorTransport {
std::shared_ptr<SimulatorHandle> simulator_handle_;
const Address address_;
std::mt19937 rng_;
public:
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address, uint64_t seed)
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
template <Message Request, Message Response>
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request, Duration timeout) {
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
auto [future, promise] =
memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(maybe_tick_simulator);
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
return std::move(future);
}
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
return simulator_handle_->template Receive<Ms...>(address_, timeout);
}
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return simulator_handle_->template Send<M>(address, address_, request_id, message);
}
Time Now() const { return simulator_handle_->Now(); }
bool ShouldShutDown() const { return simulator_handle_->ShouldShutDown(); }
template <class D = std::poisson_distribution<>, class Return = uint64_t>
Return Rand(D distrib) {
return distrib(rng_);
}
};
}; // namespace memgraph::io::simulator

21
src/io/time.hpp Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
namespace memgraph::io {
using Duration = std::chrono::microseconds;
using Time = std::chrono::time_point<std::chrono::local_t, Duration>;
} // namespace memgraph::io

130
src/io/transport.hpp Normal file
View File

@ -0,0 +1,130 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <concepts>
#include <random>
#include <variant>
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/future.hpp"
#include "io/time.hpp"
#include "utils/result.hpp"
namespace memgraph::io {
using memgraph::utils::BasicResult;
// TODO(tyler) ensure that Message continues to represent
// reasonable constraints around message types over time,
// as we adapt things to use Thrift-generated message types.
template <typename T>
concept Message = std::same_as<T, std::decay_t<T>>;
template <Message M>
struct ResponseEnvelope {
M message;
uint64_t request_id;
Address from_address;
};
template <Message M>
using ResponseResult = BasicResult<TimedOut, ResponseEnvelope<M>>;
template <Message M>
using ResponseFuture = memgraph::io::Future<ResponseResult<M>>;
template <Message M>
using ResponsePromise = memgraph::io::Promise<ResponseResult<M>>;
template <Message... Ms>
struct RequestEnvelope {
std::variant<Ms...> message;
uint64_t request_id;
Address from_address;
};
template <Message... Ms>
using RequestResult = BasicResult<TimedOut, RequestEnvelope<Ms...>>;
template <typename I>
class Io {
I implementation_;
Address address_;
uint64_t request_id_counter_ = 0;
Duration default_timeout_ = std::chrono::microseconds{50000};
public:
Io(I io, Address address) : implementation_(io), address_(address) {}
/// Set the default timeout for all requests that are issued
/// without an explicit timeout set.
void SetDefaultTimeout(Duration timeout) { default_timeout_ = timeout; }
/// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
template <Message Request, Message Response>
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
const uint64_t request_id = ++request_id_counter_;
return implementation_.template Request<Request, Response>(address, request_id, request, timeout);
}
/// Issue a request that times out after the default timeout. This tends
/// to be used by clients.
template <Message Request, Message Response>
ResponseFuture<Response> Request(Address address, Request request) {
const uint64_t request_id = ++request_id_counter_;
const Duration timeout = default_timeout_;
return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
}
/// Wait for an explicit number of microseconds for a request of one of the
/// provided types to arrive. This tends to be used by servers.
template <Message... Ms>
RequestResult<Ms...> ReceiveWithTimeout(Duration timeout) {
return implementation_.template Receive<Ms...>(timeout);
}
/// Wait the default number of microseconds for a request of one of the
/// provided types to arrive. This tends to be used by servers.
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive() {
const Duration timeout = default_timeout_;
return implementation_.template Receive<Ms...>(timeout);
}
/// Send a message in a best-effort fashion. This is used for messaging where
/// responses are not necessarily expected, and for servers to respond to requests.
/// If you need reliable delivery, this must be built on-top. TCP is not enough for most use cases.
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return implementation_.template Send<M>(address, request_id, std::move(message));
}
/// The current system time. This time source should be preferred over any other,
/// because it lets us deterministically control clocks from tests for making
/// things like timeouts deterministic.
Time Now() const { return implementation_.Now(); }
/// Returns true if the system should shut-down.
bool ShouldShutDown() const { return implementation_.ShouldShutDown(); }
/// Returns a random number within the specified distribution.
template <class D = std::poisson_distribution<>, class Return = uint64_t>
Return Rand(D distrib) {
return implementation_.template Rand<D, Return>(distrib);
}
Address GetAddress() { return address_; }
};
}; // namespace memgraph::io

View File

@ -10,6 +10,9 @@ add_subdirectory(stress)
# concurrent test binaries
add_subdirectory(concurrent)
# simulation test binaries
add_subdirectory(simulation)
# manual test binaries
add_subdirectory(manual)

View File

@ -62,3 +62,6 @@ target_link_libraries(${test_prefix}storage_v2_gc mg-storage-v2)
add_benchmark(storage_v2_property_store.cpp)
target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2)
add_benchmark(future.cpp)
target_link_libraries(${test_prefix}future mg-io)

View File

@ -0,0 +1,30 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <benchmark/benchmark.h>
#include "io/future.hpp"
static void FuturePairFillWait(benchmark::State &state) {
uint64_t counter = 0;
while (state.KeepRunning()) {
auto [future, promise] = memgraph::io::FuturePromisePair<int>();
promise.Fill(1);
std::move(future).Wait();
++counter;
}
state.SetItemsProcessed(counter);
}
BENCHMARK(FuturePairFillWait)->Unit(benchmark::kNanosecond)->UseRealTime();
BENCHMARK_MAIN();

View File

@ -0,0 +1,30 @@
set(test_prefix memgraph__simulation__)
find_package(gflags)
add_custom_target(memgraph__simulation)
function(add_simulation_test test_cpp san)
# get exec name (remove extension from the abs path)
get_filename_component(exec_name ${test_cpp} NAME_WE)
set(target_name ${test_prefix}${exec_name})
add_executable(${target_name} ${test_cpp})
# OUTPUT_NAME sets the real name of a target when it is built and can be
# used to help create two targets of the same name even though CMake
# requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-io-simulator)
# sanitize
target_compile_options(${target_name} PRIVATE -fsanitize=${san})
target_link_options(${target_name} PRIVATE -fsanitize=${san})
# register test
add_test(${target_name} ${exec_name})
add_dependencies(memgraph__simulation ${target_name})
endfunction(add_simulation_test)
add_simulation_test(basic_request.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)

View File

@ -0,0 +1,87 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <thread>
#include "io/simulator/simulator.hpp"
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorTransport;
struct CounterRequest {
uint64_t proposal;
};
struct CounterResponse {
uint64_t highest_seen;
};
void run_server(Io<SimulatorTransport> io) {
uint64_t highest_seen = 0;
while (!io.ShouldShutDown()) {
std::cout << "[SERVER] Is receiving..." << std::endl;
auto request_result = io.Receive<CounterRequest>();
if (request_result.HasError()) {
std::cout << "[SERVER] Error, continue" << std::endl;
continue;
}
auto request_envelope = request_result.GetValue();
auto req = std::get<CounterRequest>(request_envelope.message);
highest_seen = std::max(highest_seen, req.proposal);
auto srv_res = CounterResponse{highest_seen};
io.Send(request_envelope.from_address, request_envelope.request_id, srv_res);
}
}
int main() {
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
};
auto simulator = Simulator(config);
auto cli_addr = Address::TestAddress(1);
auto srv_addr = Address::TestAddress(2);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> srv_io = simulator.Register(srv_addr);
auto srv_thread = std::jthread(run_server, std::move(srv_io));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr);
for (int i = 1; i < 3; ++i) {
// send request
CounterRequest cli_req;
cli_req.proposal = i;
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
auto res_rez = std::move(res_f).Wait();
if (!res_rez.HasError()) {
std::cout << "[CLIENT] Got a valid response" << std::endl;
auto env = res_rez.GetValue();
MG_ASSERT(env.message.highest_seen == i);
} else {
std::cout << "[CLIENT] Got an error" << std::endl;
}
}
simulator.ShutDown();
return 0;
}

View File

@ -0,0 +1,34 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <optional>
#include <string>
#include <vector>
namespace memgraph::tests::simulation {
struct Vertex {
std::string key;
};
struct ScanVerticesRequest {
int64_t count;
std::optional<int64_t> continuation;
};
struct VerticesResponse {
std::vector<Vertex> vertices;
std::optional<int64_t> continuation;
};
} // namespace memgraph::tests::simulation

View File

@ -0,0 +1,85 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <iostream>
#include "io/address.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "io/transport.hpp"
#include "messages.hpp"
namespace memgraph::tests::simulation {
using memgraph::io::Io;
using memgraph::io::simulator::SimulatorTransport;
void run_server(Io<SimulatorTransport> io) {
while (!io.ShouldShutDown()) {
std::cout << "[STORAGE] Is receiving..." << std::endl;
auto request_result = io.Receive<ScanVerticesRequest>();
if (request_result.HasError()) {
std::cout << "[STORAGE] Error, continue" << std::endl;
continue;
}
auto request_envelope = request_result.GetValue();
auto req = std::get<ScanVerticesRequest>(request_envelope.message);
VerticesResponse response{};
const int64_t start_index = std::invoke([&req] {
if (req.continuation.has_value()) {
return *req.continuation;
}
return 0L;
});
for (auto index = start_index; index < start_index + req.count; ++index) {
response.vertices.push_back({std::string("Vertex_") + std::to_string(index)});
}
io.Send(request_envelope.from_address, request_envelope.request_id, response);
}
}
} // namespace memgraph::tests::simulation
int main() {
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::tests::simulation::run_server;
using memgraph::tests::simulation::ScanVerticesRequest;
using memgraph::tests::simulation::VerticesResponse;
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
};
auto simulator = Simulator(config);
auto cli_addr = Address::TestAddress(1);
auto srv_addr = Address::TestAddress(2);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> srv_io = simulator.Register(srv_addr);
auto srv_thread = std::jthread(run_server, std::move(srv_io));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr);
auto req = ScanVerticesRequest{2, std::nullopt};
auto res_f = cli_io.Request<ScanVerticesRequest, VerticesResponse>(srv_addr, req);
auto res_rez = std::move(res_f).Wait();
simulator.ShutDown();
return 0;
}

View File

@ -396,3 +396,7 @@ find_package(Boost REQUIRED)
add_unit_test(websocket.cpp)
target_link_libraries(${test_prefix}websocket mg-communication Boost::headers)
# Test future
add_unit_test(future.cpp)
target_link_libraries(${test_prefix}future mg-io)

55
tests/unit/future.cpp Normal file
View File

@ -0,0 +1,55 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <string>
#include <thread>
#include "gtest/gtest.h"
#include "io/future.hpp"
using namespace memgraph::io;
void Fill(Promise<std::string> promise_1) { promise_1.Fill("success"); }
void Wait(Future<std::string> future_1, Promise<std::string> promise_2) {
std::string result_1 = std::move(future_1).Wait();
EXPECT_TRUE(result_1 == "success");
promise_2.Fill("it worked");
}
TEST(Future, BasicLifecycle) {
std::atomic_bool waiting = false;
std::function<bool()> notifier = [&] {
waiting.store(true, std::memory_order_seq_cst);
return false;
};
auto [future_1, promise_1] = FuturePromisePairWithNotifier<std::string>(notifier);
auto [future_2, promise_2] = FuturePromisePair<std::string>();
std::jthread t1(Wait, std::move(future_1), std::move(promise_2));
// spin in a loop until the promise signals
// that it is waiting
while (!waiting.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
std::jthread t2(Fill, std::move(promise_1));
t1.join();
t2.join();
std::string result_2 = std::move(future_2).Wait();
EXPECT_TRUE(result_2 == "it worked");
}