Rearrange some things to better support thrift transport

This commit is contained in:
Tyler Neely 2022-08-11 16:46:17 +00:00
parent 0083fa8a94
commit 10c9f349a6
4 changed files with 42 additions and 38 deletions

View File

@ -13,11 +13,28 @@
#include "io/transport.hpp"
namespace memgraph::io::simulator {
namespace memgraph::io {
using memgraph::io::Duration;
using memgraph::io::Message;
using memgraph::io::Time;
struct PromiseKey {
Address requester_address;
uint64_t request_id;
// TODO(tyler) possibly remove replier_address from promise key
// once we want to support DSR.
Address replier_address;
public:
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
if (lhs.requester_address != rhs.requester_address) {
return lhs.requester_address < rhs.requester_address;
}
if (lhs.request_id != rhs.request_id) {
return lhs.request_id < rhs.request_id;
}
return lhs.replier_address < rhs.replier_address;
}
};
struct OpaqueMessage {
Address from_address;
@ -169,4 +186,9 @@ class OpaquePromise {
}
};
} // namespace memgraph::io::simulator
struct DeadlineAndOpaquePromise {
Time deadline;
OpaquePromise promise;
};
} // namespace memgraph::io

View File

@ -24,7 +24,7 @@
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/simulator/message_conversion.hpp"
#include "io/message_conversion.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_stats.hpp"
#include "io/time.hpp"
@ -33,34 +33,10 @@
namespace memgraph::io::simulator {
using memgraph::io::Duration;
using memgraph::io::OpaqueMessage;
using memgraph::io::OpaquePromise;
using memgraph::io::Time;
struct PromiseKey {
Address requester_address;
uint64_t request_id;
// TODO(tyler) possibly remove replier_address from promise key
// once we want to support DSR.
Address replier_address;
public:
friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
if (lhs.requester_address != rhs.requester_address) {
return lhs.requester_address < rhs.requester_address;
}
if (lhs.request_id != rhs.request_id) {
return lhs.request_id < rhs.request_id;
}
return lhs.replier_address < rhs.replier_address;
}
};
struct DeadlineAndOpaquePromise {
Time deadline;
OpaquePromise promise;
};
class SimulatorHandle {
mutable std::mutex mu_{};
mutable std::condition_variable cv_;

View File

@ -18,6 +18,12 @@ namespace memgraph::io::thrift {
using memgraph::io::Address;
class ThriftHandle {
// the responses to requests that are being waited on
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
public:
template <Message Request, Message Response>
void SubmitRequest(Address to_address, Address from_address, uint64_t request_id, Request &&request, Duration timeout,

View File

@ -27,31 +27,31 @@ using memgraph::io::Duration;
using memgraph::io::Time;
class ThriftTransport {
std::shared_ptr<ThriftHandle> simulator_handle_;
std::shared_ptr<ThriftHandle> thrift_handle_;
const Address address_;
std::random_device rng_;
public:
ThriftTransport(std::shared_ptr<ThriftHandle> simulator_handle, Address address)
: simulator_handle_(simulator_handle), address_(address) {}
ThriftTransport(std::shared_ptr<ThriftHandle> thrift_handle, Address address)
: thrift_handle_(thrift_handle), address_(address) {}
template <Message Request, Message Response>
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request, Duration timeout) {
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>();
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
thrift_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
return std::move(future);
}
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
return simulator_handle_->template Receive<Ms...>(address_, timeout);
return thrift_handle_->template Receive<Ms...>(address_, timeout);
}
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return simulator_handle_->template Send<M>(address, address_, request_id, message);
return thrift_handle_->template Send<M>(address, address_, request_id, message);
}
Time Now() const {