diff --git a/src/interface/echo.thrift b/src/interface/echo.thrift index b7059f828..2eb3fad24 100644 --- a/src/interface/echo.thrift +++ b/src/interface/echo.thrift @@ -2,6 +2,19 @@ struct EchoMessage { 1: binary message; } +struct Address{ + 1: string unique_id; + 2: string last_known_ip; + 3: i32 last_known_port; +} + +struct CompoundMessage{ + 1: Address to_address + 2: Address from_address + 3: binary message +} + service Echo { oneway void ReceiveSend(1: EchoMessage m) + oneway void RecieveCompoundThriftMessage(1: CompoundMessage m) } diff --git a/src/io/message_conversion.hpp b/src/io/message_conversion.hpp index 8d4fcd84d..a8dc8ae4f 100644 --- a/src/io/message_conversion.hpp +++ b/src/io/message_conversion.hpp @@ -38,6 +38,7 @@ struct PromiseKey { struct OpaqueMessage { Address from_address; + Address to_address; uint64_t request_id; std::any message; diff --git a/src/io/thrift/thrift_handle.hpp b/src/io/thrift/thrift_handle.hpp index 82b0a4e90..0d7e4fd8d 100644 --- a/src/io/thrift/thrift_handle.hpp +++ b/src/io/thrift/thrift_handle.hpp @@ -38,6 +38,11 @@ class ThriftHandle { // TODO(tyler) thrift clients for each outbound address combination std::map<Address, void *> clients_; + // TODO(gabor) make this to a threadpool + // uuid of the address -> port number where the given rsm is residing. + // TODO(gabor) The RSM map should not be a part of this class. + std::map<boost::uuids::uuid, uint16_t /*this should be the actual RSM*/> rsm_map_; + public: template <Message M> void DeliverMessage(Address from_address, RequestId request_id, M &&message) { @@ -62,6 +67,20 @@ class ThriftHandle { template <Message... Ms> requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) { // TODO(tyler) block for the specified duration on the Inbox's receipt of a message of this type. + std::unique_lock lock(mu_); + cv_.wait(lock, [this] { return !can_receive_.empty(); }); + + while (!can_receive_.empty()) { + auto current_message = can_receive_.back(); + can_receive_.pop_back(); + + // Logic to determine who to send the message. + // + auto destination_id = current_message.to_address.unique_id; + auto destination_port = rsm_map_.at(destination_id); + + // Send it to the port of the destination -how? + } } template <Message M>