2023-03-01 16:39:32 +01:00

178 lines
5.7 KiB

// Copyright 2023 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <map>
#include <mutex>
#include "io/errors.hpp"
#include "io/message_conversion.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
namespace memgraph::io::local_transport {
// State shared by the local (in-process) transport: a queue of sent messages, the set of promises
// awaiting a response, and latency histograms. ShutDown, ShouldShutDown, ResponseLatencies, Receive,
// Send and SubmitRequest each lock mu_ before touching the members below.
class LocalTransportHandle {
// mutex taken by the methods of this class around access to the other members
mutable std::mutex mu_{};
// waited on (with a timeout) by Receive while can_receive_ is empty
mutable std::condition_variable cv_;
// set to true by ShutDown and reported by ShouldShutDown
bool should_shut_down_ = false;
// fed with response latencies measured in Send; summarized by ResponseLatencies
MessageHistogramCollector histograms_;
// pre-incremented in SubmitRequest to produce the id of each new request
RequestId request_id_counter_ = 0;
// the responses to requests that are being waited on, keyed by requester address and request id
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them; Receive takes from the back
std::vector<OpaqueMessage> can_receive_;
// Destructor: visits every entry still stored in promises_ (key bound to pk, value to promise).
// NOTE(review): the loop body is not visible in this view, so what is done with each remaining
// promise cannot be confirmed from here. No lock on mu_ is visible in the destructor either.
~LocalTransportHandle() {
for (auto &&[pk, promise] : promises_) {
// Flags the handle as shutting down by setting should_shut_down_ to true while holding mu_.
void ShutDown() {
std::unique_lock<std::mutex> lock(mu_);
should_shut_down_ = true;
// Returns the current value of should_shut_down_, read while holding mu_.
// mu_ is declared mutable, which is what allows it to be locked from this const method.
bool ShouldShutDown() const {
std::unique_lock<std::mutex> lock(mu_);
return should_shut_down_;
// Returns the result of histograms_.ResponseLatencies(), obtained while holding mu_.
LatencyHistogramSummaries ResponseLatencies() {
std::unique_lock<std::mutex> lock(mu_);
return histograms_.ResponseLatencies();
// Returns the current std::chrono::system_clock time, cast to microsecond resolution.
// system_clock is a wall clock and is not guaranteed to be monotonic; callers that compute
// elapsed time from two Now() readings need to tolerate the clock stepping backwards.
static Time Now() {
// time_point at the clock's native resolution; narrowed to microseconds on the next line
auto nano_time = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
// Waits until can_receive_ is non-empty or `timeout` has elapsed, then returns a message.
//
// Ms      - the accepted message types; the requires-clause demands at least one.
// timeout - total time budget for the wait, measured from entry with Now().
// The first (Address) parameter is unnamed and is not used by the visible code.
//
// Returns TimedOut{} when the elapsed time exceeds `timeout` or when the condition-variable wait
// reports a timeout. Otherwise moves the element at the back of can_receive_ out, converts it with
// Take<Ms...>() and returns the contained value.
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Address /* receiver_address */, Duration timeout) {
// held for the whole call; cv_.wait_for releases it only while blocked
std::unique_lock lock(mu_);
// reference point for computing how much of the timeout has been used
Time before = Now();
spdlog::info("can_receive_ size: {}", can_receive_.size());
while (can_receive_.empty()) {
Time now = Now();
// protection against non-monotonic timesources
auto maxed_now = std::max(now, before);
auto elapsed = maxed_now - before;
if (timeout < elapsed) {
return TimedOut{};
// wait only for what is left of the budget, not the full timeout again
Duration relative_timeout = timeout - elapsed;
std::cv_status cv_status_value = cv_.wait_for(lock, relative_timeout);
// a timeout status returns immediately, without re-checking can_receive_
if (cv_status_value == std::cv_status::timeout) {
return TimedOut{};
// NOTE(review): the back element is moved from, but no pop_back()/erase is visible here;
// confirm the moved-from element is removed from can_receive_.
auto current_message = std::move(can_receive_.back());
auto m_opt = std::move(current_message).Take<Ms...>();
// presumably m_opt is optional-like; what happens when Take yields no value depends on its type
return std::move(m_opt).value();
// Wraps `message` in an OpaqueMessage (to_address, from_address, request_id, type info) and then,
// while holding mu_, routes it:
//  - if promises_ holds an entry for {to_address, request_id}, the message is treated as the
//    response to that request: the promise is filled and the response latency is recorded;
//  - otherwise the else-branch is taken (see the note on that branch).
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, RValueRef<M> message) {
// type info is captured before `message` is moved into the std::any below
auto type_info = TypeInfoFor(message);
std::any message_any(std::move(message));
OpaqueMessage opaque_message{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any),
.type_info = type_info};
// SubmitRequest registers promises under the requester's (from) address, so a response is
// matched by using this message's destination address as the requester address
PromiseKey promise_key{.requester_address = to_address, .request_id = opaque_message.request_id};
std::unique_lock<std::mutex> lock(mu_);
if (promises_.contains(promise_key)) {
spdlog::info("using message to fill promise");
// complete waiting promise if it's there
// NOTE(review): the operand of std::move is missing, so this statement is incomplete as written.
// Presumably it should take the entry stored under promise_key in promises_ (and that entry
// should then be removed) - confirm against the upstream file.
DeadlineAndOpaquePromise dop = std::move(;
// latency is measured from the requested_at time recorded with the promise until now
Duration response_latency = Now() - dop.requested_at;
dop.promise.Fill(std::move(opaque_message), response_latency);
// recorded under the type info of the response message
histograms_.Measure(type_info, response_latency);
} else {
// NOTE(review): only the log statement is visible in this branch; nothing here adds
// opaque_message to can_receive_ - confirm whether lines are missing.
spdlog::info("placing message in can_receive_");
} // lock dropped
// Registers a promise for the response to `request`, sends the request with Send, and returns the
// matching future.
//
// to_address / from_address - destination and requester; asserted to share last_known_port and
//                             last_known_ip.
// request                   - the request message, forwarded to Send.
// timeout                   - added to the current time to form the promise's deadline.
// fill_notifier             - callback parameter; per the comments below it is meant to run when
//                             Promise::Fill is called (its use is not visible in this view).
//
// The request id is produced by pre-incrementing request_id_counter_ under mu_, and the promise is
// stored in promises_ under {from_address, request_id}.
template <Message ResponseT, Message RequestT>
ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RValueRef<RequestT> request,
Duration timeout, std::function<void()> fill_notifier) {
// NOTE(review): the argument list of this call is not visible; the two comments below describe
// the intended arguments (a null wait-notifier, then the fill notifier).
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications<ResponseResult<ResponseT>>(
// set null notifier for when the Future::Wait is called
// set notifier for when Promise::Fill is called
const bool port_matches = to_address.last_known_port == from_address.last_known_port;
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
// both addresses must share the same last-known ip and port
MG_ASSERT(port_matches && ip_matches);
// `now` is stored as requested_at, which Send later uses to compute the response latency
const auto now = Now();
const Time deadline = now + timeout;
// declared before the lock is taken so the id is still available for the Send call below
RequestId request_id = 0;
std::unique_lock<std::mutex> lock(mu_);
request_id = ++request_id_counter_;
// keyed by the requester (from) address: Send looks the promise up by a response's destination
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
OpaquePromise opaque_promise(std::move(promise).ToUnique());
DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
promises_.emplace(std::move(promise_key), std::move(dop));
} // lock dropped
// Send locks mu_ itself, so the lock above has to be released before this call
Send<RequestT>(to_address, from_address, request_id, std::move(request));
// `future` is a structured binding, which is not implicitly moved on return
return std::move(future);
} // namespace memgraph::io::local_transport