Merge branch 'T0879-MG-transport-prototype' of github.com:memgraph/memgraph into T0941-MG-implement-basic-raft-version
This commit is contained in:
commit
618a3d96b3
11
.github/workflows/diff.yaml
vendored
11
.github/workflows/diff.yaml
vendored
@ -171,7 +171,7 @@ jobs:
|
||||
|
||||
# Run leftover CTest tests (all except unit and benchmark tests).
|
||||
cd build
|
||||
ctest -E "(memgraph__unit|memgraph__benchmark)" --output-on-failure
|
||||
ctest -E "(memgraph__unit|memgraph__benchmark|memgraph__simulation)" --output-on-failure
|
||||
|
||||
- name: Run drivers tests
|
||||
run: |
|
||||
@ -262,6 +262,15 @@ jobs:
|
||||
cd build
|
||||
ctest -R memgraph__unit --output-on-failure -j$THREADS
|
||||
|
||||
- name: Run simulation tests
|
||||
run: |
|
||||
# Activate toolchain.
|
||||
source /opt/toolchain-v4/activate
|
||||
|
||||
# Run unit tests.
|
||||
cd build
|
||||
ctest -R memgraph__simulation --output-on-failure -j$THREADS
|
||||
|
||||
- name: Run e2e tests
|
||||
run: |
|
||||
# TODO(gitbuda): Setup mgclient and pymgclient properly.
|
||||
|
@ -5,7 +5,6 @@ add_subdirectory(lisp)
|
||||
add_subdirectory(utils)
|
||||
add_subdirectory(requests)
|
||||
add_subdirectory(io)
|
||||
add_subdirectory(io/rsm)
|
||||
add_subdirectory(io/simulator)
|
||||
add_subdirectory(kvstore)
|
||||
add_subdirectory(telemetry)
|
||||
|
@ -2,11 +2,7 @@ set(io_src_files
|
||||
network/addrinfo.cpp
|
||||
network/endpoint.cpp
|
||||
network/socket.cpp
|
||||
network/utils.cpp
|
||||
future.hpp
|
||||
address.hpp
|
||||
errors.hpp
|
||||
transport.hpp)
|
||||
network/utils.cpp)
|
||||
|
||||
find_package(fmt REQUIRED)
|
||||
find_package(Threads REQUIRED)
|
||||
|
@ -13,8 +13,10 @@
|
||||
|
||||
#include <compare>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
|
||||
namespace memgraph::io {
|
||||
struct Address {
|
||||
@ -36,15 +38,26 @@ struct Address {
|
||||
}
|
||||
|
||||
bool operator==(const Address &other) const {
|
||||
return (last_known_ip == other.last_known_ip) && (last_known_port == other.last_known_port);
|
||||
return (unique_id == other.unique_id) && (last_known_ip == other.last_known_ip) &&
|
||||
(last_known_port == other.last_known_port);
|
||||
}
|
||||
|
||||
/// unique_id is most dominant for ordering, then last_known_ip, then last_known_port
|
||||
bool operator<(const Address &other) const {
|
||||
if (last_known_ip == other.last_known_ip) {
|
||||
return last_known_port < other.last_known_port;
|
||||
} else {
|
||||
if (unique_id != other.unique_id) {
|
||||
return unique_id < other.unique_id;
|
||||
}
|
||||
|
||||
if (last_known_ip != other.last_known_ip) {
|
||||
return last_known_ip < other.last_known_ip;
|
||||
}
|
||||
|
||||
return last_known_port < other.last_known_port;
|
||||
}
|
||||
|
||||
std::string ToString() const {
|
||||
return fmt::format("Address {{ unique_id: {}, last_known_ip: {}, last_known_port: {} }}",
|
||||
boost::uuids::to_string(unique_id), last_known_ip.to_string(), last_known_port);
|
||||
}
|
||||
};
|
||||
}; // namespace memgraph::io
|
||||
|
@ -18,9 +18,8 @@
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
#include "io/errors.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace memgraph::io {
|
||||
|
||||
@ -28,15 +27,15 @@ namespace memgraph::io {
|
||||
// construct a Promise or Future is to pass a Shared in. This
|
||||
// ensures that Promises and Futures can only be constructed
|
||||
// in this translation unit.
|
||||
namespace {
|
||||
namespace details {
|
||||
template <typename T>
|
||||
class Shared {
|
||||
std::condition_variable cv_;
|
||||
std::mutex mu_;
|
||||
mutable std::condition_variable cv_;
|
||||
mutable std::mutex mu_;
|
||||
std::optional<T> item_;
|
||||
bool consumed_ = false;
|
||||
bool waiting_ = false;
|
||||
std::optional<std::function<bool()>> simulator_notifier_;
|
||||
std::function<bool()> simulator_notifier_ = nullptr;
|
||||
|
||||
public:
|
||||
explicit Shared(std::function<bool()> simulator_notifier) : simulator_notifier_(simulator_notifier) {}
|
||||
@ -47,13 +46,26 @@ class Shared {
|
||||
Shared &operator=(const Shared &) = delete;
|
||||
~Shared() = default;
|
||||
|
||||
/// Takes the item out of our optional item_ and returns it.
|
||||
T Take() {
|
||||
MG_ASSERT(item_, "Take called without item_ being present");
|
||||
MG_ASSERT(!consumed_, "Take called on already-consumed Future");
|
||||
|
||||
T ret = std::move(item_).value();
|
||||
item_.reset();
|
||||
|
||||
consumed_ = true;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
T Wait() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
waiting_ = true;
|
||||
|
||||
while (!item_) {
|
||||
bool simulator_progressed = false;
|
||||
if (simulator_notifier_) {
|
||||
if (simulator_notifier_) [[unlikely]] {
|
||||
// We can't hold our own lock while notifying
|
||||
// the simulator because notifying the simulator
|
||||
// involves acquiring the simulator's mutex
|
||||
@ -65,7 +77,7 @@ class Shared {
|
||||
// so we have to get out of its way to avoid
|
||||
// a cyclical deadlock.
|
||||
lock.unlock();
|
||||
simulator_progressed = (*simulator_notifier_)();
|
||||
simulator_progressed = (simulator_notifier_)();
|
||||
lock.lock();
|
||||
if (item_) {
|
||||
// item may have been filled while we
|
||||
@ -74,19 +86,15 @@ class Shared {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!simulator_progressed) {
|
||||
if (!simulator_progressed) [[likely]] {
|
||||
cv_.wait(lock);
|
||||
}
|
||||
MG_ASSERT(!consumed_, "Future consumed twice!");
|
||||
}
|
||||
|
||||
T ret = std::move(item_).value();
|
||||
item_.reset();
|
||||
|
||||
waiting_ = false;
|
||||
consumed_ = true;
|
||||
|
||||
return ret;
|
||||
return Take();
|
||||
}
|
||||
|
||||
bool IsReady() {
|
||||
@ -98,16 +106,10 @@ class Shared {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
if (item_) {
|
||||
T ret = std::move(item_).value();
|
||||
item_.reset();
|
||||
|
||||
waiting_ = false;
|
||||
consumed_ = true;
|
||||
|
||||
return ret;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
return Take();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void Fill(T item) {
|
||||
@ -128,28 +130,30 @@ class Shared {
|
||||
return waiting_;
|
||||
}
|
||||
};
|
||||
} // namespace
|
||||
} // namespace details
|
||||
|
||||
template <typename T>
|
||||
class Future {
|
||||
bool consumed_or_moved_ = false;
|
||||
std::shared_ptr<Shared<T>> shared_;
|
||||
std::shared_ptr<details::Shared<T>> shared_;
|
||||
|
||||
public:
|
||||
explicit Future(std::shared_ptr<Shared<T>> shared) : shared_(shared) {}
|
||||
explicit Future(std::shared_ptr<details::Shared<T>> shared) : shared_(shared) {}
|
||||
|
||||
Future() = delete;
|
||||
Future(Future &&old) {
|
||||
Future(Future &&old) noexcept {
|
||||
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
|
||||
shared_ = std::move(old.shared_);
|
||||
consumed_or_moved_ = old.consumed_or_moved_;
|
||||
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
|
||||
old.consumed_or_moved_ = true;
|
||||
}
|
||||
Future &operator=(Future &&old) {
|
||||
|
||||
Future &operator=(Future &&old) noexcept {
|
||||
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
|
||||
shared_ = std::move(old.shared_);
|
||||
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
|
||||
old.consumed_or_moved_ = true;
|
||||
}
|
||||
|
||||
Future(const Future &) = delete;
|
||||
Future &operator=(const Future &) = delete;
|
||||
~Future() = default;
|
||||
@ -177,7 +181,7 @@ class Future {
|
||||
|
||||
/// Block on the corresponding promise to be filled,
|
||||
/// returning the inner item when ready.
|
||||
T Wait() {
|
||||
T Wait() && {
|
||||
MG_ASSERT(!consumed_or_moved_, "Future should only be consumed with Wait once!");
|
||||
T ret = shared_->Wait();
|
||||
consumed_or_moved_ = true;
|
||||
@ -193,21 +197,22 @@ class Future {
|
||||
|
||||
template <typename T>
|
||||
class Promise {
|
||||
std::shared_ptr<Shared<T>> shared_;
|
||||
std::shared_ptr<details::Shared<T>> shared_;
|
||||
bool filled_or_moved_ = false;
|
||||
|
||||
public:
|
||||
explicit Promise(std::shared_ptr<Shared<T>> shared) : shared_(shared) {}
|
||||
explicit Promise(std::shared_ptr<details::Shared<T>> shared) : shared_(shared) {}
|
||||
|
||||
Promise() = delete;
|
||||
Promise(Promise &&old) {
|
||||
shared_ = std::move(old.shared_);
|
||||
Promise(Promise &&old) noexcept {
|
||||
MG_ASSERT(!old.filled_or_moved_, "Promise moved from after already being moved from or filled.");
|
||||
shared_ = std::move(old.shared_);
|
||||
old.filled_or_moved_ = true;
|
||||
}
|
||||
Promise &operator=(Promise &&old) {
|
||||
shared_ = std::move(old.shared_);
|
||||
|
||||
Promise &operator=(Promise &&old) noexcept {
|
||||
MG_ASSERT(!old.filled_or_moved_, "Promise moved from after already being moved from or filled.");
|
||||
shared_ = std::move(old.shared_);
|
||||
old.filled_or_moved_ = true;
|
||||
}
|
||||
Promise(const Promise &) = delete;
|
||||
@ -236,7 +241,7 @@ class Promise {
|
||||
|
||||
template <typename T>
|
||||
std::pair<Future<T>, Promise<T>> FuturePromisePair() {
|
||||
std::shared_ptr<Shared<T>> shared = std::make_shared<Shared<T>>();
|
||||
std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>();
|
||||
|
||||
Future<T> future = Future<T>(shared);
|
||||
Promise<T> promise = Promise<T>(shared);
|
||||
@ -246,7 +251,7 @@ std::pair<Future<T>, Promise<T>> FuturePromisePair() {
|
||||
|
||||
template <typename T>
|
||||
std::pair<Future<T>, Promise<T>> FuturePromisePairWithNotifier(std::function<bool()> simulator_notifier) {
|
||||
std::shared_ptr<Shared<T>> shared = std::make_shared<Shared<T>>(simulator_notifier);
|
||||
std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>(simulator_notifier);
|
||||
|
||||
Future<T> future = Future<T>(shared);
|
||||
Promise<T> promise = Promise<T>(shared);
|
||||
|
@ -1,9 +0,0 @@
|
||||
set(io_rsm_sources
|
||||
raft.hpp
|
||||
rsm.hpp)
|
||||
|
||||
find_package(fmt REQUIRED)
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
add_library(mg-io-rsm STATIC ${io_rsm_sources})
|
||||
target_link_libraries(mg-io-rsm stdc++fs Threads::Threads fmt::fmt mg-utils mg-io)
|
@ -1,8 +1,5 @@
|
||||
set(io_simulator_sources
|
||||
simulator.hpp
|
||||
simulator_handle.hpp
|
||||
simulator_stats.hpp
|
||||
simulator_config.hpp)
|
||||
simulator_handle.cpp)
|
||||
|
||||
find_package(fmt REQUIRED)
|
||||
find_package(Threads REQUIRED)
|
||||
|
172
src/io/simulator/message_conversion.hpp
Normal file
172
src/io/simulator/message_conversion.hpp
Normal file
@ -0,0 +1,172 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Message;
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct OpaqueMessage {
|
||||
Address from_address;
|
||||
uint64_t request_id;
|
||||
std::any message;
|
||||
|
||||
/// Recursively tries to match a specific type from the outer
|
||||
/// variant's parameter pack against the type of the std::any,
|
||||
/// and if it matches, make it concrete and return it. Otherwise,
|
||||
/// move on and compare the any with the next type from the
|
||||
/// parameter pack.
|
||||
///
|
||||
/// Return is the full std::variant<Ts...> type that holds the
|
||||
/// full parameter pack without interfering with recursive
|
||||
/// narrowing expansion.
|
||||
template <typename Return, Message Head, Message... Rest>
|
||||
std::optional<Return> Unpack(std::any &&a) {
|
||||
if (typeid(Head) == a.type()) {
|
||||
Head concrete = std::any_cast<Head>(std::move(a));
|
||||
return concrete;
|
||||
}
|
||||
|
||||
if constexpr (sizeof...(Rest) > 0) {
|
||||
return Unpack<Return, Rest...>(std::move(a));
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
/// High level "user-facing" conversion function that lets
|
||||
/// people interested in conversion only supply a single
|
||||
/// parameter pack for the types that they want to compare
|
||||
/// with the any and potentially include in the returned
|
||||
/// variant.
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) std::optional<std::variant<Ms...>> VariantFromAny(std::any &&a) {
|
||||
return Unpack<std::variant<Ms...>, Ms...>(std::move(a));
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) std::optional<RequestEnvelope<Ms...>> Take() && {
|
||||
std::optional<std::variant<Ms...>> m_opt = VariantFromAny<Ms...>(std::move(message));
|
||||
|
||||
if (m_opt) {
|
||||
return RequestEnvelope<Ms...>{
|
||||
.message = std::move(*m_opt),
|
||||
.request_id = request_id,
|
||||
.from_address = from_address,
|
||||
};
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
|
||||
class OpaquePromiseTraitBase {
|
||||
public:
|
||||
virtual const std::type_info *TypeInfo() const = 0;
|
||||
virtual bool IsAwaited(void *ptr) const = 0;
|
||||
virtual void Fill(void *ptr, OpaqueMessage &&) const = 0;
|
||||
virtual void TimeOut(void *ptr) const = 0;
|
||||
|
||||
virtual ~OpaquePromiseTraitBase() = default;
|
||||
OpaquePromiseTraitBase() = default;
|
||||
OpaquePromiseTraitBase(const OpaquePromiseTraitBase &) = delete;
|
||||
OpaquePromiseTraitBase &operator=(const OpaquePromiseTraitBase &) = delete;
|
||||
OpaquePromiseTraitBase(OpaquePromiseTraitBase &&old) = delete;
|
||||
OpaquePromiseTraitBase &operator=(OpaquePromiseTraitBase &&) = delete;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class OpaquePromiseTrait : public OpaquePromiseTraitBase {
|
||||
public:
|
||||
const std::type_info *TypeInfo() const override { return &typeid(T); };
|
||||
|
||||
bool IsAwaited(void *ptr) const override { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); };
|
||||
|
||||
void Fill(void *ptr, OpaqueMessage &&opaque_message) const override {
|
||||
T message = std::any_cast<T>(std::move(opaque_message.message));
|
||||
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
|
||||
.request_id = opaque_message.request_id,
|
||||
.from_address = opaque_message.from_address};
|
||||
auto promise = static_cast<ResponsePromise<T> *>(ptr);
|
||||
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
|
||||
unique_promise->Fill(std::move(response_envelope));
|
||||
};
|
||||
|
||||
void TimeOut(void *ptr) const override {
|
||||
auto promise = static_cast<ResponsePromise<T> *>(ptr);
|
||||
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
|
||||
ResponseResult<T> result = TimedOut{};
|
||||
unique_promise->Fill(std::move(result));
|
||||
}
|
||||
};
|
||||
|
||||
class OpaquePromise {
|
||||
void *ptr_;
|
||||
std::unique_ptr<OpaquePromiseTraitBase> trait_;
|
||||
|
||||
public:
|
||||
OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) { old.ptr_ = nullptr; }
|
||||
|
||||
OpaquePromise &operator=(OpaquePromise &&old) noexcept {
|
||||
MG_ASSERT(this != &old);
|
||||
ptr_ = old.ptr_;
|
||||
trait_ = std::move(old.trait_);
|
||||
old.ptr_ = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
OpaquePromise(const OpaquePromise &) = delete;
|
||||
OpaquePromise &operator=(const OpaquePromise &) = delete;
|
||||
|
||||
template <typename T>
|
||||
std::unique_ptr<ResponsePromise<T>> Take() && {
|
||||
MG_ASSERT(typeid(T) == *trait_->TypeInfo());
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
|
||||
auto ptr = static_cast<ResponsePromise<T> *>(ptr_);
|
||||
|
||||
ptr_ = nullptr;
|
||||
|
||||
return std::unique_ptr<T>(ptr);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
explicit OpaquePromise(std::unique_ptr<ResponsePromise<T>> promise)
|
||||
: ptr_(static_cast<void *>(promise.release())), trait_(std::make_unique<OpaquePromiseTrait<T>>()) {}
|
||||
|
||||
bool IsAwaited() {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
return trait_->IsAwaited(ptr_);
|
||||
}
|
||||
|
||||
void TimeOut() {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
trait_->TimeOut(ptr_);
|
||||
ptr_ = nullptr;
|
||||
}
|
||||
|
||||
void Fill(OpaqueMessage &&opaque_message) {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
trait_->Fill(ptr_, std::move(opaque_message));
|
||||
ptr_ = nullptr;
|
||||
}
|
||||
|
||||
~OpaquePromise() {
|
||||
MG_ASSERT(ptr_ == nullptr, "OpaquePromise destroyed without being explicitly timed out or filled");
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace memgraph::io::simulator
|
@ -12,6 +12,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <random>
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
@ -20,7 +21,7 @@
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
class Simulator {
|
||||
std::mt19937 rng_{};
|
||||
std::mt19937 rng_;
|
||||
std::shared_ptr<SimulatorHandle> simulator_handle_;
|
||||
|
||||
public:
|
||||
@ -30,9 +31,9 @@ class Simulator {
|
||||
void ShutDown() { simulator_handle_->ShutDown(); }
|
||||
|
||||
Io<SimulatorTransport> Register(Address address) {
|
||||
std::uniform_int_distribution<uint64_t> seed_distrib{};
|
||||
std::uniform_int_distribution<uint64_t> seed_distrib;
|
||||
uint64_t seed = seed_distrib(rng_);
|
||||
return Io(SimulatorTransport(simulator_handle_, address, seed), address);
|
||||
return Io{SimulatorTransport{simulator_handle_, address, seed}, address};
|
||||
}
|
||||
|
||||
void IncrementServerCountAndWaitForQuiescentState(Address address) {
|
||||
|
@ -11,13 +11,20 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include "io/time.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct SimulatorConfig {
|
||||
int drop_percent = 0;
|
||||
uint8_t drop_percent = 0;
|
||||
bool perform_timeouts = false;
|
||||
bool scramble_messages = true;
|
||||
uint64_t rng_seed = 0;
|
||||
uint64_t start_time = 0;
|
||||
uint64_t abort_time = ULLONG_MAX;
|
||||
Time start_time = Time::min();
|
||||
Time abort_time = Time::max();
|
||||
};
|
||||
}; // namespace memgraph::io::simulator
|
||||
|
156
src/io/simulator/simulator_handle.cpp
Normal file
156
src/io/simulator/simulator_handle.cpp
Normal file
@ -0,0 +1,156 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "io/simulator/simulator_handle.hpp"
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
#include "io/simulator/simulator_stats.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Time;
|
||||
|
||||
void SimulatorHandle::ShutDown() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
should_shut_down_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
bool SimulatorHandle::ShouldShutDown() const {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return should_shut_down_;
|
||||
}
|
||||
|
||||
void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address address) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
server_addresses_.insert(address);
|
||||
|
||||
while (true) {
|
||||
const size_t blocked_servers = BlockedServers();
|
||||
|
||||
const bool all_servers_blocked = blocked_servers == server_addresses_.size();
|
||||
|
||||
if (all_servers_blocked) {
|
||||
return;
|
||||
}
|
||||
|
||||
cv_.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
size_t SimulatorHandle::BlockedServers() {
|
||||
size_t blocked_servers = blocked_on_receive_;
|
||||
|
||||
for (auto &[promise_key, opaque_promise] : promises_) {
|
||||
if (opaque_promise.promise.IsAwaited()) {
|
||||
if (server_addresses_.contains(promise_key.requester_address)) {
|
||||
blocked_servers++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return blocked_servers;
|
||||
}
|
||||
|
||||
bool SimulatorHandle::MaybeTickSimulator() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
const size_t blocked_servers = BlockedServers();
|
||||
|
||||
if (blocked_servers < server_addresses_.size()) {
|
||||
// we only need to advance the simulator when all
|
||||
// servers have reached a quiescent state, blocked
|
||||
// on their own futures or receive methods.
|
||||
return false;
|
||||
}
|
||||
|
||||
stats_.simulator_ticks++;
|
||||
|
||||
cv_.notify_all();
|
||||
|
||||
TimeoutPromisesPastDeadline();
|
||||
|
||||
if (in_flight_.empty()) {
|
||||
// return early here because there are no messages to schedule
|
||||
|
||||
// We tick the clock forward when all servers are blocked but
|
||||
// there are no in-flight messages to schedule delivery of.
|
||||
std::poisson_distribution<> time_distrib(50);
|
||||
Duration clock_advance = std::chrono::microseconds{time_distrib(rng_)};
|
||||
cluster_wide_time_microseconds_ += clock_advance;
|
||||
|
||||
MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time,
|
||||
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
|
||||
"in an expected amount of time.");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (config_.scramble_messages) {
|
||||
// scramble messages
|
||||
std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1);
|
||||
const size_t swap_index = swap_distrib(rng_);
|
||||
std::swap(in_flight_[swap_index], in_flight_.back());
|
||||
}
|
||||
|
||||
auto [to_address, opaque_message] = std::move(in_flight_.back());
|
||||
in_flight_.pop_back();
|
||||
|
||||
std::uniform_int_distribution<int> drop_distrib(0, 99);
|
||||
const int drop_threshold = drop_distrib(rng_);
|
||||
const bool should_drop = drop_threshold < config_.drop_percent;
|
||||
|
||||
if (should_drop) {
|
||||
stats_.dropped_messages++;
|
||||
}
|
||||
|
||||
PromiseKey promise_key{.requester_address = to_address,
|
||||
.request_id = opaque_message.request_id,
|
||||
.replier_address = opaque_message.from_address};
|
||||
|
||||
if (promises_.contains(promise_key)) {
|
||||
// complete waiting promise if it's there
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
|
||||
const bool normal_timeout = config_.perform_timeouts && (dop.deadline < cluster_wide_time_microseconds_);
|
||||
|
||||
if (should_drop || normal_timeout) {
|
||||
stats_.timed_out_requests++;
|
||||
dop.promise.TimeOut();
|
||||
} else {
|
||||
stats_.total_responses++;
|
||||
dop.promise.Fill(std::move(opaque_message));
|
||||
}
|
||||
} else if (should_drop) {
|
||||
// don't add it anywhere, let it drop
|
||||
} else {
|
||||
// add to can_receive_ if not
|
||||
const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address, std::vector<OpaqueMessage>());
|
||||
om_vec->second.emplace_back(std::move(opaque_message));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Time SimulatorHandle::Now() const {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return cluster_wide_time_microseconds_;
|
||||
}
|
||||
|
||||
SimulatorStats SimulatorHandle::Stats() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return stats_;
|
||||
}
|
||||
} // namespace memgraph::io::simulator
|
@ -11,9 +11,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <compare>
|
||||
|
||||
#include <any>
|
||||
#include <compare>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
@ -25,194 +24,57 @@
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/simulator/message_conversion.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
#include "io/simulator/simulator_stats.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
struct OpaqueMessage {
|
||||
Address from_address;
|
||||
uint64_t request_id;
|
||||
std::any message;
|
||||
|
||||
/// Recursively tries to match a specific type from the outer
|
||||
/// variant's parameter pack against the type of the std::any,
|
||||
/// and if it matches, make it concrete and return it. Otherwise,
|
||||
/// move on and compare the any with the next type from the
|
||||
/// parameter pack.
|
||||
///
|
||||
/// Return is the full std::variant<Ts...> type that holds the
|
||||
/// full parameter pack without interfering with recursive
|
||||
/// narrowing expansion.
|
||||
template <typename Return, Message Head, Message... Rest>
|
||||
std::optional<Return> Unpack(std::any &&a) {
|
||||
if (typeid(Head) == a.type()) {
|
||||
Head concrete = std::any_cast<Head>(std::move(a));
|
||||
return concrete;
|
||||
}
|
||||
|
||||
if constexpr (sizeof...(Rest) > 0) {
|
||||
return Unpack<Return, Rest...>(std::move(a));
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
/// High level "user-facing" conversion function that lets
|
||||
/// people interested in conversion only supply a single
|
||||
/// parameter pack for the types that they want to compare
|
||||
/// with the any and potentially include in the returned
|
||||
/// variant.
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) std::optional<std::variant<Ms...>> VariantFromAny(std::any &&a) {
|
||||
return Unpack<std::variant<Ms...>, Ms...>(std::move(a));
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) std::optional<RequestEnvelope<Ms...>> Take() {
|
||||
std::optional<std::variant<Ms...>> m_opt = VariantFromAny<Ms...>(std::move(message));
|
||||
|
||||
if (m_opt) {
|
||||
return RequestEnvelope<Ms...>{
|
||||
.message = std::move(*m_opt),
|
||||
.request_id = request_id,
|
||||
.from_address = from_address,
|
||||
};
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
};
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct PromiseKey {
|
||||
Address requester_address;
|
||||
uint64_t request_id;
|
||||
// TODO(tyler) possibly remove replier_address from promise key
|
||||
// once we want to support DSR.
|
||||
Address replier_address;
|
||||
|
||||
public:
|
||||
bool operator<(const PromiseKey &other) const {
|
||||
if (requester_address == other.requester_address) {
|
||||
return request_id < other.request_id;
|
||||
} else {
|
||||
if (requester_address != other.requester_address) {
|
||||
return requester_address < other.requester_address;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class OpaquePromise {
|
||||
const std::type_info *ti_;
|
||||
void *ptr_;
|
||||
std::function<void(void *)> dtor_;
|
||||
std::function<bool(void *)> is_awaited_;
|
||||
std::function<void(void *, OpaqueMessage)> fill_;
|
||||
std::function<void(void *)> time_out_;
|
||||
|
||||
public:
|
||||
OpaquePromise(OpaquePromise &&old)
|
||||
: ti_(old.ti_),
|
||||
ptr_(old.ptr_),
|
||||
dtor_(old.dtor_),
|
||||
is_awaited_(old.is_awaited_),
|
||||
fill_(old.fill_),
|
||||
time_out_(old.time_out_) {
|
||||
old.ptr_ = nullptr;
|
||||
}
|
||||
|
||||
OpaquePromise &operator=(OpaquePromise &&old) {
|
||||
MG_ASSERT(this != &old);
|
||||
|
||||
ptr_ = old.ptr_;
|
||||
ti_ = old.ti_;
|
||||
dtor_ = old.dtor_;
|
||||
is_awaited_ = old.is_awaited_;
|
||||
fill_ = old.fill_;
|
||||
time_out_ = old.time_out_;
|
||||
|
||||
old.ptr_ = nullptr;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
OpaquePromise(const OpaquePromise &) = delete;
|
||||
OpaquePromise &operator=(const OpaquePromise &) = delete;
|
||||
|
||||
template <typename T>
|
||||
std::unique_ptr<ResponsePromise<T>> Take() {
|
||||
MG_ASSERT(typeid(T) == *ti_);
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
|
||||
ResponsePromise<T> *ptr = static_cast<ResponsePromise<T> *>(ptr_);
|
||||
|
||||
ptr_ = nullptr;
|
||||
|
||||
return std::unique_ptr<T>(ptr);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
explicit OpaquePromise(std::unique_ptr<ResponsePromise<T>> promise)
|
||||
: ti_(&typeid(T)),
|
||||
ptr_(static_cast<void *>(promise.release())),
|
||||
dtor_([](void *ptr) { static_cast<ResponsePromise<T> *>(ptr)->~ResponsePromise<T>(); }),
|
||||
is_awaited_([](void *ptr) { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); }),
|
||||
fill_([](void *this_ptr, OpaqueMessage opaque_message) {
|
||||
T message = std::any_cast<T>(std::move(opaque_message.message));
|
||||
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
|
||||
.request_id = opaque_message.request_id,
|
||||
.from_address = opaque_message.from_address};
|
||||
ResponsePromise<T> *promise = static_cast<ResponsePromise<T> *>(this_ptr);
|
||||
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
|
||||
unique_promise->Fill(std::move(response_envelope));
|
||||
}),
|
||||
time_out_([](void *ptr) {
|
||||
ResponsePromise<T> *promise = static_cast<ResponsePromise<T> *>(ptr);
|
||||
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
|
||||
ResponseResult<T> result = TimedOut{};
|
||||
unique_promise->Fill(std::move(result));
|
||||
}) {}
|
||||
|
||||
bool IsAwaited() {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
return is_awaited_(ptr_);
|
||||
}
|
||||
|
||||
void TimeOut() {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
time_out_(ptr_);
|
||||
ptr_ = nullptr;
|
||||
}
|
||||
|
||||
void Fill(OpaqueMessage &&opaque_message) {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
fill_(ptr_, std::move(opaque_message));
|
||||
ptr_ = nullptr;
|
||||
}
|
||||
|
||||
~OpaquePromise() {
|
||||
if (nullptr != ptr_) {
|
||||
dtor_(ptr_);
|
||||
if (request_id != other.request_id) {
|
||||
return request_id < other.request_id;
|
||||
}
|
||||
|
||||
return replier_address < other.replier_address;
|
||||
}
|
||||
};
|
||||
|
||||
struct DeadlineAndOpaquePromise {
|
||||
uint64_t deadline;
|
||||
Time deadline;
|
||||
OpaquePromise promise;
|
||||
};
|
||||
|
||||
class SimulatorHandle {
|
||||
std::mutex mu_{};
|
||||
std::condition_variable cv_;
|
||||
mutable std::mutex mu_{};
|
||||
mutable std::condition_variable cv_;
|
||||
|
||||
// messages that have not yet been scheduled or dropped
|
||||
std::vector<std::pair<Address, OpaqueMessage>> in_flight_;
|
||||
|
||||
// the responsese to requests that are being waited on
|
||||
// the responses to requests that are being waited on
|
||||
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
|
||||
|
||||
// messages that are sent to servers that may later receive them
|
||||
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
|
||||
|
||||
uint64_t cluster_wide_time_microseconds_;
|
||||
Time cluster_wide_time_microseconds_;
|
||||
bool should_shut_down_ = false;
|
||||
SimulatorStats stats_;
|
||||
size_t blocked_on_receive_ = 0;
|
||||
@ -220,161 +82,55 @@ class SimulatorHandle {
|
||||
std::mt19937 rng_;
|
||||
SimulatorConfig config_;
|
||||
|
||||
/// Returns the number of servers currently blocked on Receive, plus
|
||||
/// the servers that are blocked on Futures that were created through
|
||||
/// SimulatorTransport::Request.
|
||||
///
|
||||
/// TODO(tyler) investigate whether avoiding consideration of Futures
|
||||
/// increases determinism.
|
||||
size_t BlockedServers();
|
||||
|
||||
void TimeoutPromisesPastDeadline() {
|
||||
const Time now = cluster_wide_time_microseconds_;
|
||||
|
||||
for (auto &[promise_key, dop] : promises_) {
|
||||
if (dop.deadline < now) {
|
||||
spdlog::debug("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(),
|
||||
promise_key.replier_address.ToString());
|
||||
std::move(dop).promise.TimeOut();
|
||||
promises_.erase(promise_key);
|
||||
|
||||
stats_.timed_out_requests++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit SimulatorHandle(SimulatorConfig config)
|
||||
: cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {}
|
||||
|
||||
void IncrementServerCountAndWaitForQuiescentState(Address address) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
server_addresses_.insert(address);
|
||||
void IncrementServerCountAndWaitForQuiescentState(Address address);
|
||||
|
||||
while (true) {
|
||||
size_t blocked_servers = blocked_on_receive_;
|
||||
/// This method causes most of the interesting simulation logic to happen, wrt network behavior.
|
||||
/// It checks to see if all background "server" threads are blocked on new messages, and if so,
|
||||
/// it will decide whether to drop, reorder, or deliver in-flight messages based on the SimulatorConfig
|
||||
/// that was used to create the Simulator.
|
||||
bool MaybeTickSimulator();
|
||||
|
||||
for (auto &[promise_key, opaque_promise] : promises_) {
|
||||
if (opaque_promise.promise.IsAwaited()) {
|
||||
if (server_addresses_.contains(promise_key.requester_address)) {
|
||||
blocked_servers++;
|
||||
}
|
||||
}
|
||||
}
|
||||
void ShutDown();
|
||||
|
||||
bool all_servers_blocked = blocked_servers == server_addresses_.size();
|
||||
|
||||
if (all_servers_blocked) {
|
||||
return;
|
||||
}
|
||||
|
||||
cv_.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
void TimeoutPromisesPastDeadline() {
|
||||
uint64_t now = cluster_wide_time_microseconds_;
|
||||
|
||||
for (auto &[promise_key, dop] : promises_) {
|
||||
// TODO(tyler) queue this up and drop it after its deadline
|
||||
if (dop.deadline < now) {
|
||||
std::cout << "timing out request" << std::endl;
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
|
||||
stats_.timed_out_requests++;
|
||||
|
||||
dop.promise.TimeOut();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool MaybeTickSimulator() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
size_t blocked_servers = blocked_on_receive_;
|
||||
|
||||
for (auto &[promise_key, opaque_promise] : promises_) {
|
||||
if (opaque_promise.promise.IsAwaited()) {
|
||||
if (server_addresses_.contains(promise_key.requester_address)) {
|
||||
blocked_servers++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (blocked_servers < server_addresses_.size()) {
|
||||
// we only need to advance the simulator when all
|
||||
// servers have reached a quiescent state, blocked
|
||||
// on their own futures or receive methods.
|
||||
return false;
|
||||
}
|
||||
|
||||
stats_.simulator_ticks++;
|
||||
|
||||
cv_.notify_all();
|
||||
|
||||
TimeoutPromisesPastDeadline();
|
||||
|
||||
if (in_flight_.empty()) {
|
||||
// return early here because there are no messages to schedule
|
||||
|
||||
// We tick the clock forward when all servers are blocked but
|
||||
// there are no in-flight messages to schedule delivery of.
|
||||
std::poisson_distribution<> time_distrib(50);
|
||||
uint64_t clock_advance = time_distrib(rng_);
|
||||
cluster_wide_time_microseconds_ += clock_advance;
|
||||
|
||||
MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time,
|
||||
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
|
||||
"in an expected amount of time.");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (config_.scramble_messages) {
|
||||
// scramble messages
|
||||
std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1);
|
||||
size_t swap_index = swap_distrib(rng_);
|
||||
std::swap(in_flight_[swap_index], in_flight_.back());
|
||||
}
|
||||
|
||||
auto [to_address, opaque_message] = std::move(in_flight_.back());
|
||||
in_flight_.pop_back();
|
||||
|
||||
std::uniform_int_distribution<int> drop_distrib(0, 99);
|
||||
int drop_threshold = drop_distrib(rng_);
|
||||
bool should_drop = drop_threshold < config_.drop_percent;
|
||||
|
||||
if (should_drop) {
|
||||
stats_.dropped_messages++;
|
||||
}
|
||||
|
||||
PromiseKey promise_key{.requester_address = to_address,
|
||||
.request_id = opaque_message.request_id,
|
||||
.replier_address = opaque_message.from_address};
|
||||
|
||||
if (promises_.contains(promise_key)) {
|
||||
// complete waiting promise if it's there
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
|
||||
bool normal_timeout = config_.perform_timeouts && (dop.deadline < cluster_wide_time_microseconds_);
|
||||
|
||||
if (should_drop || normal_timeout) {
|
||||
stats_.timed_out_requests++;
|
||||
dop.promise.TimeOut();
|
||||
} else {
|
||||
stats_.total_responses++;
|
||||
dop.promise.Fill(std::move(opaque_message));
|
||||
}
|
||||
} else if (should_drop) {
|
||||
// don't add it anywhere, let it drop
|
||||
} else {
|
||||
// add to can_receive_ if not
|
||||
const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address, std::vector<OpaqueMessage>());
|
||||
om_vec->second.emplace_back(std::move(opaque_message));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ShutDown() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
should_shut_down_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
bool ShouldShutDown() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return should_shut_down_;
|
||||
}
|
||||
bool ShouldShutDown() const;
|
||||
|
||||
template <Message Request, Message Response>
|
||||
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request,
|
||||
uint64_t timeout_microseconds, ResponsePromise<Response> &&promise) {
|
||||
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request, Duration timeout,
|
||||
ResponsePromise<Response> &&promise) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
uint64_t deadline = cluster_wide_time_microseconds_ + timeout_microseconds;
|
||||
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
|
||||
std::any message(std::move(request));
|
||||
std::any message(request);
|
||||
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message)};
|
||||
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
|
||||
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
|
||||
|
||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
|
||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||
@ -385,15 +141,15 @@ class SimulatorHandle {
|
||||
stats_.total_requests++;
|
||||
|
||||
cv_.notify_all();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, uint64_t timeout_microseconds) {
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
uint64_t deadline = cluster_wide_time_microseconds_ + timeout_microseconds;
|
||||
blocked_on_receive_ += 1;
|
||||
|
||||
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
|
||||
while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) {
|
||||
if (can_receive_.contains(receiver)) {
|
||||
@ -404,21 +160,24 @@ class SimulatorHandle {
|
||||
|
||||
// TODO(tyler) search for item in can_receive_ that matches the desired types, rather
|
||||
// than asserting that the last item in can_rx matches.
|
||||
auto m_opt = message.Take<Ms...>();
|
||||
auto m_opt = std::move(message).Take<Ms...>();
|
||||
|
||||
blocked_on_receive_ -= 1;
|
||||
|
||||
return std::move(m_opt).value();
|
||||
}
|
||||
}
|
||||
|
||||
blocked_on_receive_ += 1;
|
||||
lock.unlock();
|
||||
bool made_progress = MaybeTickSimulator();
|
||||
lock.lock();
|
||||
if (!should_shut_down_ && !made_progress) {
|
||||
cv_.wait(lock);
|
||||
}
|
||||
blocked_on_receive_ -= 1;
|
||||
}
|
||||
|
||||
blocked_on_receive_ -= 1;
|
||||
|
||||
return TimedOut{};
|
||||
}
|
||||
|
||||
@ -434,10 +193,7 @@ class SimulatorHandle {
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
uint64_t Now() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return cluster_wide_time_microseconds_;
|
||||
}
|
||||
Time Now() const;
|
||||
|
||||
template <class D = std::poisson_distribution<>, class Return = uint64_t>
|
||||
Return Rand(D distrib) {
|
||||
@ -445,9 +201,6 @@ class SimulatorHandle {
|
||||
return distrib(rng_);
|
||||
}
|
||||
|
||||
SimulatorStats Stats() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return stats_;
|
||||
}
|
||||
SimulatorStats Stats();
|
||||
};
|
||||
}; // namespace memgraph::io::simulator
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
struct SimulatorStats {
|
||||
uint64_t total_messages = 0;
|
||||
|
@ -16,33 +16,36 @@
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/simulator/simulator_handle.hpp"
|
||||
#include "io/time.hpp"
|
||||
|
||||
namespace memgraph::io::simulator {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Time;
|
||||
|
||||
class SimulatorTransport {
|
||||
std::shared_ptr<SimulatorHandle> simulator_handle_;
|
||||
Address address_;
|
||||
std::mt19937 rng_{};
|
||||
const Address address_;
|
||||
std::mt19937 rng_;
|
||||
|
||||
public:
|
||||
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address, uint64_t seed)
|
||||
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
|
||||
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request,
|
||||
uint64_t timeout_microseconds) {
|
||||
std::function<bool()> maybe_tick_simulator = [=] { return simulator_handle_->MaybeTickSimulator(); };
|
||||
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request, Duration timeout) {
|
||||
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
|
||||
auto [future, promise] =
|
||||
memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(maybe_tick_simulator);
|
||||
|
||||
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout_microseconds,
|
||||
std::move(promise));
|
||||
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
|
||||
|
||||
return std::move(future);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
|
||||
return simulator_handle_->template Receive<Ms...>(address_, timeout_microseconds);
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
|
||||
return simulator_handle_->template Receive<Ms...>(address_, timeout);
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
@ -50,9 +53,9 @@ class SimulatorTransport {
|
||||
return simulator_handle_->template Send<M>(address, address_, request_id, message);
|
||||
}
|
||||
|
||||
uint64_t Now() { return simulator_handle_->Now(); }
|
||||
Time Now() const { return simulator_handle_->Now(); }
|
||||
|
||||
bool ShouldShutDown() { return simulator_handle_->ShouldShutDown(); }
|
||||
bool ShouldShutDown() const { return simulator_handle_->ShouldShutDown(); }
|
||||
|
||||
template <class D = std::poisson_distribution<>, class Return = uint64_t>
|
||||
Return Rand(D distrib) {
|
||||
|
@ -11,16 +11,11 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
/// Replicated State Machine-related code.
|
||||
namespace memgraph::io::rsm {
|
||||
#include <chrono>
|
||||
|
||||
/*
|
||||
template <typename T>
|
||||
concept ReplicatedStateMachine = true;
|
||||
requires(T a, uint8_t *ptr, size_t len) {
|
||||
{ a.Serialize() } -> std::same_as<std::vector<uint8_t>>;
|
||||
{ T::Deserialize(ptr, len) } -> std::same_as<T>;
|
||||
};
|
||||
*/
|
||||
namespace memgraph::io {
|
||||
|
||||
};
|
||||
using Duration = std::chrono::microseconds;
|
||||
using Time = std::chrono::time_point<std::chrono::local_t, Duration>;
|
||||
|
||||
} // namespace memgraph::io
|
@ -11,19 +11,21 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <concepts>
|
||||
#include <random>
|
||||
#include <variant>
|
||||
|
||||
#include "utils/result.hpp"
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/future.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace memgraph::io {
|
||||
|
||||
using memgraph::utils::BasicResult;
|
||||
|
||||
namespace memgraph::io {
|
||||
// TODO(tyler) ensure that Message continues to represent
|
||||
// reasonable constraints around message types over time,
|
||||
// as we adapt things to use Thrift-generated message types.
|
||||
@ -61,63 +63,61 @@ class Io {
|
||||
I implementation_;
|
||||
Address address_;
|
||||
uint64_t request_id_counter_ = 0;
|
||||
uint64_t default_timeout_microseconds_ = 50 * 1000;
|
||||
Duration default_timeout_ = std::chrono::microseconds{50000};
|
||||
|
||||
public:
|
||||
Io(I io, Address address) : implementation_(io), address_(address) {}
|
||||
|
||||
/// Set the default timeout for all requests that are issued
|
||||
/// without an explicit timeout set.
|
||||
void SetDefaultTimeoutMicroseconds(uint64_t timeout_microseconds) {
|
||||
default_timeout_microseconds_ = timeout_microseconds;
|
||||
}
|
||||
void SetDefaultTimeout(Duration timeout) { default_timeout_ = timeout; }
|
||||
|
||||
/// Issue a request with an explicit timeout in microseconds provided.
|
||||
/// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, uint64_t timeout_microseconds) {
|
||||
uint64_t request_id = ++request_id_counter_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, request, timeout_microseconds);
|
||||
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
|
||||
const uint64_t request_id = ++request_id_counter_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, request, timeout);
|
||||
}
|
||||
|
||||
/// Issue a request that times out after the default timeout.
|
||||
/// Issue a request that times out after the default timeout. This tends
|
||||
/// to be used by clients.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address address, Request request) {
|
||||
uint64_t request_id = ++request_id_counter_;
|
||||
uint64_t timeout_microseconds = default_timeout_microseconds_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, std::move(request),
|
||||
timeout_microseconds);
|
||||
const uint64_t request_id = ++request_id_counter_;
|
||||
const Duration timeout = default_timeout_;
|
||||
return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
|
||||
}
|
||||
|
||||
/// Wait for an explicit number of microseconds for a request of one of the
|
||||
/// provided types to arrive.
|
||||
/// provided types to arrive. This tends to be used by servers.
|
||||
template <Message... Ms>
|
||||
RequestResult<Ms...> ReceiveWithTimeout(uint64_t timeout_microseconds) {
|
||||
return implementation_.template Receive<Ms...>(timeout_microseconds);
|
||||
RequestResult<Ms...> ReceiveWithTimeout(Duration timeout) {
|
||||
return implementation_.template Receive<Ms...>(timeout);
|
||||
}
|
||||
|
||||
/// Wait the default number of microseconds for a request of one of the
|
||||
/// provided types to arrive.
|
||||
/// provided types to arrive. This tends to be used by servers.
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive() {
|
||||
uint64_t timeout_microseconds = default_timeout_microseconds_;
|
||||
return implementation_.template Receive<Ms...>(timeout_microseconds);
|
||||
const Duration timeout = default_timeout_;
|
||||
return implementation_.template Receive<Ms...>(timeout);
|
||||
}
|
||||
|
||||
/// Send a message in a best-effort fashion. If you need reliable delivery,
|
||||
/// this must be built on-top. TCP is not enough for most use cases.
|
||||
/// Send a message in a best-effort fashion. This is used for messaging where
|
||||
/// responses are not necessarily expected, and for servers to respond to requests.
|
||||
/// If you need reliable delivery, this must be built on-top. TCP is not enough for most use cases.
|
||||
template <Message M>
|
||||
void Send(Address address, uint64_t request_id, M message) {
|
||||
return implementation_.template Send<M>(address, request_id, std::move(message));
|
||||
}
|
||||
|
||||
/// The current system time in microseconds since the unix epoch.
|
||||
/// This time source should be preferred over any other, because it
|
||||
/// lets us deterministically control clocks from tests for making
|
||||
/// The current system time. This time source should be preferred over any other,
|
||||
/// because it lets us deterministically control clocks from tests for making
|
||||
/// things like timeouts deterministic.
|
||||
uint64_t Now() { return implementation_.Now(); }
|
||||
Time Now() const { return implementation_.Now(); }
|
||||
|
||||
/// Returns true of the system should shut-down.
|
||||
bool ShouldShutDown() { return implementation_.ShouldShutDown(); }
|
||||
/// Returns true if the system should shut-down.
|
||||
bool ShouldShutDown() const { return implementation_.ShouldShutDown(); }
|
||||
|
||||
/// Returns a random number within the specified distribution.
|
||||
template <class D = std::poisson_distribution<>, class Return = uint64_t>
|
||||
|
@ -62,3 +62,6 @@ target_link_libraries(${test_prefix}storage_v2_gc mg-storage-v2)
|
||||
|
||||
add_benchmark(storage_v2_property_store.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2)
|
||||
|
||||
add_benchmark(future.cpp)
|
||||
target_link_libraries(${test_prefix}future mg-io)
|
||||
|
30
tests/benchmark/future.cpp
Normal file
30
tests/benchmark/future.cpp
Normal file
@ -0,0 +1,30 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <benchmark/benchmark.h>
|
||||
|
||||
#include "io/future.hpp"
|
||||
|
||||
static void FuturePairFillWait(benchmark::State &state) {
|
||||
uint64_t counter = 0;
|
||||
while (state.KeepRunning()) {
|
||||
auto [future, promise] = memgraph::io::FuturePromisePair<int>();
|
||||
promise.Fill(1);
|
||||
std::move(future).Wait();
|
||||
|
||||
++counter;
|
||||
}
|
||||
state.SetItemsProcessed(counter);
|
||||
}
|
||||
|
||||
BENCHMARK(FuturePairFillWait)->Unit(benchmark::kNanosecond)->UseRealTime();
|
||||
|
||||
BENCHMARK_MAIN();
|
@ -14,7 +14,7 @@ function(add_simulation_test test_cpp san)
|
||||
# used to help create two targets of the same name even though CMake
|
||||
# requires unique logical target names
|
||||
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
|
||||
target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-io-simulator mg-io-rsm)
|
||||
target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-io-simulator)
|
||||
|
||||
# sanitize
|
||||
target_compile_options(${target_name} PRIVATE -fsanitize=${san})
|
||||
@ -25,10 +25,6 @@ function(add_simulation_test test_cpp san)
|
||||
add_dependencies(memgraph__simulation ${target_name})
|
||||
endfunction(add_simulation_test)
|
||||
|
||||
add_simulation_test(future.cpp thread)
|
||||
|
||||
add_simulation_test(basic_request.cpp address)
|
||||
|
||||
add_simulation_test(raft.cpp address)
|
||||
|
||||
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
|
||||
|
@ -30,11 +30,11 @@ struct CounterResponse {
|
||||
};
|
||||
|
||||
void run_server(Io<SimulatorTransport> io) {
|
||||
uint64_t highest_seen;
|
||||
uint64_t highest_seen = 0;
|
||||
|
||||
while (!io.ShouldShutDown()) {
|
||||
std::cout << "[SERVER] Is receiving..." << std::endl;
|
||||
auto request_result = io.ReceiveWithTimeout<CounterRequest>(100000);
|
||||
auto request_result = io.Receive<CounterRequest>();
|
||||
if (request_result.HasError()) {
|
||||
std::cout << "[SERVER] Error, continue" << std::endl;
|
||||
continue;
|
||||
@ -71,8 +71,8 @@ int main() {
|
||||
// send request
|
||||
CounterRequest cli_req;
|
||||
cli_req.proposal = i;
|
||||
auto res_f = cli_io.RequestWithTimeout<CounterRequest, CounterResponse>(srv_addr, cli_req, 1000);
|
||||
auto res_rez = res_f.Wait();
|
||||
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
|
||||
auto res_rez = std::move(res_f).Wait();
|
||||
if (!res_rez.HasError()) {
|
||||
std::cout << "[CLIENT] Got a valid response" << std::endl;
|
||||
auto env = res_rez.GetValue();
|
||||
|
@ -15,8 +15,6 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
// header
|
||||
|
||||
namespace memgraph::tests::simulation {
|
||||
|
||||
struct Vertex {
|
||||
|
@ -26,7 +26,7 @@ using memgraph::io::simulator::SimulatorTransport;
|
||||
void run_server(Io<SimulatorTransport> io) {
|
||||
while (!io.ShouldShutDown()) {
|
||||
std::cout << "[STORAGE] Is receiving..." << std::endl;
|
||||
auto request_result = io.ReceiveWithTimeout<ScanVerticesRequest>(100000);
|
||||
auto request_result = io.Receive<ScanVerticesRequest>();
|
||||
if (request_result.HasError()) {
|
||||
std::cout << "[STORAGE] Error, continue" << std::endl;
|
||||
continue;
|
||||
@ -78,9 +78,8 @@ int main() {
|
||||
|
||||
auto req = ScanVerticesRequest{2, std::nullopt};
|
||||
|
||||
auto res_f = cli_io.RequestWithTimeout<ScanVerticesRequest, VerticesResponse>(srv_addr, req, 1000);
|
||||
auto res_rez = res_f.Wait();
|
||||
// MG_ASSERT(res_rez.HasError());
|
||||
auto res_f = cli_io.Request<ScanVerticesRequest, VerticesResponse>(srv_addr, req);
|
||||
auto res_rez = std::move(res_f).Wait();
|
||||
simulator.ShutDown();
|
||||
return 0;
|
||||
}
|
||||
|
@ -365,3 +365,7 @@ target_link_libraries(${test_prefix}websocket mg-communication Boost::headers)
|
||||
# Test storage-v3
|
||||
add_unit_test(storage_v3.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v3 mg-storage-v3)
|
||||
|
||||
# Test future
|
||||
add_unit_test(future.cpp)
|
||||
target_link_libraries(${test_prefix}future mg-io)
|
||||
|
@ -12,20 +12,21 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "io/future.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
using namespace memgraph::io;
|
||||
|
||||
void Fill(Promise<std::string> promise_1) { promise_1.Fill("success"); }
|
||||
|
||||
void Wait(Future<std::string> future_1, Promise<std::string> promise_2) {
|
||||
std::string result_1 = future_1.Wait();
|
||||
MG_ASSERT(result_1 == "success");
|
||||
std::string result_1 = std::move(future_1).Wait();
|
||||
EXPECT_TRUE(result_1 == "success");
|
||||
promise_2.Fill("it worked");
|
||||
}
|
||||
|
||||
int main() {
|
||||
TEST(Future, BasicLifecycle) {
|
||||
std::atomic_bool waiting = false;
|
||||
|
||||
std::function<bool()> notifier = [&] {
|
||||
@ -49,8 +50,6 @@ int main() {
|
||||
t1.join();
|
||||
t2.join();
|
||||
|
||||
std::string result_2 = future_2.Wait();
|
||||
MG_ASSERT(result_2 == "it worked");
|
||||
|
||||
return 0;
|
||||
std::string result_2 = std::move(future_2).Wait();
|
||||
EXPECT_TRUE(result_2 == "it worked");
|
||||
}
|
Loading…
Reference in New Issue
Block a user