Add OpaquePromise type with correct dtor. Make a few structs comparable for storage in maps. Fix initialization of Shared

This commit is contained in:
Tyler Neely 2022-07-06 15:19:16 +00:00
parent f601e83f6f
commit a0058bc10a
6 changed files with 111 additions and 29 deletions

View File

@ -11,6 +11,8 @@
#pragma once
#include <compare>
#include <boost/asio/ip/tcp.hpp>
#include <boost/uuid/uuid.hpp>
@ -31,4 +33,16 @@ struct Address {
ret.last_known_port = port;
return ret;
}
bool operator==(const Address &other) const {
return (last_known_ip == other.last_known_ip) && (last_known_port == other.last_known_port);
}
bool operator<(const Address &other) const {
if (last_known_ip == other.last_known_ip) {
return last_known_port < other.last_known_port;
} else {
return last_known_ip < other.last_known_ip;
}
}
};

View File

@ -159,6 +159,8 @@ class MgPromise {
friend std::pair<MgFuture<T>, MgPromise<T>> FuturePromisePairWithNotifier<T>(std::function<void()>);
public:
MgPromise(std::shared_ptr<Shared<T>> shared) : shared_(shared) {}
MgPromise(MgPromise &&old) {
shared_ = std::move(old.shared_);
MG_ASSERT(!old.filled_or_moved_, "MgPromise moved from after already being moved from or filled.");
@ -183,9 +185,16 @@ class MgPromise {
bool IsAwaited() { return shared_->IsAwaited(); }
private:
MgPromise(std::shared_ptr<Shared<T>> shared) : shared_(shared) {}
/// Moves this MgPromise into a unique_ptr.
std::unique_ptr<MgPromise<T>> ToUnique() && {
std::unique_ptr<MgPromise<T>> up = std::make_unique<MgPromise<T>>(std::move(shared_));
filled_or_moved_ = true;
return up;
}
private:
std::shared_ptr<Shared<T>> shared_;
bool filled_or_moved_ = false;
};

View File

@ -41,12 +41,12 @@ class SimulatorTransport {
return std::move(future);
}
/*
template <Message... Ms>
RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
return simulator_handle_->template Receive<Ms...>(timeout_microseconds);
}
template <Message... Ms>
RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
return simulator_handle_->template Receive<Ms...>(timeout_microseconds);
}
/*
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return simulator_handle_->template Send<M>(address, request_id, message);
@ -69,5 +69,5 @@ class Simulator {
}
private:
std::shared_ptr<SimulatorHandle> simulator_handle_;
std::shared_ptr<SimulatorHandle> simulator_handle_{std::make_shared<SimulatorHandle>()};
};

View File

@ -11,6 +11,7 @@
#pragma once
#include <compare>
#include <map>
#include <vector>
@ -28,11 +29,71 @@ struct PromiseKey {
Address requester;
uint64_t request_id;
Address replier;
public:
bool operator<(const PromiseKey &other) const {
if (requester == other.requester) {
return request_id < other.request_id;
} else {
return requester < other.requester;
}
}
};
struct OpaquePromise {
// TODO delete copy ctor & copy assignment operator if possible
class OpaquePromise {
public:
OpaquePromise(OpaquePromise &&old) : ti_(old.ti_) {
ptr_ = old.ptr_;
old.ptr_ = nullptr;
}
OpaquePromise &operator=(OpaquePromise &&old) {
MG_ASSERT(this != &old);
ptr_ = old.ptr_;
ti_ = old.ti_;
old.ptr_ = nullptr;
return *this;
}
OpaquePromise(const OpaquePromise &) = delete;
OpaquePromise &operator=(const OpaquePromise &) = delete;
template <typename T>
std::unique_ptr<MgPromise<T>> Take() {
MG_ASSERT(typeid(T) == *ti_);
MG_ASSERT(ptr_ != nullptr);
MgPromise<T> *ptr = static_cast<MgPromise<T> *>(ptr_);
ptr_ = nullptr;
return std::unique_ptr<T>(ptr);
}
template <typename T>
OpaquePromise(std::unique_ptr<MgPromise<T>> promise)
: ti_(&typeid(T)),
ptr_((void *)promise.release()),
dtor_([](void *ptr) { static_cast<MgPromise<T> *>(ptr)->~MgPromise<T>(); }) {}
~OpaquePromise() {
if (nullptr != ptr_) {
dtor_(ptr_);
}
}
private:
const std::type_info *ti_;
void *ptr_;
std::function<void(void *)> dtor_;
};
struct DeadlineAndOpaquePromise {
uint64_t deadline;
std::any promise;
OpaquePromise promise;
};
class SimulatorHandle {
@ -58,12 +119,10 @@ class SimulatorHandle {
OpaqueMessage om{.address = from_addr, .request_id = request_id, .message = std::move(message)};
in_flight_.emplace_back(std::make_pair(std::move(to_addr), std::move(om)));
/*
std::any opaque_promise(std::move(promise));
PromiseKey pk { .requester=from_addr, .request_id=request_id, .replier=to_addr };
OpaquePromise op { .deadline=deadline, .promise=std::move(opaque_promise) };
promises_.insert(std::make_pair(std::move(pk), std::move(op)));
*/
OpaquePromise opaque_promise(std::move(promise).ToUnique());
PromiseKey pk{.requester = from_addr, .request_id = request_id, .replier = to_addr};
DeadlineAndOpaquePromise op{.deadline = deadline, .promise = std::move(opaque_promise)};
promises_.emplace(std::move(pk), std::move(op));
stats_.total_messages_++;
stats_.total_requests_++;
@ -71,11 +130,11 @@ class SimulatorHandle {
return;
}
template <Message... Ms>
RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
std::terminate();
}
/*
template <Message... Ms>
RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
std::abort();
}
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
@ -84,11 +143,11 @@ class SimulatorHandle {
*/
private:
std::mutex mu_;
std::mutex mu_{};
std::condition_variable cv_sim_;
std::condition_variable cv_srv_;
std::vector<std::pair<Address, OpaqueMessage>> in_flight_;
std::map<PromiseKey, OpaquePromise> promises_;
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
std::map<Address, OpaqueMessage> can_receive_;
uint64_t cluster_wide_time_microseconds_ = 0;
bool shut_down_;

View File

@ -9,6 +9,8 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// TODO chrono::microseconds instead of std::time_t
#pragma once
#include <concepts>
@ -61,13 +63,13 @@ class Io {
public:
Io(I io, Address address) : implementation_(io), address_(address) {}
/// Set the default time-out for all requests that are issued
/// without an explicit time-out set.
/// Set the default timeout for all requests that are issued
/// without an explicit timeout set.
void SetDefaultTimeoutMicroseconds(uint64_t timeout_microseconds) {
default_timeout_microseconds_ = timeout_microseconds;
}
/// Issue a request with an explicit time-out in microseconds provided.
/// Issue a request with an explicit timeout in microseconds provided.
template <Message Request, Message Response>
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, uint64_t timeout_microseconds) {
uint64_t request_id = ++request_id_counter_;

View File

@ -14,8 +14,6 @@
#include <string>
#include <vector>
#include <spdlog/spdlog.h>
#include "io/v3/simulator.hpp"
#include "utils/terminate_handler.hpp"
@ -47,8 +45,8 @@ int main() {
RequestMsg cli_req;
ResponseFuture<ResponseMsg> response_future = cli_io.template Request<RequestMsg, ResponseMsg>(srv_addr, cli_req);
// // receive request
// RequestResult<Request> request_result = sim_io_2.template ReceiveTimeout<Request>();
// receive request
RequestResult<RequestMsg> request_result = srv_io.template Receive<RequestMsg>();
// auto req_envelope = request_result.GetValue();
// Request req = std::get<Request>(req_envelope.message);
//