[SQUASH ME] Initial check-in for getting feedback

This commit is contained in:
Tyler Neely 2022-06-29 15:52:37 +00:00
parent 21870a0e7e
commit 4140f3e05e
7 changed files with 423 additions and 0 deletions

View File

@ -15,6 +15,7 @@ add_subdirectory(query)
add_subdirectory(slk)
add_subdirectory(rpc)
add_subdirectory(auth)
add_subdirectory(io/v3)
if (MG_ENTERPRISE)
add_subdirectory(audit)

12
src/io/v3/CMakeLists.txt Normal file
View File

@ -0,0 +1,12 @@
set(io_v3_sources
address.hpp
errors.hpp
future.hpp
transport.hpp
simulator.hpp)
find_package(fmt REQUIRED)
find_package(Threads REQUIRED)
add_library(mg-io-v3 STATIC ${io_v3_sources})
target_link_libraries(mg-io-v3 stdc++fs Threads::Threads fmt::fmt mg-utils)

28
src/io/v3/address.hpp Normal file
View File

@ -0,0 +1,28 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <boost/asio/ip/tcp.hpp>
#include <boost/uuid/uuid.hpp>
struct Address {
// It's important for all participants to have a
// unique identifier - IP and port alone are not
// enough, and may change over the lifecycle of
// the nodes. Particularly storage nodes may change
// their IP addresses over time, and the system
// should gracefully update its information
// about them.
boost::uuids::uuid unique_id;
boost::asio::ip::address last_known_ip;
uint16_t last_known_port;
};

20
src/io/v3/errors.hpp Normal file
View File

@ -0,0 +1,20 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
// Signifies that a retriable operation was unable to
// (fully) complete after a configured number of retries.
struct RetriesExhausted {};
// Signifies that an operation was unable
// to (fully) complete after a configured duration.
struct Timeout {};

180
src/io/v3/future.hpp Normal file
View File

@ -0,0 +1,180 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include "utils/logging.hpp"
#include "errors.hpp"
#include "simulator.hpp"
template <typename T>
class MgPromise;
template <typename T>
class MgFuture;
template <typename T>
std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair();
template <typename T>
std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair(SimulatorHandle);
template <typename T>
class Shared {
friend std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair<T>();
friend MgPromise<T>;
friend MgFuture<T>;
private:
T Wait() {
std::unique_lock<std::mutex> lock(mu_);
while (!item_) {
waiting_ = true;
if (simulator_handle_) {
// We can't hold our own lock while notifying
// the simulator because notifying the simulator
// involves acquiring the simulator's mutex
// to guarantee that our notification linearizes
// with the simulator's condition variable.
// However, the simulator may acquire our
// mutex to check if we are being awaited,
// while determining system quiescence,
// so we have to get out of its way to avoid
// a cyclical deadlock.
lock.unlock();
(*simulator_handle_)->NotifySimulator();
lock.lock();
if (item_) {
// item may have been filled while we
// had dropped our mutex while notifying
// the simulator of our waiting_ status.
break;
}
}
cv_.wait(lock);
waiting_ = false;
MG_ASSERT(!consumed_, "MgFuture consumed twice!");
}
T ret = *std::move(item_);
consumed_ = true;
return ret;
}
void Fill(T item) {
{
std::unique_lock<std::mutex> lock(mu_);
MG_ASSERT(!consumed_, "MgPromise filled after it was already consumed!");
MG_ASSERT(!item_, "MgPromise filled twice!");
item_ = item;
} // lock released before condition variable notification
cv_.notify_all();
}
bool IsAwaited() {
std::unique_lock<std::mutex> lock(mu_);
return waiting_;
}
std::condition_variable cv_;
std::mutex mu_;
std::optional<T> item_;
bool consumed_;
bool waiting_;
std::optional<std::shared_ptr<SimulatorHandle>> simulator_handle_;
};
template <typename T>
class MgFuture {
friend std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair<T>();
friend std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair<T>(SimulatorHandle);
public:
MgFuture(MgFuture &&) = default;
MgFuture(const MgFuture &) = delete;
// Block on the corresponding promise to be filled,
// returning the inner item when ready.
T Wait() { return shared_->Wait(); }
private:
MgFuture(std::shared_ptr<Shared<T>> shared) : shared_(shared) {}
std::shared_ptr<Shared<T>> shared_;
};
template <typename T>
class MgPromise {
friend std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair<T>();
friend std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair<T>(SimulatorHandle);
public:
MgPromise(MgPromise &&) = default;
MgPromise(const MgPromise &) = delete;
~MgPromise() noexcept(false) {
MG_ASSERT(filled_,
"MgPromise destroyed before its \
associated MgFuture was filled!");
}
// Fill the expected item into the Future.
void Fill(T item) {
shared_->Fill(item);
filled_ = true;
}
bool IsAwaited() { return shared_->IsAwaited(); }
private:
MgPromise(std::shared_ptr<Shared<T>> shared) : shared_(shared) {}
std::shared_ptr<Shared<T>> shared_;
bool filled_;
};
template <typename T>
std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair() {
std::shared_ptr<Shared<T>> shared = std::make_shared<Shared<T>>();
MgFuture<T> future = MgFuture<T>(shared);
MgPromise<T> promise = MgPromise<T>(shared);
return std::make_pair(std::move(future), std::move(promise));
}
template <typename T>
std::pair<MgFuture<T>, MgPromise<T>> future_promise_pair(SimulatorHandle simulator_handle) {
auto [future, promise] = future_promise_pair<T>();
future.simulator_handle_ = simulator_handle;
return std::make_pair(std::move(future), std::move(promise));
}
namespace _compile_test {
void _templatization_smoke_test() {
auto [future, promise] = future_promise_pair<bool>();
promise.Fill(true);
MG_ASSERT(future.Wait() == true);
}
} // namespace _compile_test

