diff --git a/src/io/CMakeLists.txt b/src/io/CMakeLists.txt index 128e87114..80c3f77cc 100644 --- a/src/io/CMakeLists.txt +++ b/src/io/CMakeLists.txt @@ -1,3 +1,5 @@ +include(MgThrift) + set(io_src_files network/addrinfo.cpp network/endpoint.cpp @@ -8,4 +10,4 @@ find_package(fmt REQUIRED) find_package(Threads REQUIRED) add_library(mg-io STATIC ${io_src_files}) -target_link_libraries(mg-io stdc++fs Threads::Threads fmt::fmt mg-utils) +target_link_libraries(mg-io stdc++fs Threads::Threads fmt::fmt mg-utils FBThrift::thriftcpp2 mg-interface-ubermessage-cpp2) diff --git a/src/io/thrift/thrift_handle.hpp b/src/io/thrift/thrift_handle.hpp index 2f435b04f..5cbf4b15d 100644 --- a/src/io/thrift/thrift_handle.hpp +++ b/src/io/thrift/thrift_handle.hpp @@ -19,12 +19,12 @@ #include <boost/asio/ip/tcp.hpp> #include <boost/lexical_cast.hpp> -// #include <folly/init/Init.h> -// #include <folly/io/SocketOptionMap.h> -// #include <folly/io/async/AsyncServerSocket.h> -// #include <folly/net/NetworkSocket.h> -// #include <thrift/lib/cpp2/async/HeaderClientChannel.h> -// #include <thrift/lib/cpp2/server/ThriftServer.h> +#include <folly/init/Init.h> +#include <folly/io/SocketOptionMap.h> +#include <folly/io/async/AsyncServerSocket.h> +#include <folly/net/NetworkSocket.h> +#include <thrift/lib/cpp2/async/HeaderClientChannel.h> +#include <thrift/lib/cpp2/server/ThriftServer.h> // From generated code #include "interface/gen-cpp2/UberServer.h" @@ -37,7 +37,6 @@ namespace memgraph::io::thrift { using namespace apache::thrift; -// using namespace cpp2; using namespace folly; using memgraph::io::Address; @@ -50,7 +49,7 @@ class ThriftHandle { mutable std::condition_variable cv_; const Address address_ = Address::TestAddress(0); - // EventBase base_; + EventBase base_; // the responses to requests that are being waited on std::map<PromiseKey, DeadlineAndOpaquePromise> promises_; @@ -59,13 +58,22 @@ class ThriftHandle { std::vector<OpaqueMessage> can_receive_; // TODO(tyler) thrift clients for each outbound address combination - std::map<Address, cpp2::UberServerAsyncClient> clients_; + // AsyncClient does not offer default init so they are optional atm. + std::map<Address, std::optional<cpp2::UberServerAsyncClient>> clients_; // TODO(gabor) make this to a threadpool // uuid of the address -> port number where the given rsm is residing. // TODO(gabor) The RSM map should not be a part of this class. // std::map<boost::uuids::uuid, uint16_t /*this should be the actual RSM*/> rsm_map_; + cpp2::Address convertToUberAddress(const memgraph::io::Address &address) { + cpp2::Address ret_address; + ret_address.unique_id_ref() = boost::uuids::to_string(address.unique_id); + ret_address.last_known_ip_ref() = address.last_known_ip.to_string(); + ret_address.last_known_port_ref() = static_cast<int32_t>(address.last_known_port); + return ret_address; + } + public: explicit ThriftHandle(Address our_address) : address_(our_address) {} @@ -165,29 +173,41 @@ class ThriftHandle { void Send(Address to_address, Address from_address, RequestId request_id, M message) { // TODO(tyler) call thrift client for address (or create one if it doesn't exist yet) - // if(clients_.contains(to_address)) - // { - // const auto &client = clients_[to_address]; - // client.sync_Send(message); - // } - // else{ - // // maybe make this into a member var - // const auto& other_ip = to_address.last_known_ip.to_string(); - // const auto& other_port = to_address.last_known_port; - // auto socket(folly::AsyncSocket::newSocket(&base_, other_ip, other_port)); - // auto client_channel = HeaderClientChannel::newChannel(std::move(socket)); - // // Create a client object - // EchoAsyncClient client(std::move(client_channel)); + cpp2::UberMessage uber_message; - // client.sync_Send(message); - // } + uber_message.to_address_ref() = convertToUberAddress(to_address); + uber_message.from_address_ref() = convertToUberAddress(from_address); + uber_message.request_id_ref() = static_cast<int64_t>(request_id); + // uber_message.high_level_union() = message; + + // cpp2::UberMessage uber_message = { + // .to_address = convertToUberAddress(to_address), + // .from_address = convertToUberAddress(from_address), + // .request_id = static_cast<int64_t>(request_id), + // .high_level_union = message + // }; + + if (clients_.contains(to_address)) { + auto &client = clients_[to_address]; + client->sync_ReceiveUberMessage(uber_message); + } else { + // maybe make this into a member var + const auto &other_ip = to_address.last_known_ip.to_string(); + const auto &other_port = to_address.last_known_port; + auto socket(folly::AsyncSocket::newSocket(&base_, other_ip, other_port)); + auto client_channel = HeaderClientChannel::newChannel(std::move(socket)); + // Create a client object + cpp2::UberServerAsyncClient client(std::move(client_channel)); + + client.sync_ReceiveUberMessage(uber_message); + } } }; class UberMessageService final : cpp2::UberServerSvIf { std::shared_ptr<ThriftHandle> handle_; - memgraph::io::Address convertToMgAddress(const cpp2::Address address) { + memgraph::io::Address convertToMgAddress(const cpp2::Address &address) { memgraph::io::Address ret_address; ret_address = {.unique_id{boost::lexical_cast<boost::uuids::uuid>(address.get_unique_id())}, .last_known_ip{boost::asio::ip::make_address(address.get_last_known_ip())},