Bridge UberMessage with internal memgraph messages and implement ThriftHandle::Send()

This commit is contained in:
gvolfing 2022-08-22 09:37:25 +02:00
parent e2c2a52f79
commit 38db09fa35
2 changed files with 48 additions and 26 deletions

View File

@ -1,3 +1,5 @@
include(MgThrift)
set(io_src_files
network/addrinfo.cpp
network/endpoint.cpp
@ -8,4 +10,4 @@ find_package(fmt REQUIRED)
find_package(Threads REQUIRED)
add_library(mg-io STATIC ${io_src_files})
target_link_libraries(mg-io stdc++fs Threads::Threads fmt::fmt mg-utils)
target_link_libraries(mg-io stdc++fs Threads::Threads fmt::fmt mg-utils FBThrift::thriftcpp2 mg-interface-ubermessage-cpp2)

View File

@ -19,12 +19,12 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/lexical_cast.hpp>
// #include <folly/init/Init.h>
// #include <folly/io/SocketOptionMap.h>
// #include <folly/io/async/AsyncServerSocket.h>
// #include <folly/net/NetworkSocket.h>
// #include <thrift/lib/cpp2/async/HeaderClientChannel.h>
// #include <thrift/lib/cpp2/server/ThriftServer.h>
#include <folly/init/Init.h>
#include <folly/io/SocketOptionMap.h>
#include <folly/io/async/AsyncServerSocket.h>
#include <folly/net/NetworkSocket.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>
// From generated code
#include "interface/gen-cpp2/UberServer.h"
@ -37,7 +37,6 @@
namespace memgraph::io::thrift {
using namespace apache::thrift;
// using namespace cpp2;
using namespace folly;
using memgraph::io::Address;
@ -50,7 +49,7 @@ class ThriftHandle {
mutable std::condition_variable cv_;
const Address address_ = Address::TestAddress(0);
// EventBase base_;
EventBase base_;
// the responses to requests that are being waited on
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
@ -59,13 +58,22 @@ class ThriftHandle {
std::vector<OpaqueMessage> can_receive_;
// TODO(tyler) thrift clients for each outbound address combination
std::map<Address, cpp2::UberServerAsyncClient> clients_;
// AsyncClient does not offer default init so they are optional atm.
std::map<Address, std::optional<cpp2::UberServerAsyncClient>> clients_;
// TODO(gabor) make this to a threadpool
// uuid of the address -> port number where the given rsm is residing.
// TODO(gabor) The RSM map should not be a part of this class.
// std::map<boost::uuids::uuid, uint16_t /*this should be the actual RSM*/> rsm_map_;
cpp2::Address convertToUberAddress(const memgraph::io::Address &address) {
cpp2::Address ret_address;
ret_address.unique_id_ref() = boost::uuids::to_string(address.unique_id);
ret_address.last_known_ip_ref() = address.last_known_ip.to_string();
ret_address.last_known_port_ref() = static_cast<int32_t>(address.last_known_port);
return ret_address;
}
public:
explicit ThriftHandle(Address our_address) : address_(our_address) {}
@ -165,29 +173,41 @@ class ThriftHandle {
void Send(Address to_address, Address from_address, RequestId request_id, M message) {
// TODO(tyler) call thrift client for address (or create one if it doesn't exist yet)
// if(clients_.contains(to_address))
// {
// const auto &client = clients_[to_address];
// client.sync_Send(message);
// }
// else{
// // maybe make this into a member var
// const auto& other_ip = to_address.last_known_ip.to_string();
// const auto& other_port = to_address.last_known_port;
// auto socket(folly::AsyncSocket::newSocket(&base_, other_ip, other_port));
// auto client_channel = HeaderClientChannel::newChannel(std::move(socket));
// // Create a client object
// EchoAsyncClient client(std::move(client_channel));
cpp2::UberMessage uber_message;
// client.sync_Send(message);
// }
uber_message.to_address_ref() = convertToUberAddress(to_address);
uber_message.from_address_ref() = convertToUberAddress(from_address);
uber_message.request_id_ref() = static_cast<int64_t>(request_id);
// uber_message.high_level_union() = message;
// cpp2::UberMessage uber_message = {
// .to_address = convertToUberAddress(to_address),
// .from_address = convertToUberAddress(from_address),
// .request_id = static_cast<int64_t>(request_id),
// .high_level_union = message
// };
if (clients_.contains(to_address)) {
auto &client = clients_[to_address];
client->sync_ReceiveUberMessage(uber_message);
} else {
// maybe make this into a member var
const auto &other_ip = to_address.last_known_ip.to_string();
const auto &other_port = to_address.last_known_port;
auto socket(folly::AsyncSocket::newSocket(&base_, other_ip, other_port));
auto client_channel = HeaderClientChannel::newChannel(std::move(socket));
// Create a client object
cpp2::UberServerAsyncClient client(std::move(client_channel));
client.sync_ReceiveUberMessage(uber_message);
}
}
};
class UberMessageService final : cpp2::UberServerSvIf {
std::shared_ptr<ThriftHandle> handle_;
memgraph::io::Address convertToMgAddress(const cpp2::Address address) {
memgraph::io::Address convertToMgAddress(const cpp2::Address &address) {
memgraph::io::Address ret_address;
ret_address = {.unique_id{boost::lexical_cast<boost::uuids::uuid>(address.get_unique_id())},
.last_known_ip{boost::asio::ip::make_address(address.get_last_known_ip())},