Continue structuring the simulator and filling in its implementation

This commit is contained in:
Tyler Neely 2022-07-05 21:11:39 +00:00
parent e1aab7065f
commit f601e83f6f
6 changed files with 88 additions and 41 deletions

View File

@ -4,7 +4,8 @@ set(io_v3_sources
future.hpp
transport.hpp
simulator.hpp
simulator_handle.hpp)
simulator_handle.hpp
simulator_stats.hpp)
find_package(fmt REQUIRED)
find_package(Threads REQUIRED)

View File

@ -19,14 +19,6 @@
#include "simulator_handle.hpp"
#include "transport.hpp"
struct SimulatorStats {
uint64_t total_messages_;
uint64_t dropped_messages_;
uint64_t total_requests_;
uint64_t total_responses_;
uint64_t simulator_ticks_;
};
struct SimulatorConfig {
uint8_t drop_percent_;
uint64_t rng_seed_;
@ -38,20 +30,21 @@ class SimulatorTransport {
: simulator_handle_(simulator_handle), address_(address) {}
template <Message Request, Message Response>
ResponseFuture<Response> RequestTimeout(Address address, uint64_t request_id, Request request,
uint64_t timeout_microseconds) {
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request,
uint64_t timeout_microseconds) {
std::function<void()> notifier = [=] { simulator_handle_->NotifySimulator(); };
auto [future, promise] = FuturePromisePairWithNotifier<ResponseResult<Response>>(notifier);
simulator_handle_->SubmitRequest(address, request_id, request, timeout_microseconds, std::move(promise));
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout_microseconds,
std::move(promise));
return std::move(future);
}
/*
template <Message... Ms>
RequestResult<Ms...> ReceiveTimeout(uint64_t timeout_microseconds) {
return simulator_handle_->template ReceiveTimeout<Ms...>(timeout_microseconds);
RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
return simulator_handle_->template Receive<Ms...>(timeout_microseconds);
}
template <Message M>

View File

@ -12,14 +12,16 @@
#pragma once
#include <map>
#include <vector>
#include "address.hpp"
#include "simulator_stats.hpp"
#include "transport.hpp"
struct OpaqueMessage {
Address address;
uint64_t request_id;
std::unique_ptr<std::any> message;
std::any message;
};
struct PromiseKey {
@ -29,8 +31,8 @@ struct PromiseKey {
};
struct OpaquePromise {
time_t deadline;
std::unique_ptr<std::any> promise;
uint64_t deadline;
std::any promise;
};
class SimulatorHandle {
@ -46,14 +48,32 @@ class SimulatorHandle {
}
template <Message Request, Message Response>
void SubmitRequest(Address address, uint64_t request_id, Request request, uint64_t timeout_microseconds,
MgPromise<ResponseResult<Response>> promise) {
void SubmitRequest(Address to_addr, Address from_addr, uint64_t request_id, Request &&request,
uint64_t timeout_microseconds, MgPromise<ResponseResult<Response>> &&promise) {
std::unique_lock<std::mutex> lock(mu_);
uint64_t deadline = cluster_wide_time_microseconds_ + timeout_microseconds;
std::any message(std::move(request));
OpaqueMessage om{.address = from_addr, .request_id = request_id, .message = std::move(message)};
in_flight_.emplace_back(std::make_pair(std::move(to_addr), std::move(om)));
/*
std::any opaque_promise(std::move(promise));
PromiseKey pk { .requester=from_addr, .request_id=request_id, .replier=to_addr };
OpaquePromise op { .deadline=deadline, .promise=std::move(opaque_promise) };
promises_.insert(std::make_pair(std::move(pk), std::move(op)));
*/
stats_.total_messages_++;
stats_.total_requests_++;
return;
}
/*
template <Message... Ms>
RequestResult<Ms...> ReceiveTimeout(uint64_t timeout_microseconds) {
RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
std::abort();
}
@ -67,8 +87,10 @@ class SimulatorHandle {
std::mutex mu_;
std::condition_variable cv_sim_;
std::condition_variable cv_srv_;
std::map<Address, std::vector<OpaqueMessage>> in_flight_;
std::map<PromiseKey, OpaquePromise> promises;
std::map<Address, OpaqueMessage> can_receive;
std::vector<std::pair<Address, OpaqueMessage>> in_flight_;
std::map<PromiseKey, OpaquePromise> promises_;
std::map<Address, OpaqueMessage> can_receive_;
uint64_t cluster_wide_time_microseconds_ = 0;
bool shut_down_;
SimulatorStats stats_;
};

View File

@ -0,0 +1,20 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
struct SimulatorStats {
uint64_t total_messages_;
uint64_t dropped_messages_;
uint64_t total_requests_;
uint64_t total_responses_;
uint64_t simulator_ticks_;
};

View File

@ -61,43 +61,54 @@ class Io {
public:
Io(I io, Address address) : implementation_(io), address_(address) {}
/// Set the default time-out for all requests that are issued
/// without an explicit time-out set.
void SetDefaultTimeoutMicroseconds(uint64_t timeout_microseconds) {
default_timeout_microseconds_ = timeout_microseconds;
}
/// Issue a request with an explicit time-out in microseconds provided.
template <Message Request, Message Response>
ResponseFuture<Response> RequestTimeout(Address address, Request request, uint64_t timeout_microseconds) {
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, uint64_t timeout_microseconds) {
uint64_t request_id = ++request_id_counter_;
return implementation_.template RequestTimeout<Request, Response>(address, request_id, request,
timeout_microseconds);
return implementation_.template Request<Request, Response>(address, request_id, request, timeout_microseconds);
}
/// Issue a request that times out after the default timeout.
template <Message Request, Message Response>
ResponseFuture<Response> RequestTimeout(Address address, Request request) {
ResponseFuture<Response> Request(Address address, Request request) {
uint64_t request_id = ++request_id_counter_;
uint64_t timeout_microseconds = default_timeout_microseconds_;
return implementation_.template RequestTimeout<Request, Response>(address, request_id, request,
timeout_microseconds);
return implementation_.template Request<Request, Response>(address, request_id, request, timeout_microseconds);
}
/// Wait for an explicit number of microseconds for a request of one of the
/// provided types to arrive.
template <Message... Ms>
RequestResult<Ms...> ReceiveTimeout(uint64_t timeout_microseconds) {
return implementation_.template ReceiveTimeout<Ms...>(timeout_microseconds);
RequestResult<Ms...> ReceiveWithTimeout(uint64_t timeout_microseconds) {
return implementation_.template Receive<Ms...>(timeout_microseconds);
}
/// Wait the default number of microseconds for a request of one of the
/// provided types to arrive.
template <Message... Ms>
RequestResult<Ms...> ReceiveTimeout() {
RequestResult<Ms...> Receive() {
uint64_t timeout_microseconds = default_timeout_microseconds_;
return implementation_.template ReceiveTimeout<Ms...>(timeout_microseconds);
return implementation_.template Receive<Ms...>(timeout_microseconds);
}
/// Send a message in a best-effort fashion. If you need reliable delivery,
/// this must be built on-top. TCP is not enough for most use cases.
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return implementation_.template Send<M>(address, request_id, message);
}
/// The current system time. This time source should be preferred over
/// any other time source.
std::time_t Now() { return implementation_.Now(); }
/// Returns true of the system should shut-down.
bool ShouldShutDown() { return implementation_.ShouldShutDown(); }
private:

View File

@ -19,20 +19,20 @@
#include "io/v3/simulator.hpp"
#include "utils/terminate_handler.hpp"
struct Request {
struct RequestMsg {
std::string data;
std::vector<uint8_t> Serialize() { return std::vector<uint8_t>(); }
static Request Deserialize(uint8_t *ptr, size_t len) { return Request{}; }
static RequestMsg Deserialize(uint8_t *ptr, size_t len) { return RequestMsg{}; }
};
struct Response {
struct ResponseMsg {
std::string data;
std::vector<uint8_t> Serialize() { return std::vector<uint8_t>(); }
static Response Deserialize(uint8_t *ptr, size_t len) { return Response{}; }
static ResponseMsg Deserialize(uint8_t *ptr, size_t len) { return ResponseMsg{}; }
};
int main() {
@ -41,18 +41,18 @@ int main() {
auto srv_addr = Address::TestAddress(2);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr, true);
// Io<SimulatorTransport> srv_io = simulator.Register(srv_addr, true);
Io<SimulatorTransport> srv_io = simulator.Register(srv_addr, true);
// send request
Request cli_req;
ResponseFuture<Response> response_future = cli_io.template RequestTimeout<Request, Response>(srv_addr, cli_req);
RequestMsg cli_req;
ResponseFuture<ResponseMsg> response_future = cli_io.template Request<RequestMsg, ResponseMsg>(srv_addr, cli_req);
// // receive request
// RequestResult<Request> request_result = sim_io_2.template ReceiveTimeout<Request>();
// auto req_envelope = request_result.GetValue();
// Request req = std::get<Request>(req_envelope.message);
//
// auto srv_res = Response{req.data};
// auto srv_res = ResponseMsg{req.data};
//
// // send response
// sim_io_2.Send(req_envelope.from, req_envelope.request_id, srv_res);