79
src/io/v3/simulator.hpp Normal file
View File

@ -0,0 +1,79 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <variant>
#include "address.hpp"
#include "errors.hpp"
#include "future.hpp"
#include "simulator_handle.hpp"
#include "transport.hpp"
struct SimulatorStats {
uint64_t total_messages_;
uint64_t dropped_messages_;
uint64_t total_requests_;
uint64_t total_responses_;
uint64_t simulator_ticks_;
}
struct SimulatorConfig {
uint8_t drop_percent_;
uint64_t rng_seed_;
}
class SimulatorHandle {
public:
void NotifySimulator() {
std::unique_lock<std::mutex> lock(mu_);
cv_sim_.notify_all();
}
private:
std::mutex mu_;
std::condition_variable cv_sim_;
std::condition_variable cv_srv_;
};
class SimulatorTransport {
public:
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address)
: simulator_handle_(simulator_handle), address_(address) {}
private:
std::shared_ptr<SimulatorHandle> simulator_handle_;
Address address_;
};
class Simulator {
public:
SimulatorTransport Register(Address address, bool is_server) {
return SimulatorTransport(simulator_handle_, address);
}
private:
std::shared_ptr<SimulatorHandle> simulator_handle_;
};
namespace _compile_test {
void use_it() {
auto simulator = Simulator();
auto addr_1 = Address();
auto addr_2 = Address();
auto addr_3 = Address();
auto sim_transport_1 = simulator.Register(addr_1, true);
auto sim_transport_2 = simulator.Register(addr_2, true);
auto sim_transport_3 = simulator.Register(addr_3, true);
}
} // namespace _compile_test

103
src/io/v3/transport.hpp Normal file
View File

@ -0,0 +1,103 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <concepts>
#include <variant>
#include "utils/result.hpp"
#include "address.hpp"
#include "errors.hpp"
#include "future.hpp"
using memgraph::utils::BasicResult;
template <typename T>
concept Message = requires(T a, uint8_t *ptr, size_t len) {
// These are placeholders and will be replaced
// by some concept that identifies Thrift-generated
// messages.
{ a.serialize() } -> std::same_as<std::vector<uint8_t>>;
{ T::deserialize(ptr, len) } -> std::same_as<T>;
};
template <Message M>
struct MessageAndAddress {
M message;
uint64_t request_id;
Address from;
};
template <Message... Ms>
struct MessageVariantAndSenderAddress {
std::variant<Ms...> message;
uint64_t request_id;
Address from;
};
template <Message M>
using RequestResult = BasicResult<MessageAndAddress<M>, Timeout>;
template <Message M>
using RequestFuture = MgFuture<RequestResult<M>>;
template <typename I>
class Io {
public:
Io(I io, Address address) : implementation_(io), address_(address) {}
void SetDefaultTimeoutMicroseconds(uint64_t timeout_microseconds) {
default_timeout_microseconds_ = timeout_microseconds;
}
template <Message Request, Message Response>
RequestFuture<Response> RequestTimeout(Address address, Request request, uint64_t timeout_microseconds) {
uint64_t request_id = ++request_id_counter_;
return implementation_.template RequestTimeout<Request, Response>(address, request_id, request,
timeout_microseconds);
}
template <Message Request, Message Response>
RequestFuture<Response> RequestTimeout(Address address, Request request) {
uint64_t request_id = ++request_id_counter_;
uint64_t timeout_microseconds = default_timeout_microseconds_;
return implementation_.template RequestTimeout<Request, Response>(address, request_id, request,
timeout_microseconds);
}
template <Message... Ms>
MessageVariantAndSenderAddress<Ms...> ReceiveTimeout(uint64_t timeout_microseconds) {
return implementation_.template ReceiveTimeout<Ms...>(timeout_microseconds);
}
template <Message... Ms>
MessageVariantAndSenderAddress<Ms...> ReceiveTimeout() {
uint64_t timeout_microseconds = default_timeout_microseconds_;
return implementation_.template ReceiveTimeout<Ms...>(timeout_microseconds);
}
template <Message M>
void Send(Address address, uint64_t request_id, M message) {
return implementation_.template Send<M>(address, request_id, message);
}
std::time_t Now() { return implementation_.Now(); }
bool ShouldShutDown() { return implementation_.ShouldShutDown(); }
private:
I implementation_;
Address address_;
uint64_t request_id_counter_ = 0;
uint64_t default_timeout_microseconds_ = 50 * 1000;
};