Check-in high-level skeleton of some things for the protobuf transport

This commit is contained in:
Tyler Neely 2022-10-05 12:41:32 +00:00
parent bc602bb93c
commit 18c8473a6b
6 changed files with 113 additions and 28 deletions

21
src/io/crc_frame.hpp Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
namespace memgraph::io {
/// Protocol:
/// crc32: 4 bytes
/// len: 8 bytes
/// buffer: <len bytes>
std::optional<std::pair<size_t, size_t>> make_frame(char *ptr, size_t len) { return std::nullopt; }
} // namespace memgraph::io

View File

@ -20,10 +20,11 @@ namespace memgraph::io::protobuf_transport {
class ProtobufTransport { class ProtobufTransport {
std::shared_ptr<ProtobufTransportHandle> protobuf_transport_handle_; std::shared_ptr<ProtobufTransportHandle> protobuf_transport_handle_;
uint16_t listen_port_;
public: public:
explicit ProtobufTransport(std::shared_ptr<ProtobufTransportHandle> protobuf_transport_handle) explicit ProtobufTransport(uint16_t listen_port)
: protobuf_transport_handle_(std::move(protobuf_transport_handle)) {} : protobuf_transport_handle_(std::make_unique<ProtobufTransportHandle>()), listen_port_(listen_port) {}
template <Message RequestT, Message ResponseT> template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestId request_id, RequestT request, ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestId request_id, RequestT request,

View File

@ -21,9 +21,14 @@
#include "io/address.hpp" #include "io/address.hpp"
#include "io/transport.hpp" #include "io/transport.hpp"
//#include "protobuf/messages.pb.cc"
#include "protobuf/messages.pb.h"
namespace memgraph::io::protobuf_transport { namespace memgraph::io::protobuf_transport {
using PbAddress = memgraph::protobuf::Address;
using memgraph::protobuf::UberMessage;
class ProtobufTransportHandle { class ProtobufTransportHandle {
mutable std::mutex mu_{}; mutable std::mutex mu_{};
mutable std::condition_variable cv_; mutable std::condition_variable cv_;
@ -33,7 +38,10 @@ class ProtobufTransportHandle {
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_; std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them // messages that are sent to servers that may later receive them
std::vector<OpaqueMessage> can_receive_; std::vector<std::string> can_receive_;
// serialized outbound messages
std::vector<std::string> outbox_;
public: public:
~ProtobufTransportHandle() { ~ProtobufTransportHandle() {
@ -97,30 +105,34 @@ class ProtobufTransportHandle {
template <Message M> template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) { void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
std::any message_any(std::forward<M>(message)); PromiseKey promise_key{.requester_address = to_address, .request_id = request_id, .replier_address = from_address};
OpaqueMessage opaque_message{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any)};
PromiseKey promise_key{
.requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address};
{ {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
if (promises_.contains(promise_key)) { if (promises_.contains(promise_key)) {
spdlog::info("using message to fill promise"); // hair-pin local message optimization
// complete waiting promise if it's there spdlog::info("using message to fill local promise");
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key)); DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
promises_.erase(promise_key); promises_.erase(promise_key);
std::any message_any(std::forward<M>(message));
OpaqueMessage opaque_message{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any)};
dop.promise.Fill(std::move(opaque_message)); dop.promise.Fill(std::move(opaque_message));
} else { } else {
spdlog::info("placing message in can_receive_"); spdlog::info("placing message in outbox");
// TODO(tyler) send over socket to destination // serialize protobuf message and place it in the outbox
can_receive_.emplace_back(std::move(opaque_message));
std::string bytes;
bool success = message.SerializeToString(&bytes);
MG_ASSERT(success);
outbox_.emplace_back(std::move(bytes));
} }
} // lock dropped } // lock dropped
@ -130,10 +142,6 @@ class ProtobufTransportHandle {
template <Message RequestT, Message ResponseT> template <Message RequestT, Message ResponseT>
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, RequestT &&request, void SubmitRequest(Address to_address, Address from_address, RequestId request_id, RequestT &&request,
Duration timeout, ResponsePromise<ResponseT> promise) { Duration timeout, ResponsePromise<ResponseT> promise) {
const bool port_matches = to_address.last_known_port == from_address.last_known_port;
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
MG_ASSERT(port_matches && ip_matches);
const Time deadline = Now() + timeout; const Time deadline = Now() + timeout;
{ {

View File

@ -0,0 +1,25 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <map>
#include <mutex>
#include "google/protobuf/message.h"
#include "io/address.hpp"
#include "io/transport.hpp"
namespace memgraph::io::protobuf_transport {}

View File

@ -2,6 +2,8 @@
// Generated by the protocol buffer compiler. DO NOT EDIT! // Generated by the protocol buffer compiler. DO NOT EDIT!
// source: src/protobuf/messages.proto // source: src/protobuf/messages.proto
#pragma once
#ifndef GOOGLE_PROTOBUF_INCLUDED_src_2fprotobuf_2fmessages_2eproto #ifndef GOOGLE_PROTOBUF_INCLUDED_src_2fprotobuf_2fmessages_2eproto
#define GOOGLE_PROTOBUF_INCLUDED_src_2fprotobuf_2fmessages_2eproto #define GOOGLE_PROTOBUF_INCLUDED_src_2fprotobuf_2fmessages_2eproto

View File

@ -16,25 +16,53 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "io/address.hpp"
#include "io/protobuf_transport/protobuf_transport.hpp" #include "io/protobuf_transport/protobuf_transport.hpp"
#include "io/transport.hpp"
#include "protobuf/messages.pb.cc" #include "protobuf/messages.pb.cc"
#include "protobuf/messages.pb.h" #include "protobuf/messages.pb.h"
namespace memgraph::io::tests { namespace memgraph::io::tests {
using memgraph::protobuf::Address; using memgraph::io::protobuf_transport::ProtobufTransport;
using MgAddress = memgraph::io::Address;
using PbAddress = memgraph::protobuf::Address;
using memgraph::protobuf::TestRequest; using memgraph::protobuf::TestRequest;
using memgraph::protobuf::UberMessage; using memgraph::protobuf::UberMessage;
TEST(ProtobufTransport, Echo) { TEST(ProtobufTransport, Echo) {
spdlog::error("ayo"); uint16_t cli_port = 6000;
uint16_t srv_port = 7000;
std::string out; MgAddress cli_addr = MgAddress::TestAddress(cli_port);
MgAddress srv_addr = MgAddress::TestAddress(srv_port);
Address to_addr; ProtobufTransport cli_pt{cli_port};
ProtobufTransport srv_pt{srv_port};
Io<ProtobufTransport> cli_io{cli_pt, cli_addr};
Io<ProtobufTransport> srv_io{srv_pt, srv_addr};
auto response_result_future = cli_io.Request<UberMessage, UberMessage>(srv_addr, UberMessage{});
auto request_result = srv_io.Receive<UberMessage>();
auto request_envelope = request_result.GetValue();
UberMessage request = std::get<UberMessage>(request_envelope.message);
// send it back as an echo
srv_io.Send(request_envelope.from_address, request_envelope.request_id, request);
// client receives it
auto response_result = std::move(response_result_future).Wait();
auto response_envelope = response_result.GetValue();
UberMessage response = response_envelope.message;
PbAddress to_addr;
to_addr.set_last_known_port(1); to_addr.set_last_known_port(1);
Address from_addr; PbAddress from_addr;
to_addr.set_last_known_port(2); to_addr.set_last_known_port(2);
auto req = new TestRequest{}; auto req = new TestRequest{};
@ -44,14 +72,14 @@ TEST(ProtobufTransport, Echo) {
um.set_request_id(1); um.set_request_id(1);
um.set_allocated_test_request(req); um.set_allocated_test_request(req);
bool success = req->SerializeToString(&out); std::string out;
bool success = um.SerializeToString(&out);
MG_ASSERT(success); MG_ASSERT(success);
TestRequest rt; TestRequest rt;
rt.ParseFromString(out); rt.ParseFromString(out);
MG_ASSERT(rt.content() == req->content());
} }
} // namespace memgraph::io::tests } // namespace memgraph::io::tests