diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index bf6a39147..ce2caea75 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -171,7 +171,7 @@ jobs: # Run leftover CTest tests (all except unit and benchmark tests). cd build - ctest -E "(memgraph__unit|memgraph__benchmark)" --output-on-failure + ctest -E "(memgraph__unit|memgraph__benchmark|memgraph__simulation)" --output-on-failure - name: Run drivers tests run: | @@ -262,6 +262,15 @@ jobs: cd build ctest -R memgraph__unit --output-on-failure -j$THREADS + - name: Run simulation tests + run: | + # Activate toolchain. + source /opt/toolchain-v4/activate + + # Run simulation tests. + cd build + ctest -R memgraph__simulation --output-on-failure -j$THREADS + - name: Run e2e tests run: | # TODO(gitbuda): Setup mgclient and pymgclient properly. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index efc653b9a..f4c303daf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(lisp) add_subdirectory(utils) add_subdirectory(requests) add_subdirectory(io) +add_subdirectory(io/simulator) add_subdirectory(kvstore) add_subdirectory(telemetry) add_subdirectory(communication) diff --git a/src/io/address.hpp b/src/io/address.hpp new file mode 100644 index 000000000..914c8cb86 --- /dev/null +++ b/src/io/address.hpp @@ -0,0 +1,68 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+#pragma once
+
+#include <string>
+
+#include <fmt/format.h>
+#include <boost/asio/ip/address.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
+namespace memgraph::io {
+struct Address {
+  // It's important for all participants to have a
+  // unique identifier - IP and port alone are not
+  // enough, and may change over the lifecycle of
+  // the nodes. Particularly storage nodes may change
+  // their IP addresses over time, and the system
+  // should gracefully update its information
+  // about them.
+  boost::uuids::uuid unique_id;
+  boost::asio::ip::address last_known_ip;
+  uint16_t last_known_port;
+
+  static Address TestAddress(uint16_t port) {
+    return Address{
+        .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
+        .last_known_port = port,
+    };
+  }
+
+  static Address UniqueLocalAddress() {
+    return Address{
+        .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
+    };
+  }
+
+  friend bool operator==(const Address &lhs, const Address &rhs) = default;
+
+  /// unique_id is most dominant for ordering, then last_known_ip, then last_known_port
+  friend bool operator<(const Address &lhs, const Address &rhs) {
+    if (lhs.unique_id != rhs.unique_id) {
+      return lhs.unique_id < rhs.unique_id;
+    }
+
+    if (lhs.last_known_ip != rhs.last_known_ip) {
+      return lhs.last_known_ip < rhs.last_known_ip;
+    }
+
+    return lhs.last_known_port < rhs.last_known_port;
+  }
+
+  std::string ToString() const {
+    return fmt::format("Address {{ unique_id: {}, last_known_ip: {}, last_known_port: {} }}",
+                       boost::uuids::to_string(unique_id), last_known_ip.to_string(), last_known_port);
+  }
+};
+}  // namespace memgraph::io
diff --git a/src/io/errors.hpp b/src/io/errors.hpp
new file mode 100644
index 000000000..7df2171d9
--- /dev/null
+++ b/src/io/errors.hpp
@@ -0,0 +1,26 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+namespace memgraph::io {
+// Signifies that a retriable operation was unable to
+// complete after a configured number of retries.
+struct RetriesExhausted {};
+
+// Signifies that a request was unable to receive a response
+// within some configured timeout duration. It is important
+// to remember that in distributed systems, a timeout does
+// not signify that a request was not received or processed.
+// It may be the case that the request was fully processed
+// but that the response was not received.
+struct TimedOut {};
+}  // namespace memgraph::io
diff --git a/src/io/future.hpp b/src/io/future.hpp
new file mode 100644
index 000000000..7b9a4461c
--- /dev/null
+++ b/src/io/future.hpp
@@ -0,0 +1,279 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <utility>
+
+#include "io/errors.hpp"
+#include "utils/logging.hpp"
+
+namespace memgraph::io {
+
+// Shared lives in the details namespace, and the only way to
+// construct a Promise or Future is to pass a Shared in. Use
+// FuturePromisePair or FuturePromisePairWithNotifier to obtain
+// a connected pair instead of building a Shared by hand.
+namespace details {
+template <typename T>
+class Shared {
+  mutable std::condition_variable cv_;
+  mutable std::mutex mu_;
+  std::optional<T> item_;
+  bool consumed_ = false;
+  bool waiting_ = false;
+  std::function<bool()> simulator_notifier_ = nullptr;
+
+ public:
+  explicit Shared(std::function<bool()> simulator_notifier) : simulator_notifier_(std::move(simulator_notifier)) {}
+  Shared() = default;
+  Shared(Shared &&) = delete;
+  Shared &operator=(Shared &&) = delete;
+  Shared(const Shared &) = delete;
+  Shared &operator=(const Shared &) = delete;
+  ~Shared() = default;
+
+  /// Takes the item out of our optional item_ and returns it.
+  /// Does not lock mu_ itself: Wait and TryGet call it while holding mu_.
+  T Take() {
+    MG_ASSERT(item_, "Take called without item_ being present");
+    MG_ASSERT(!consumed_, "Take called on already-consumed Future");
+
+    T ret = std::move(item_).value();
+    item_.reset();
+
+    consumed_ = true;
+
+    return ret;
+  }
+
+  T Wait() {
+    std::unique_lock<std::mutex> lock(mu_);
+    waiting_ = true;
+
+    while (!item_) {
+      bool simulator_progressed = false;
+      if (simulator_notifier_) [[unlikely]] {
+        // We can't hold our own lock while notifying
+        // the simulator because notifying the simulator
+        // involves acquiring the simulator's mutex
+        // to guarantee that our notification linearizes
+        // with the simulator's condition variable.
+        // However, the simulator may acquire our
+        // mutex to check if we are being awaited,
+        // while determining system quiescence,
+        // so we have to get out of its way to avoid
+        // a cyclical deadlock.
+        lock.unlock();
+        simulator_progressed = std::invoke(simulator_notifier_);
+        lock.lock();
+        if (item_) {
+          // item may have been filled while we
+          // had dropped our mutex while notifying
+          // the simulator of our waiting_ status.
+          break;
+        }
+      }
+      if (!simulator_progressed) [[likely]] {
+        cv_.wait(lock);
+      }
+      MG_ASSERT(!consumed_, "Future consumed twice!");
+    }
+
+    waiting_ = false;
+
+    return Take();
+  }
+
+  bool IsReady() const {
+    std::unique_lock<std::mutex> lock(mu_);
+    // std::optional converts to bool only explicitly, so query it directly.
+    return item_.has_value();
+  }
+
+  std::optional<T> TryGet() {
+    std::unique_lock<std::mutex> lock(mu_);
+
+    if (item_) {
+      return Take();
+    }
+
+    return std::nullopt;
+  }
+
+  void Fill(T item) {
+    {
+      std::unique_lock<std::mutex> lock(mu_);
+
+      MG_ASSERT(!consumed_, "Promise filled after it was already consumed!");
+      MG_ASSERT(!item_, "Promise filled twice!");
+
+      // item is a by-value sink parameter, so move it into place
+      // instead of copying it (this also permits move-only T).
+      item_ = std::move(item);
+    }  // lock released before condition variable notification
+
+    cv_.notify_all();
+  }
+
+  bool IsAwaited() const {
+    std::unique_lock<std::mutex> lock(mu_);
+    return waiting_;
+  }
+};
+}  // namespace details
+
+template <typename T>
+class Future {
+  bool consumed_or_moved_ = false;
+  std::shared_ptr<details::Shared<T>> shared_;
+
+ public:
+  explicit Future(std::shared_ptr<details::Shared<T>> shared) : shared_(std::move(shared)) {}
+
+  Future() = delete;
+  Future(Future &&old) noexcept {
+    MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
+    shared_ = std::move(old.shared_);
+    consumed_or_moved_ = old.consumed_or_moved_;
+    old.consumed_or_moved_ = true;
+  }
+
+  Future &operator=(Future &&old) noexcept {
+    MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
+    if (this == &old) {
+      return *this;
+    }
+    shared_ = std::move(old.shared_);
+    // The source was just checked to be live, so this Future is live again
+    // even if it had previously been consumed or moved from.
+    consumed_or_moved_ = false;
+    old.consumed_or_moved_ = true;
+    return *this;
+  }
+
+  Future(const Future &) = delete;
+  Future &operator=(const Future &) = delete;
+  ~Future() = default;
+
+  /// Returns true if the Future is ready to
+  /// be consumed using TryGet or Wait (prefer Wait
+  /// if you know it's ready, because it doesn't
+  /// return an optional).
+  bool IsReady() {
+    MG_ASSERT(!consumed_or_moved_, "Called IsReady after Future already consumed!");
+    return shared_->IsReady();
+  }
+
+  /// Non-blocking method that returns the inner
+  /// item if it's already ready, or std::nullopt
+  /// if it is not ready yet.
+  std::optional<T> TryGet() {
+    MG_ASSERT(!consumed_or_moved_, "Called TryGet after Future already consumed!");
+    std::optional<T> ret = shared_->TryGet();
+    if (ret) {
+      consumed_or_moved_ = true;
+    }
+    return ret;
+  }
+
+  /// Block on the corresponding promise to be filled,
+  /// returning the inner item when ready.
+  T Wait() && {
+    MG_ASSERT(!consumed_or_moved_, "Future should only be consumed with Wait once!");
+    T ret = shared_->Wait();
+    consumed_or_moved_ = true;
+    return ret;
+  }
+
+  /// Marks this Future as canceled.
+  void Cancel() {
+    MG_ASSERT(!consumed_or_moved_, "Future::Cancel called on a future that was already moved or consumed!");
+    consumed_or_moved_ = true;
+  }
+};
+
+template <typename T>
+class Promise {
+  std::shared_ptr<details::Shared<T>> shared_;
+  bool filled_or_moved_ = false;
+
+ public:
+  explicit Promise(std::shared_ptr<details::Shared<T>> shared) : shared_(std::move(shared)) {}
+
+  Promise() = delete;
+  Promise(Promise &&old) noexcept {
+    MG_ASSERT(!old.filled_or_moved_, "Promise moved from after already being moved from or filled.");
+    shared_ = std::move(old.shared_);
+    old.filled_or_moved_ = true;
+  }
+
+  Promise &operator=(Promise &&old) noexcept {
+    MG_ASSERT(!old.filled_or_moved_, "Promise moved from after already being moved from or filled.");
+    if (this == &old) {
+      return *this;
+    }
+    shared_ = std::move(old.shared_);
+    // This Promise now owns the unfilled state that was taken from old.
+    filled_or_moved_ = false;
+    old.filled_or_moved_ = true;
+    return *this;
+  }
+  Promise(const Promise &) = delete;
+  Promise &operator=(const Promise &) = delete;
+
+  ~Promise() { MG_ASSERT(filled_or_moved_, "Promise destroyed before its associated Future was filled!"); }
+
+  // Fill the expected item into the Future.
+  void Fill(T item) {
+    MG_ASSERT(!filled_or_moved_, "Promise::Fill called on a promise that is already filled or moved!");
+    shared_->Fill(std::move(item));
+    filled_or_moved_ = true;
+  }
+
+  bool IsAwaited() { return shared_->IsAwaited(); }
+
+  /// Moves this Promise into a unique_ptr.
+  std::unique_ptr<Promise<T>> ToUnique() && {
+    std::unique_ptr<Promise<T>> up = std::make_unique<Promise<T>>(std::move(shared_));
+
+    filled_or_moved_ = true;
+
+    return up;
+  }
+};
+
+template <typename T>
+std::pair<Future<T>, Promise<T>> FuturePromisePair() {
+  std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>();
+
+  Future<T> future = Future<T>(shared);
+  Promise<T> promise = Promise<T>(shared);
+
+  return std::make_pair(std::move(future), std::move(promise));
+}
+
+template <typename T>
+std::pair<Future<T>, Promise<T>> FuturePromisePairWithNotifier(std::function<bool()> simulator_notifier) {
+  std::shared_ptr<details::Shared<T>> shared = std::make_shared<details::Shared<T>>(std::move(simulator_notifier));
+
+  Future<T> future = Future<T>(shared);
+  Promise<T> promise = Promise<T>(shared);
+
+  return std::make_pair(std::move(future), std::move(promise));
+}
+
+}  // namespace memgraph::io
diff --git a/src/io/local_transport/local_system.hpp b/src/io/local_transport/local_system.hpp
new file mode 100644
index 000000000..fd0628943
--- /dev/null
+++ b/src/io/local_transport/local_system.hpp
@@ -0,0 +1,40 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <memory>
+
+#include "io/address.hpp"
+#include "io/local_transport/local_transport.hpp"
+#include "io/local_transport/local_transport_handle.hpp"
+#include "io/transport.hpp"
+
+namespace memgraph::io::local_transport {
+
+/// Hands out Io instances that all share a single
+/// LocalTransportHandle owned by this LocalSystem.
+class LocalSystem {
+  std::shared_ptr<LocalTransportHandle> local_transport_handle_ = std::make_shared<LocalTransportHandle>();
+
+ public:
+  /// Builds a LocalTransport bound to address on the shared handle
+  /// and wraps it in an Io.
+  Io<LocalTransport> Register(Address address) {
+    LocalTransport local_transport(local_transport_handle_, address);
+    return Io{local_transport, address};
+  }
+
+  /// Forwards the shutdown request to the shared handle.
+  void ShutDown() { local_transport_handle_->ShutDown(); }
+};
+
+}  // namespace memgraph::io::local_transport
diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp
new file mode 100644
index 000000000..f08392a87
--- /dev/null
+++ b/src/io/local_transport/local_transport.hpp
@@ -0,0 +1,67 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <chrono>
+#include <memory>
+#include <random>
+#include <utility>
+
+#include "io/address.hpp"
+#include "io/local_transport/local_transport_handle.hpp"
+#include "io/time.hpp"
+#include "io/transport.hpp"
+
+namespace memgraph::io::local_transport {
+
+class LocalTransport {
+  std::shared_ptr<LocalTransportHandle> local_transport_handle_;
+  const Address address_;
+
+ public:
+  LocalTransport(std::shared_ptr<LocalTransportHandle> local_transport_handle, Address address)
+      : local_transport_handle_(std::move(local_transport_handle)), address_(address) {}
+
+  template <Message Request, Message Response>
+  ResponseFuture<Response> Request(Address to_address, RequestId request_id, Request request, Duration timeout) {
+    auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<Response>>();
+
+    Address from_address = address_;
+
+    local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
+                                           std::move(promise));
+
+    return std::move(future);
+  }
+
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
+    Address from_address = address_;
+    return local_transport_handle_->template Receive<Ms...>(timeout);
+  }
+
+  template <Message M>
+  void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
+    return local_transport_handle_->template Send<M>(to_address, from_address, request_id, std::forward<M>(message));
+  }
+
+  Time Now() const { return local_transport_handle_->Now(); }
+
+  bool ShouldShutDown() const { return local_transport_handle_->ShouldShutDown(); }
+
+  template <class D = std::poisson_distribution<>, class Return = uint64_t>
+  Return Rand(D distrib) {
+    std::random_device rng;
+    return distrib(rng);
+  }
+};
+}  // namespace memgraph::io::local_transport
diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp
new file mode 100644
index 000000000..8536ff716
--- /dev/null
+++ b/src/io/local_transport/local_transport_handle.hpp
@@ -0,0 +1,154 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <algorithm>
+#include <any>
+#include <chrono>
+#include <condition_variable>
+#include <map>
+#include <mutex>
+#include <utility>
+#include <vector>
+
+#include "io/errors.hpp"
+#include "io/message_conversion.hpp"
+#include "io/time.hpp"
+#include "io/transport.hpp"
+
+namespace memgraph::io::local_transport {
+
+class LocalTransportHandle {
+  mutable std::mutex mu_{};
+  mutable std::condition_variable cv_;
+  bool should_shut_down_ = false;
+
+  // the responses to requests that are being waited on
+  std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
+
+  // messages that are sent to servers that may later receive them
+  std::vector<OpaqueMessage> can_receive_;
+
+ public:
+  void ShutDown() {
+    std::unique_lock<std::mutex> lock(mu_);
+    should_shut_down_ = true;
+    cv_.notify_all();
+  }
+
+  bool ShouldShutDown() const {
+    std::unique_lock<std::mutex> lock(mu_);
+    return should_shut_down_;
+  }
+
+  static Time Now() {
+    auto nano_time = std::chrono::system_clock::now();
+    return std::chrono::time_point_cast<Time::duration>(nano_time);
+  }
+
+  /// Blocks until a message is available or timeout elapses.
+  /// Messages are handed out newest-first (taken from the back).
+  /// The message that is popped must hold one of the types in Ms.
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
+    std::unique_lock<std::mutex> lock(mu_);
+
+    Time before = Now();
+
+    while (can_receive_.empty()) {
+      Time now = Now();
+
+      // protection against non-monotonic timesources
+      auto maxed_now = std::max(now, before);
+      auto elapsed = maxed_now - before;
+
+      if (timeout < elapsed) {
+        return TimedOut{};
+      }
+
+      Duration relative_timeout = timeout - elapsed;
+
+      std::cv_status cv_status_value = cv_.wait_for(lock, relative_timeout);
+
+      // A message may have been queued just as the wait timed out;
+      // only report a timeout if there is still nothing to hand out.
+      if (cv_status_value == std::cv_status::timeout && can_receive_.empty()) {
+        return TimedOut{};
+      }
+    }
+
+    auto current_message = std::move(can_receive_.back());
+    can_receive_.pop_back();
+
+    auto m_opt = std::move(current_message).Take<Ms...>();
+    MG_ASSERT(m_opt.has_value(), "Received a message whose type is not among the expected types");
+
+    return std::move(m_opt).value();
+  }
+
+  /// Fills the promise registered for this (to, request_id, from) triple,
+  /// or queues the message for a later Receive if there is none.
+  template <Message M>
+  void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
+    std::any message_any(std::forward<M>(message));
+    OpaqueMessage opaque_message{.to_address = to_address,
+                                 .from_address = from_address,
+                                 .request_id = request_id,
+                                 .message = std::move(message_any)};
+
+    PromiseKey promise_key{.requester_address = to_address,
+                           .request_id = opaque_message.request_id,
+                           .replier_address = opaque_message.from_address};
+
+    {
+      std::unique_lock<std::mutex> lock(mu_);
+
+      if (auto it = promises_.find(promise_key); it != promises_.end()) {
+        // complete waiting promise if it's there
+        DeadlineAndOpaquePromise dop = std::move(it->second);
+        promises_.erase(it);
+
+        dop.promise.Fill(std::move(opaque_message));
+      } else {
+        can_receive_.emplace_back(std::move(opaque_message));
+      }
+    }  // lock dropped
+
+    cv_.notify_all();
+  }
+
+  /// Registers promise under (from, request_id, to) and then delivers
+  /// request via Send. The deadline is stored but not enforced here.
+  template <Message Request, Message Response>
+  void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
+                     Duration timeout, ResponsePromise<Response> promise) {
+    const bool port_matches = to_address.last_known_port == from_address.last_known_port;
+    const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
+
+    MG_ASSERT(port_matches && ip_matches);
+    const Time deadline = Now() + timeout;
+
+    {
+      std::unique_lock<std::mutex> lock(mu_);
+
+      PromiseKey promise_key{
+          .requester_address = from_address, .request_id = request_id, .replier_address = to_address};
+      OpaquePromise opaque_promise(std::move(promise).ToUnique());
+      DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
+      promises_.emplace(std::move(promise_key), std::move(dop));
+    }  // lock dropped
+
+    Send(to_address, from_address, request_id, std::forward<Request>(request));
+  }
+};
+
+}  // namespace memgraph::io::local_transport
diff --git a/src/io/message_conversion.hpp 
b/src/io/message_conversion.hpp
new file mode 100644
index 000000000..53881583b
--- /dev/null
+++ b/src/io/message_conversion.hpp
@@ -0,0 +1,224 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <any>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <typeinfo>
+#include <utility>
+#include <variant>
+
+#include "io/address.hpp"
+#include "io/errors.hpp"
+#include "io/transport.hpp"
+#include "utils/logging.hpp"
+
+namespace memgraph::io {
+
+using memgraph::io::Duration;
+using memgraph::io::Message;
+using memgraph::io::Time;
+
+struct PromiseKey {
+  Address requester_address;
+  uint64_t request_id;
+  // TODO(tyler) possibly remove replier_address from promise key
+  // once we want to support DSR.
+  Address replier_address;
+
+ public:
+  friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
+    if (lhs.requester_address != rhs.requester_address) {
+      return lhs.requester_address < rhs.requester_address;
+    }
+
+    if (lhs.request_id != rhs.request_id) {
+      return lhs.request_id < rhs.request_id;
+    }
+
+    return lhs.replier_address < rhs.replier_address;
+  }
+};
+
+struct OpaqueMessage {
+  Address to_address;
+  Address from_address;
+  uint64_t request_id;
+  std::any message;
+
+  /// Recursively tries to match a specific type from the outer
+  /// variant's parameter pack against the type of the std::any,
+  /// and if it matches, make it concrete and return it. Otherwise,
+  /// move on and compare the any with the next type from the
+  /// parameter pack.
+  ///
+  /// Return is the full std::variant type that holds the
+  /// full parameter pack without interfering with recursive
+  /// narrowing expansion.
+  template <typename Return, Message Head, Message... Rest>
+  std::optional<Return> Unpack(std::any &&a) {
+    if (typeid(Head) == a.type()) {
+      Head concrete = std::any_cast<Head>(std::move(a));
+      return concrete;
+    }
+
+    if constexpr (sizeof...(Rest) > 0) {
+      return Unpack<Return, Rest...>(std::move(a));
+    } else {
+      return std::nullopt;
+    }
+  }
+
+  /// High level "user-facing" conversion function that lets
+  /// people interested in conversion only supply a single
+  /// parameter pack for the types that they want to compare
+  /// with the any and potentially include in the returned
+  /// variant.
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) std::optional<std::variant<Ms...>> VariantFromAny(std::any &&a) {
+    return Unpack<std::variant<Ms...>, Ms...>(std::move(a));
+  }
+
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) std::optional<RequestEnvelope<Ms...>> Take() && {
+    std::optional<std::variant<Ms...>> m_opt = VariantFromAny<Ms...>(std::move(message));
+
+    if (m_opt) {
+      return RequestEnvelope<Ms...>{
+          .message = std::move(*m_opt),
+          .request_id = request_id,
+          .to_address = to_address,
+          .from_address = from_address,
+      };
+    }
+
+    return std::nullopt;
+  }
+};
+
+/// Type-erased operations on a heap-allocated ResponsePromise that is
+/// only known by its void pointer. OpaquePromiseTrait supplies the
+/// concrete implementation for each response type.
+class OpaquePromiseTraitBase {
+ public:
+  virtual const std::type_info *TypeInfo() const = 0;
+  virtual bool IsAwaited(void *ptr) const = 0;
+  virtual void Fill(void *ptr, OpaqueMessage &&) const = 0;
+  virtual void TimeOut(void *ptr) const = 0;
+
+  virtual ~OpaquePromiseTraitBase() = default;
+  OpaquePromiseTraitBase() = default;
+  OpaquePromiseTraitBase(const OpaquePromiseTraitBase &) = delete;
+  OpaquePromiseTraitBase &operator=(const OpaquePromiseTraitBase &) = delete;
+  OpaquePromiseTraitBase(OpaquePromiseTraitBase &&old) = delete;
+  OpaquePromiseTraitBase &operator=(OpaquePromiseTraitBase &&) = delete;
+};
+
+template <typename T>
+class OpaquePromiseTrait : public OpaquePromiseTraitBase {
+ public:
+  const std::type_info *TypeInfo() const override { return &typeid(T); }
+
+  bool IsAwaited(void *ptr) const override { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); }
+
+  void Fill(void *ptr, OpaqueMessage &&opaque_message) const override {
+    T message = std::any_cast<T>(std::move(opaque_message.message));
+    auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
+                                                 .request_id = opaque_message.request_id,
+                                                 .to_address = opaque_message.to_address,
+                                                 .from_address = opaque_message.from_address};
+    // Re-own the released promise so that it is freed after being filled.
+    auto promise = static_cast<ResponsePromise<T> *>(ptr);
+    auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
+    unique_promise->Fill(std::move(response_envelope));
+  }
+
+  void TimeOut(void *ptr) const override {
+    auto promise = static_cast<ResponsePromise<T> *>(ptr);
+    auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
+    ResponseResult<T> result = TimedOut{};
+    unique_promise->Fill(std::move(result));
+  }
+};
+
+/// Owns a ResponsePromise of an erased response type. It must be
+/// filled, timed out, or taken before destruction; the destructor
+/// asserts that ownership was handed off.
+class OpaquePromise {
+  void *ptr_;
+  std::unique_ptr<OpaquePromiseTraitBase> trait_;
+
+ public:
+  OpaquePromise(OpaquePromise &&old) noexcept : ptr_(old.ptr_), trait_(std::move(old.trait_)) {
+    MG_ASSERT(old.ptr_ != nullptr);
+    old.ptr_ = nullptr;
+  }
+
+  OpaquePromise &operator=(OpaquePromise &&old) noexcept {
+    MG_ASSERT(ptr_ == nullptr);
+    MG_ASSERT(old.ptr_ != nullptr);
+    MG_ASSERT(this != &old);
+    ptr_ = old.ptr_;
+    trait_ = std::move(old.trait_);
+    old.ptr_ = nullptr;
+    return *this;
+  }
+
+  OpaquePromise(const OpaquePromise &) = delete;
+  OpaquePromise &operator=(const OpaquePromise &) = delete;
+
+  template <typename T>
+  std::unique_ptr<ResponsePromise<T>> Take() && {
+    MG_ASSERT(typeid(T) == *trait_->TypeInfo());
+    MG_ASSERT(ptr_ != nullptr);
+
+    auto ptr = static_cast<ResponsePromise<T> *>(ptr_);
+
+    ptr_ = nullptr;
+
+    return std::unique_ptr<ResponsePromise<T>>(ptr);
+  }
+
+  template <typename T>
+  explicit OpaquePromise(std::unique_ptr<ResponsePromise<T>> promise)
+      : ptr_(static_cast<void *>(promise.release())), trait_(std::make_unique<OpaquePromiseTrait<T>>()) {}
+
+  bool IsAwaited() {
+    MG_ASSERT(ptr_ != nullptr);
+    return trait_->IsAwaited(ptr_);
+  }
+
+  void TimeOut() {
+    MG_ASSERT(ptr_ != nullptr);
+    trait_->TimeOut(ptr_);
+    ptr_ = nullptr;
+  }
+
+  void Fill(OpaqueMessage &&opaque_message) {
+    MG_ASSERT(ptr_ != nullptr);
+    trait_->Fill(ptr_, std::move(opaque_message));
+    ptr_ = nullptr;
+  }
+
+  ~OpaquePromise() {
+    MG_ASSERT(ptr_ == nullptr, "OpaquePromise destroyed without being explicitly timed out or filled");
+  }
+};
+
+struct DeadlineAndOpaquePromise { + Time deadline; + OpaquePromise promise; +}; + +} // namespace memgraph::io diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp new file mode 100644 index 000000000..c38d3da74 --- /dev/null +++ b/src/io/rsm/raft.hpp @@ -0,0 +1,913 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// TODO(tyler) buffer out-of-order Append buffers on the Followers to reassemble more quickly +// TODO(tyler) handle granular batch sizes based on simple flow control + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "io/simulator/simulator.hpp" +#include "io/transport.hpp" +#include "utils/concepts.hpp" + +namespace memgraph::io::rsm { + +/// Timeout and replication tunables +using namespace std::chrono_literals; +static constexpr auto kMinimumElectionTimeout = 100ms; +static constexpr auto kMaximumElectionTimeout = 200ms; +static constexpr auto kMinimumBroadcastTimeout = 40ms; +static constexpr auto kMaximumBroadcastTimeout = 60ms; +static constexpr auto kMinimumCronInterval = 1ms; +static constexpr auto kMaximumCronInterval = 2ms; +static constexpr auto kMinimumReceiveTimeout = 40ms; +static constexpr auto kMaximumReceiveTimeout = 60ms; +static_assert(kMinimumElectionTimeout > kMaximumBroadcastTimeout, + "The broadcast timeout has to be smaller than the election timeout!"); +static_assert(kMinimumElectionTimeout < kMaximumElectionTimeout, + "The minimum election timeout has to be smaller than the 
maximum election timeout!"); +static_assert(kMinimumBroadcastTimeout < kMaximumBroadcastTimeout, + "The minimum broadcast timeout has to be smaller than the maximum broadcast timeout!"); +static_assert(kMinimumCronInterval < kMaximumCronInterval, + "The minimum cron interval has to be smaller than the maximum cron interval!"); +static_assert(kMinimumReceiveTimeout < kMaximumReceiveTimeout, + "The minimum receive timeout has to be smaller than the maximum receive timeout!"); +static constexpr size_t kMaximumAppendBatchSize = 1024; + +using Term = uint64_t; +using LogIndex = uint64_t; +using LogSize = uint64_t; +using RequestId = uint64_t; + +template +struct WriteRequest { + WriteOperation operation; +}; + +/// WriteResponse is returned to a client after +/// their WriteRequest was entered in to the raft +/// log and it reached consensus. +/// +/// WriteReturn is the result of applying the WriteRequest to +/// ReplicatedState, and if the ReplicatedState::write +/// method is deterministic, all replicas will +/// have the same ReplicatedState after applying +/// the submitted WriteRequest. +template +struct WriteResponse { + bool success; + WriteReturn write_return; + std::optional
retry_leader; + LogIndex raft_index; +}; + +template +struct ReadRequest { + ReadOperation operation; +}; + +template +struct ReadResponse { + bool success; + ReadReturn read_return; + std::optional
retry_leader; +}; + +/// AppendRequest is a raft-level message that the Leader +/// periodically broadcasts to all Follower peers. This +/// serves three main roles: +/// 1. acts as a heartbeat from the Leader to the Follower +/// 2. replicates new data that the Leader has received to the Follower +/// 3. informs Follower peers when the commit index has increased, +/// signalling that it is now safe to apply log items to the +/// replicated state machine +template +struct AppendRequest { + Term term = 0; + LogIndex batch_start_log_index; + Term last_log_term; + std::vector> entries; + LogSize leader_commit; +}; + +struct AppendResponse { + bool success; + Term term; + Term last_log_term; + // a small optimization over the raft paper, tells + // the leader the offset that we are interested in + // to send log offsets from for us. This will only + // be useful at the beginning of a leader's term. + LogSize log_size; +}; + +struct VoteRequest { + Term term = 0; + LogSize log_size; + Term last_log_term; +}; + +struct VoteResponse { + Term term = 0; + LogSize committed_log_size; + bool vote_granted = false; +}; + +template +struct CommonState { + Term term = 0; + std::vector> log; + LogSize committed_log_size = 0; + LogSize applied_size = 0; +}; + +struct FollowerTracker { + LogIndex next_index = 0; + LogSize confirmed_log_size = 0; +}; + +struct PendingClientRequest { + RequestId request_id; + Address address; + Time received_at; +}; + +struct Leader { + std::map followers; + std::unordered_map pending_client_requests; + Time last_broadcast = Time::min(); + + std::string static ToString() { return "\tLeader \t"; } +}; + +struct Candidate { + std::map successful_votes; + Time election_began = Time::min(); + std::set
outstanding_votes; + + std::string static ToString() { return "\tCandidate\t"; } +}; + +struct Follower { + Time last_received_append_entries_timestamp; + Address leader_address; + + std::string static ToString() { return "\tFollower \t"; } +}; + +using Role = std::variant; + +template +concept AllRoles = memgraph::utils::SameAsAnyOf; + +template +concept LeaderOrFollower = memgraph::utils::SameAsAnyOf; + +template +concept FollowerOrCandidate = memgraph::utils::SameAsAnyOf; + +/* +all ReplicatedState classes should have an Apply method +that returns our WriteResponseValue: + +ReadResponse Read(ReadOperation); +WriteResponseValue ReplicatedState::Apply(WriteRequest); + +for examples: +if the state is uint64_t, and WriteRequest is `struct PlusOne {};`, +and WriteResponseValue is also uint64_t (the new value), then +each call to state.Apply(PlusOne{}) will return the new value +after incrementing it. 0, 1, 2, 3... and this will be sent back +to the client that requested the mutation. + +In practice, these mutations will usually be predicated on some +previous value, so that they are idempotent, functioning similarly +to a CAS operation. +*/ +template +concept Rsm = requires(ReplicatedState state, WriteOperation w, ReadOperation r) { + { state.Read(r) } -> std::same_as; + { state.Apply(w) } -> std::same_as; +}; + +/// Parameter Purpose +/// -------------------------- +/// IoImpl the concrete Io provider - SimulatorTransport, ThriftTransport, etc... +/// ReplicatedState the high-level data structure that is managed by the raft-backed replicated state machine +/// WriteOperation the individual operation type that is applied to the ReplicatedState in identical order +/// across each replica +/// WriteResponseValue the return value of calling ReplicatedState::Apply(WriteOperation), which is executed in +/// identical order across all replicas after an WriteOperation reaches consensus. 
+/// ReadOperation the type of operations that do not require consensus before executing directly +/// on a const ReplicatedState & +/// ReadResponseValue the return value of calling ReplicatedState::Read(ReadOperation), which is executed directly +/// without going through consensus first +template +requires Rsm +class Raft { + CommonState state_; + Role role_ = Candidate{}; + Io io_; + std::vector
peers_; + ReplicatedState replicated_state_; + + public: + Raft(Io &&io, std::vector
peers, ReplicatedState &&replicated_state) + : io_(std::forward>(io)), + peers_(peers), + replicated_state_(std::forward(replicated_state)) {} + + void Run() { + Time last_cron = io_.Now(); + + while (!io_.ShouldShutDown()) { + const auto now = io_.Now(); + const Duration random_cron_interval = RandomTimeout(kMinimumCronInterval, kMaximumCronInterval); + if (now - last_cron > random_cron_interval) { + Cron(); + last_cron = now; + } + + const Duration receive_timeout = RandomTimeout(kMinimumReceiveTimeout, kMaximumReceiveTimeout); + + auto request_result = + io_.template ReceiveWithTimeout, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse>(receive_timeout); + if (request_result.HasError()) { + continue; + } + + auto request = std::move(request_result.GetValue()); + + Handle(std::move(request.message), request.request_id, request.from_address); + } + } + + private: + // Raft paper - 5.3 + // When the entry has been safely replicated, the leader applies the + // entry to its state machine and returns the result of that + // execution to the client. + // + // "Safely replicated" is defined as being known to be present + // on at least a majority of all peers (inclusive of the Leader). + void BumpCommitIndexAndReplyToClients(Leader &leader) { + auto confirmed_log_sizes = std::vector{}; + + // We include our own log size in the calculation of the log + // confirmed log size that is present on at least a majority of all peers. 
+ confirmed_log_sizes.push_back(state_.log.size()); + + for (const auto &[addr, f] : leader.followers) { + confirmed_log_sizes.push_back(f.confirmed_log_size); + Log("Follower at port ", addr.last_known_port, " has confirmed log size of: ", f.confirmed_log_size); + } + + // reverse sort from highest to lowest (using std::ranges::greater) + std::ranges::sort(confirmed_log_sizes, std::ranges::greater()); + + // This is a particularly correctness-critical calculation because it + // determines the committed log size that will be broadcast in + // the next AppendRequest. + // + // If the following sizes are recorded for clusters of different numbers of peers, + // these are the expected sizes that are considered to have reached consensus: + // + // state | expected value | (confirmed_log_sizes.size() / 2) + // [1] 1 (1 / 2) => 0 + // [2, 1] 1 (2 / 2) => 1 + // [3, 2, 1] 2 (3 / 2) => 1 + // [4, 3, 2, 1] 2 (4 / 2) => 2 + // [5, 4, 3, 2, 1] 3 (5 / 2) => 2 + const size_t majority_index = confirmed_log_sizes.size() / 2; + const LogSize new_committed_log_size = confirmed_log_sizes[majority_index]; + + // We never go backwards in history. + MG_ASSERT(state_.committed_log_size <= new_committed_log_size, + "as a Leader, we have previously set our committed_log_size to {}, but our Followers have a majority " + "committed_log_size of {}", + state_.committed_log_size, new_committed_log_size); + + state_.committed_log_size = new_committed_log_size; + + // For each size between the old size and the new one (inclusive), + // Apply that log's WriteOperation to our replicated_state_, + // and use the specific return value of the ReplicatedState::Apply + // method (WriteResponseValue) to respond to the requester. 
+ for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) { + const LogIndex apply_index = state_.applied_size; + const auto &write_request = state_.log[apply_index].second; + const WriteResponseValue write_return = replicated_state_.Apply(write_request); + + if (leader.pending_client_requests.contains(apply_index)) { + const PendingClientRequest client_request = std::move(leader.pending_client_requests.at(apply_index)); + leader.pending_client_requests.erase(apply_index); + + const WriteResponse resp{ + .success = true, + .write_return = std::move(write_return), + .raft_index = apply_index, + }; + + io_.Send(client_request.address, client_request.request_id, std::move(resp)); + } + } + + Log("committed_log_size is now ", state_.committed_log_size); + } + + // Raft paper - 5.1 + // AppendEntries RPCs are initiated by leaders to replicate log entries and to provide a form of heartbeat + void BroadcastAppendEntries(std::map &followers) { + for (auto &[address, follower] : followers) { + const LogIndex next_index = follower.next_index; + + const auto missing = state_.log.size() - next_index; + const auto batch_size = std::min(missing, kMaximumAppendBatchSize); + const auto start_index = next_index; + const auto end_index = start_index + batch_size; + + // advance follower's next index + follower.next_index += batch_size; + + std::vector> entries; + + entries.insert(entries.begin(), state_.log.begin() + start_index, state_.log.begin() + end_index); + + const Term previous_term_from_index = PreviousTermFromIndex(start_index); + + Log("sending ", entries.size(), " entries to Follower ", address.last_known_port, + " which are above its next_index of ", next_index); + + AppendRequest ar{ + .term = state_.term, + .batch_start_log_index = start_index, + .last_log_term = previous_term_from_index, + .entries = std::move(entries), + .leader_commit = state_.committed_log_size, + }; + + // request_id not necessary to set because it's not a Future-backed 
Request. + static constexpr RequestId request_id = 0; + + io_.Send(address, request_id, std::move(ar)); + } + } + + // Raft paper - 5.2 + // Raft uses randomized election timeouts to ensure that split votes are rare and that they are resolved quickly + Duration RandomTimeout(Duration min, Duration max) { + std::uniform_int_distribution time_distrib(min.count(), max.count()); + + const auto rand_micros = io_.Rand(time_distrib); + + return Duration{rand_micros}; + } + + Duration RandomTimeout(int min_micros, int max_micros) { + std::uniform_int_distribution time_distrib(min_micros, max_micros); + + const int rand_micros = io_.Rand(time_distrib); + + return std::chrono::microseconds{rand_micros}; + } + + Term PreviousTermFromIndex(LogIndex index) const { + if (index == 0 || state_.log.size() + 1 <= index) { + return 0; + } + + const auto &[term, data] = state_.log.at(index - 1); + return term; + } + + Term CommittedLogTerm() { + MG_ASSERT(state_.log.size() >= state_.committed_log_size); + if (state_.log.empty() || state_.committed_log_size == 0) { + return 0; + } + + const auto &[term, data] = state_.log.at(state_.committed_log_size - 1); + return term; + } + + Term LastLogTerm() const { + if (state_.log.empty()) { + return 0; + } + + const auto &[term, data] = state_.log.back(); + return term; + } + + template + void Log(Ts &&...args) { + const Time now = io_.Now(); + const auto micros = std::chrono::duration_cast(now.time_since_epoch()).count(); + const Term term = state_.term; + const std::string role_string = std::visit([&](const auto &role) { return role.ToString(); }, role_); + + std::ostringstream out; + + out << '\t' << static_cast(micros) << "\t" << term << "\t" << io_.GetAddress().last_known_port; + + out << role_string; + + (out << ... 
<< args); + + spdlog::info(out.str()); + } + + ///////////////////////////////////////////////////////////// + /// Raft-related Cron methods + /// + /// Cron + std::visit is how events are dispatched + /// to certain code based on Raft role. + /// + /// Cron(role) takes as the first argument a reference to its + /// role, and as the second argument, the message that has + /// been received. + ///////////////////////////////////////////////////////////// + + /// Periodic protocol maintenance. + void Cron() { + // dispatch periodic logic based on our role to a specific Cron method. + std::optional new_role = std::visit([&](auto &role) { return Cron(role); }, role_); + + if (new_role) { + role_ = std::move(new_role).value(); + } + } + + // Raft paper - 5.2 + // Candidates keep sending Vote to peers until: + // 1. receiving Append with a higher term (become Follower) + // 2. receiving Vote with a higher term (become a Follower) + // 3. receiving a quorum of responses to our last batch of Vote (become a Leader) + std::optional Cron(Candidate &candidate) { + const auto now = io_.Now(); + const Duration election_timeout = RandomTimeout(kMinimumElectionTimeout, kMaximumElectionTimeout); + const auto election_timeout_us = std::chrono::duration_cast(election_timeout).count(); + + if (now - candidate.election_began > election_timeout) { + state_.term++; + Log("becoming Candidate for term ", state_.term, " after leader timeout of ", election_timeout_us, + "ms elapsed since last election attempt"); + + const VoteRequest request{ + .term = state_.term, + .log_size = state_.log.size(), + .last_log_term = LastLogTerm(), + }; + + auto outstanding_votes = std::set
(); + + for (const auto &peer : peers_) { + // request_id not necessary to set because it's not a Future-backed Request. + static constexpr auto request_id = 0; + io_.template Send(peer, request_id, request); + outstanding_votes.insert(peer); + } + + return Candidate{ + .successful_votes = std::map(), + .election_began = now, + .outstanding_votes = outstanding_votes, + }; + } + return std::nullopt; + } + + // Raft paper - 5.2 + // Followers become candidates if we haven't heard from the leader + // after a randomized timeout. + std::optional Cron(Follower &follower) { + const auto now = io_.Now(); + const auto time_since_last_append_entries = now - follower.last_received_append_entries_timestamp; + + // randomized follower timeout + const Duration election_timeout = RandomTimeout(kMinimumElectionTimeout, kMaximumElectionTimeout); + + if (time_since_last_append_entries > election_timeout) { + // become a Candidate if we haven't heard from the Leader after this timeout + return Candidate{}; + } + + return std::nullopt; + } + + // Leaders (re)send AppendRequest to followers. + std::optional Cron(Leader &leader) { + const Time now = io_.Now(); + const Duration broadcast_timeout = RandomTimeout(kMinimumBroadcastTimeout, kMaximumBroadcastTimeout); + + if (now - leader.last_broadcast > broadcast_timeout) { + BroadcastAppendEntries(leader.followers); + leader.last_broadcast = now; + } + + return std::nullopt; + } + + ///////////////////////////////////////////////////////////// + /// Raft-related Handle methods + /// + /// Handle + std::visit is how events are dispatched + /// to certain code based on Raft role. + /// + /// Handle(role, message, ...) + /// takes as the first argument a reference + /// to its role, and as the second argument, the + /// message that has been received. 
+ ///////////////////////////////////////////////////////////// + + using ReceiveVariant = std::variant, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse>; + + void Handle(ReceiveVariant &&message_variant, RequestId request_id, Address from_address) { + // dispatch the message to a handler based on our role, + // which can be specified in the Handle first argument, + // or it can be `auto` if it's a handler for several roles + // or messages. + std::optional new_role = std::visit( + [&](auto &&msg, auto &role) mutable { + return Handle(role, std::forward(msg), request_id, from_address); + }, + std::forward(message_variant), role_); + + // TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc... + if (new_role) { + role_ = std::move(new_role).value(); + } + } + + // all roles can receive Vote and possibly become a follower + template + std::optional Handle(ALL & /* variable */, VoteRequest &&req, RequestId request_id, Address from_address) { + Log("received VoteRequest from ", from_address.last_known_port, " with term ", req.term); + const bool last_log_term_dominates = req.last_log_term >= LastLogTerm(); + const bool term_dominates = req.term > state_.term; + const bool log_size_dominates = req.log_size >= state_.log.size(); + const bool new_leader = last_log_term_dominates && term_dominates && log_size_dominates; + + if (new_leader) { + MG_ASSERT(req.term > state_.term); + MG_ASSERT(std::max(req.term, state_.term) == req.term); + } + + const VoteResponse res{ + .term = std::max(req.term, state_.term), + .committed_log_size = state_.committed_log_size, + .vote_granted = new_leader, + }; + + io_.Send(from_address, request_id, res); + + if (new_leader) { + // become a follower + state_.term = req.term; + return Follower{ + .last_received_append_entries_timestamp = io_.Now(), + .leader_address = from_address, + }; + } + + if (term_dominates) { + Log("received a vote from an inferior candidate. 
Becoming Candidate"); + state_.term = std::max(state_.term, req.term) + 1; + return Candidate{}; + } + + return std::nullopt; + } + + std::optional Handle(Candidate &candidate, VoteResponse &&res, RequestId /* variable */, Address from_address) { + Log("received VoteResponse"); + + if (!res.vote_granted || res.term != state_.term) { + Log("received unsuccessful VoteResponse from term ", res.term, " when our candidacy term is ", state_.term); + // we received a delayed VoteResponse from the past, which has to do with an election that is + // no longer valid. We can simply drop this. + return std::nullopt; + } + + MG_ASSERT(candidate.outstanding_votes.contains(from_address), + "Received unexpected VoteResponse from server not present in Candidate's outstanding_votes!"); + candidate.outstanding_votes.erase(from_address); + + MG_ASSERT(!candidate.successful_votes.contains(from_address), + "Received unexpected VoteResponse from server already in Candidate's successful_votes!"); + candidate.successful_votes.insert({from_address, res.committed_log_size}); + + if (candidate.successful_votes.size() >= candidate.outstanding_votes.size()) { + std::map followers{}; + + for (const auto &[address, committed_log_size] : candidate.successful_votes) { + FollowerTracker follower{ + .next_index = committed_log_size, + .confirmed_log_size = committed_log_size, + }; + followers.insert({address, follower}); + } + for (const auto &address : candidate.outstanding_votes) { + FollowerTracker follower{ + .next_index = state_.log.size(), + .confirmed_log_size = 0, + }; + followers.insert({address, follower}); + } + + Log("becoming Leader at term ", state_.term); + + BroadcastAppendEntries(followers); + + return Leader{ + .followers = std::move(followers), + .pending_client_requests = std::unordered_map(), + }; + } + + return std::nullopt; + } + + template + std::optional Handle(LOF & /* variable */, VoteResponse && /* variable */, RequestId /* variable */, + Address /* variable */) { + 
Log("non-Candidate received VoteResponse");
+    return std::nullopt;
+  }
+
+  // Handles an AppendRequest in any role.
+  //
+  // - A request with a higher term (or, when we are a Candidate, an equal term)
+  //   makes us adopt that term and return a Follower role pointing at the sender.
+  // - A request with a lower term is answered with an unsuccessful response.
+  // - Otherwise, as a Follower, the entries are appended only when
+  //   batch_start_log_index equals our log size and last_log_term equals
+  //   LastLogTerm(); committed entries are then applied to replicated_state_.
+  //
+  // Every returning path sends an AppendResponse back to from_address.
+  // Returns the new role when we step down to Follower, std::nullopt otherwise.
+  template
+  std::optional Handle(ALL &role, AppendRequest &&req, RequestId request_id,
+                       Address from_address) {
+    // The response is pre-filled as a nack carrying the term of our last committed
+    // entry and our current log size; last_log_term and log_size are refreshed
+    // before the final reply at the bottom of this method.
+    AppendResponse res{
+        .success = false,
+        .term = state_.term,
+        .last_log_term = CommittedLogTerm(),
+        .log_size = state_.log.size(),
+    };
+
+    if constexpr (std::is_same()) {
+      // MG_ASSERT messages take {} placeholders (as the other asserts in this class do);
+      // without one the offending term would never appear in the failure message.
+      MG_ASSERT(req.term != state_.term, "Multiple leaders are acting under the term {}", req.term);
+    }
+
+    const bool is_candidate = std::is_same();
+    const bool is_failed_competitor = is_candidate && req.term == state_.term;
+    const Time now = io_.Now();
+
+    // Raft paper - 5.2
+    // While waiting for votes, a candidate may receive an
+    // AppendEntries RPC from another server claiming to be leader. If
+    // the leader’s term (included in its RPC) is at least as large as
+    // the candidate’s current term, then the candidate recognizes the
+    // leader as legitimate and returns to follower state.
+    if (req.term > state_.term || is_failed_competitor) {
+      // become follower of this leader, reply with our log status
+      state_.term = req.term;
+
+      io_.Send(from_address, request_id, res);
+
+      Log("becoming Follower of Leader ", from_address.last_known_port, " at term ", req.term);
+      return Follower{
+          .last_received_append_entries_timestamp = now,
+          .leader_address = from_address,
+      };
+    }
+
+    if (req.term < state_.term) {
+      // nack this request from an old leader
+      io_.Send(from_address, request_id, res);
+
+      return std::nullopt;
+    }
+
+    // at this point, we're dealing with our own leader
+    if constexpr (std::is_same()) {
+      // small specialization for when we're already a Follower
+      MG_ASSERT(role.leader_address == from_address, "Multiple Leaders are acting under the same term number!");
+      role.last_received_append_entries_timestamp = now;
+    } else {
+      Log("Somehow entered Follower-specific logic as a non-Follower");
+      MG_ASSERT(false, "Somehow entered Follower-specific logic as a non-Follower");
+    }
+
+    // Handle steady-state conditions.
+    if (req.batch_start_log_index != state_.log.size()) {
+      Log("req.batch_start_log_index of ", req.batch_start_log_index, " does not match our log size of ",
+          state_.log.size());
+    } else if (req.last_log_term != LastLogTerm()) {
+      Log("req.last_log_term differs from our leader term at that slot, expected: ", LastLogTerm(), " but got ",
+          req.last_log_term);
+    } else {
+      // happy path - Apply log
+      Log("applying batch of ", req.entries.size(), " entries to our log starting at index ",
+          req.batch_start_log_index);
+
+      const auto resize_length = req.batch_start_log_index;
+
+      MG_ASSERT(resize_length >= state_.committed_log_size,
+                "Applied history from Leader which goes back in time from our commit_index");
+
+      // possibly chop-off stuff that was replaced by
+      // things with different terms (we got data that
+      // hasn't reached consensus yet, which is normal)
+      state_.log.resize(resize_length);
+
+      if (req.entries.size() > 0) {
+        auto &[first_term, op] = req.entries.at(0);
+        MG_ASSERT(LastLogTerm() <= first_term);
+      }
+
+      state_.log.insert(state_.log.end(), std::make_move_iterator(req.entries.begin()),
+                        std::make_move_iterator(req.entries.end()));
+
+      MG_ASSERT(req.leader_commit >= state_.committed_log_size);
+      state_.committed_log_size = std::min(req.leader_commit, state_.log.size());
+
+      for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) {
+        const auto &write_request = state_.log[state_.applied_size].second;
+        replicated_state_.Apply(write_request);
+      }
+
+      res.success = true;
+    }
+
+    res.last_log_term = LastLogTerm();
+    res.log_size = state_.log.size();
+
+    Log("returning log_size of ", res.log_size);
+
+    io_.Send(from_address, request_id, res);
+
+    return std::nullopt;
+  }
+
+  std::optional Handle(Leader &leader, AppendResponse &&res, RequestId /* variable */, Address from_address) {
+    if (res.term != state_.term) {
+      Log("received AppendResponse related to a previous term when we (presumably) were the leader");
+      return
std::nullopt; + } + + // TODO(tyler) when we have dynamic membership, this assert will become incorrect, but we should + // keep it in-place until then because it has bug finding value. + MG_ASSERT(leader.followers.contains(from_address), "received AppendResponse from unknown Follower"); + + // at this point, we know the term matches and we know this Follower + + FollowerTracker &follower = leader.followers.at(from_address); + + if (res.success) { + Log("got successful AppendResponse from ", from_address.last_known_port, " with log_size of ", res.log_size); + follower.next_index = std::max(follower.next_index, res.log_size); + } else { + Log("got unsuccessful AppendResponse from ", from_address.last_known_port, " with log_size of ", res.log_size); + follower.next_index = res.log_size; + } + + follower.confirmed_log_size = std::max(follower.confirmed_log_size, res.log_size); + + BumpCommitIndexAndReplyToClients(leader); + + return std::nullopt; + } + + template + std::optional Handle(FOC & /* variable */, AppendResponse && /* variable */, RequestId /* variable */, + Address /* variable */) { + // we used to be the leader, and are getting old delayed responses + return std::nullopt; + } + + ///////////////////////////////////////////////////////////// + /// RSM-related handle methods + ///////////////////////////////////////////////////////////// + + // Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState + std::optional Handle(Leader & /* variable */, ReadRequest &&req, RequestId request_id, + Address from_address) { + Log("handling ReadOperation"); + ReadOperation read_operation = req.operation; + + ReadResponseValue read_return = replicated_state_.Read(read_operation); + + const ReadResponse resp{ + .success = true, + .read_return = std::move(read_return), + .retry_leader = std::nullopt, + }; + + io_.Send(from_address, request_id, resp); + + return std::nullopt; + } + + // Candidates should respond with 
a failure, similar to the Candidate + WriteRequest failure below + std::optional Handle(Candidate & /* variable */, ReadRequest && /* variable */, + RequestId request_id, Address from_address) { + Log("received ReadOperation - not redirecting because no Leader is known"); + const ReadResponse res{ + .success = false, + }; + + io_.Send(from_address, request_id, res); + + Cron(); + + return std::nullopt; + } + + // Followers should respond with a redirection, similar to the Follower + WriteRequest response below + std::optional Handle(Follower &follower, ReadRequest && /* variable */, RequestId request_id, + Address from_address) { + Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port); + + const ReadResponse res{ + .success = false, + .retry_leader = follower.leader_address, + }; + + io_.Send(from_address, request_id, res); + + return std::nullopt; + } + + // Raft paper - 8 + // When a client first starts up, it connects to a randomly chosen + // server. If the client’s first choice is not the leader, that + // server will reject the client’s request and supply information + // about the most recent leader it has heard from. 
+ std::optional Handle(Follower &follower, WriteRequest && /* variable */, RequestId request_id, + Address from_address) { + Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port); + + const WriteResponse res{ + .success = false, + .retry_leader = follower.leader_address, + }; + + io_.Send(from_address, request_id, res); + + return std::nullopt; + } + + std::optional Handle(Candidate & /* variable */, WriteRequest && /* variable */, + RequestId request_id, Address from_address) { + Log("received WriteRequest - not redirecting because no Leader is known"); + + const WriteResponse res{ + .success = false, + }; + + io_.Send(from_address, request_id, res); + + Cron(); + + return std::nullopt; + } + + // only leaders actually handle replication requests from clients + std::optional Handle(Leader &leader, WriteRequest &&req, RequestId request_id, + Address from_address) { + Log("handling WriteRequest"); + + // we are the leader. add item to log and send Append to peers + MG_ASSERT(state_.term >= LastLogTerm()); + state_.log.emplace_back(std::pair(state_.term, std::move(req.operation))); + + LogIndex log_index = state_.log.size() - 1; + + PendingClientRequest pcr{ + .request_id = request_id, + .address = from_address, + .received_at = io_.Now(), + }; + + leader.pending_client_requests.emplace(log_index, pcr); + + BroadcastAppendEntries(leader.followers); + + return std::nullopt; + } +}; + +}; // namespace memgraph::io::rsm diff --git a/src/io/simulator/CMakeLists.txt b/src/io/simulator/CMakeLists.txt new file mode 100644 index 000000000..1cb61d8d9 --- /dev/null +++ b/src/io/simulator/CMakeLists.txt @@ -0,0 +1,8 @@ +set(io_simulator_sources + simulator_handle.cpp) + +find_package(fmt REQUIRED) +find_package(Threads REQUIRED) + +add_library(mg-io-simulator STATIC ${io_simulator_sources}) +target_link_libraries(mg-io-simulator stdc++fs Threads::Threads fmt::fmt mg-utils) diff --git a/src/io/simulator/simulator.hpp 
b/src/io/simulator/simulator.hpp
new file mode 100644
index 000000000..354aae6ac
--- /dev/null
+++ b/src/io/simulator/simulator.hpp
@@ -0,0 +1,45 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include
+#include
+
+#include "io/address.hpp"
+#include "io/simulator/simulator_config.hpp"
+#include "io/simulator/simulator_handle.hpp"
+#include "io/simulator/simulator_transport.hpp"
+
+namespace memgraph::io::simulator {
+/// Entry point for building a simulated cluster: owns a seeded std::mt19937
+/// and a shared SimulatorHandle, both created from the same SimulatorConfig.
+class Simulator {
+  // Source of the per-transport seeds handed out by Register().
+  std::mt19937 rng_;
+  // Shared with every SimulatorTransport created by Register().
+  std::shared_ptr simulator_handle_;
+
+ public:
+  /// Seeds rng_ with config.rng_seed and constructs the handle from config.
+  explicit Simulator(SimulatorConfig config)
+      : rng_(std::mt19937{config.rng_seed}), simulator_handle_{std::make_shared(config)} {}
+
+  /// Forwards to SimulatorHandle::ShutDown().
+  void ShutDown() { simulator_handle_->ShutDown(); }
+
+  /// Returns an Io for `address` wrapping a SimulatorTransport that shares this
+  /// simulator's handle and is given a fresh seed drawn from rng_.
+  Io Register(Address address) {
+    std::uniform_int_distribution seed_distrib;
+    uint64_t seed = seed_distrib(rng_);
+    return Io{SimulatorTransport{simulator_handle_, address, seed}, address};
+  }
+
+  /// Forwards to SimulatorHandle::IncrementServerCountAndWaitForQuiescentState().
+  void IncrementServerCountAndWaitForQuiescentState(Address address) {
+    simulator_handle_->IncrementServerCountAndWaitForQuiescentState(address);
+  }
+
+  /// Returns the statistics reported by the handle.
+  SimulatorStats Stats() { return simulator_handle_->Stats(); }
+};
+}; // namespace memgraph::io::simulator
diff --git a/src/io/simulator/simulator_config.hpp b/src/io/simulator/simulator_config.hpp
new file mode 100644
index 000000000..4719488d2
--- /dev/null
+++ b/src/io/simulator/simulator_config.hpp
@@ -0,0 +1,30 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include
+
+#include "io/time.hpp"
+
+namespace memgraph::io::simulator {
+
+using memgraph::io::Time;
+
+/// Settings consumed by Simulator and SimulatorHandle.
+struct SimulatorConfig {
+  // SimulatorHandle::MaybeTickSimulator draws a uniform integer in [0, 99] for each
+  // message it schedules and drops the message when the draw is below this value.
+  uint8_t drop_percent = 0;
+  // When true, a message that matches a pending promise whose deadline is already
+  // behind the simulated clock times that promise out instead of filling it.
+  bool perform_timeouts = false;
+  // When true, a randomly chosen in-flight message is swapped to the back of the
+  // queue before the back message is scheduled.
+  bool scramble_messages = true;
+  // Seed for the std::mt19937 generators owned by Simulator and SimulatorHandle.
+  uint64_t rng_seed = 0;
+  // Initial value of the simulated cluster-wide clock.
+  Time start_time = Time::min();
+  // MaybeTickSimulator asserts that the simulated clock stays below this value
+  // whenever it advances the clock.
+  Time abort_time = Time::max();
+};
+}; // namespace memgraph::io::simulator
diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
new file mode 100644
index 000000000..16b2b71a1
--- /dev/null
+++ b/src/io/simulator/simulator_handle.cpp
@@ -0,0 +1,142 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+ +#include "io/simulator/simulator_handle.hpp" +#include "io/address.hpp" +#include "io/errors.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/simulator/simulator_stats.hpp" +#include "io/time.hpp" +#include "io/transport.hpp" + +namespace memgraph::io::simulator { + +using memgraph::io::Duration; +using memgraph::io::Time; + +void SimulatorHandle::ShutDown() { + std::unique_lock lock(mu_); + should_shut_down_ = true; + cv_.notify_all(); +} + +bool SimulatorHandle::ShouldShutDown() const { + std::unique_lock lock(mu_); + return should_shut_down_; +} + +void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address address) { + std::unique_lock lock(mu_); + server_addresses_.insert(address); + + while (true) { + const size_t blocked_servers = blocked_on_receive_; + + const bool all_servers_blocked = blocked_servers == server_addresses_.size(); + + if (all_servers_blocked) { + return; + } + + cv_.wait(lock); + } +} + +bool SimulatorHandle::MaybeTickSimulator() { + std::unique_lock lock(mu_); + + const size_t blocked_servers = blocked_on_receive_; + + if (blocked_servers < server_addresses_.size()) { + // we only need to advance the simulator when all + // servers have reached a quiescent state, blocked + // on their own futures or receive methods. + return false; + } + + stats_.simulator_ticks++; + + cv_.notify_all(); + + TimeoutPromisesPastDeadline(); + + if (in_flight_.empty()) { + // return early here because there are no messages to schedule + + // We tick the clock forward when all servers are blocked but + // there are no in-flight messages to schedule delivery of. 
+ std::poisson_distribution<> time_distrib(50); + Duration clock_advance = std::chrono::microseconds{time_distrib(rng_)}; + cluster_wide_time_microseconds_ += clock_advance; + + MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time, + "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " + "in an expected amount of time."); + return true; + } + + if (config_.scramble_messages) { + // scramble messages + std::uniform_int_distribution swap_distrib(0, in_flight_.size() - 1); + const size_t swap_index = swap_distrib(rng_); + std::swap(in_flight_[swap_index], in_flight_.back()); + } + + auto [to_address, opaque_message] = std::move(in_flight_.back()); + in_flight_.pop_back(); + + std::uniform_int_distribution drop_distrib(0, 99); + const int drop_threshold = drop_distrib(rng_); + const bool should_drop = drop_threshold < config_.drop_percent; + + if (should_drop) { + stats_.dropped_messages++; + } + + PromiseKey promise_key{.requester_address = to_address, + .request_id = opaque_message.request_id, + .replier_address = opaque_message.from_address}; + + if (promises_.contains(promise_key)) { + // complete waiting promise if it's there + DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key)); + promises_.erase(promise_key); + + const bool normal_timeout = config_.perform_timeouts && (dop.deadline < cluster_wide_time_microseconds_); + + if (should_drop || normal_timeout) { + stats_.timed_out_requests++; + dop.promise.TimeOut(); + } else { + stats_.total_responses++; + dop.promise.Fill(std::move(opaque_message)); + } + } else if (should_drop) { + // don't add it anywhere, let it drop + } else { + // add to can_receive_ if not + const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address, std::vector()); + om_vec->second.emplace_back(std::move(opaque_message)); + } + + return true; +} + +Time SimulatorHandle::Now() const { + std::unique_lock lock(mu_); + return cluster_wide_time_microseconds_; 
+} + +SimulatorStats SimulatorHandle::Stats() { + std::unique_lock lock(mu_); + return stats_; +} +} // namespace memgraph::io::simulator diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp new file mode 100644 index 000000000..8a9e77c58 --- /dev/null +++ b/src/io/simulator/simulator_handle.hpp @@ -0,0 +1,175 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "io/address.hpp" +#include "io/errors.hpp" +#include "io/message_conversion.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/simulator/simulator_stats.hpp" +#include "io/time.hpp" +#include "io/transport.hpp" + +namespace memgraph::io::simulator { + +class SimulatorHandle { + mutable std::mutex mu_{}; + mutable std::condition_variable cv_; + + // messages that have not yet been scheduled or dropped + std::vector> in_flight_; + + // the responses to requests that are being waited on + std::map promises_; + + // messages that are sent to servers that may later receive them + std::map> can_receive_; + + Time cluster_wide_time_microseconds_; + bool should_shut_down_ = false; + SimulatorStats stats_; + size_t blocked_on_receive_ = 0; + std::set
server_addresses_; + std::mt19937 rng_; + SimulatorConfig config_; + + void TimeoutPromisesPastDeadline() { + const Time now = cluster_wide_time_microseconds_; + + for (auto &[promise_key, dop] : promises_) { + if (dop.deadline < now) { + spdlog::debug("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(), + promise_key.replier_address.ToString()); + std::move(dop).promise.TimeOut(); + promises_.erase(promise_key); + + stats_.timed_out_requests++; + } + } + } + + public: + explicit SimulatorHandle(SimulatorConfig config) + : cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {} + + void IncrementServerCountAndWaitForQuiescentState(Address address); + + /// This method causes most of the interesting simulation logic to happen, wrt network behavior. + /// It checks to see if all background "server" threads are blocked on new messages, and if so, + /// it will decide whether to drop, reorder, or deliver in-flight messages based on the SimulatorConfig + /// that was used to create the Simulator. 
+  bool MaybeTickSimulator();
+
+  void ShutDown();
+
+  bool ShouldShutDown() const;
+
+  /// Queues `request` as an in-flight message addressed to `to_address` and
+  /// registers `promise` under the key (requester = from_address, request_id,
+  /// replier = to_address) so that it can be completed or timed out later.
+  /// The deadline stored with the promise is the current simulated time plus
+  /// `timeout`. Wakes every thread waiting on cv_.
+  template <Message Request, Message Response>
+  void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
+                     Duration timeout, ResponsePromise<Response> &&promise) {
+    std::unique_lock lock(mu_);
+
+    const Time deadline = cluster_wide_time_microseconds_ + timeout;
+
+    // Forward rather than copy: `request` is handed over by the caller and is
+    // not used again after being wrapped.
+    std::any message(std::forward<Request>(request));
+    OpaqueMessage om{.to_address = to_address,
+                     .from_address = from_address,
+                     .request_id = request_id,
+                     .message = std::move(message)};
+    in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
+
+    PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
+    OpaquePromise opaque_promise(std::move(promise).ToUnique());
+    DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
+    promises_.emplace(std::move(promise_key), std::move(dop));
+
+    stats_.total_messages++;
+    stats_.total_requests++;
+
+    cv_.notify_all();
+  }
+
+  /// Waits for a message addressed to `receiver`.
+  ///
+  /// Loops until a message is available, shutdown has been requested, or the
+  /// simulated clock reaches the current simulated time plus `timeout`. The
+  /// message taken is the one at the back of the receiver's queue. While no
+  /// message is available, the lock is released to tick the simulator; if the
+  /// tick made no progress the thread sleeps on cv_.
+  ///
+  /// blocked_on_receive_ is incremented for the duration of the call.
+  ///
+  /// @return the received envelope, or TimedOut if the loop ended without one.
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
+    std::unique_lock lock(mu_);
+
+    blocked_on_receive_ += 1;
+
+    const Time deadline = cluster_wide_time_microseconds_ + timeout;
+
+    while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) {
+      if (can_receive_.contains(receiver)) {
+        std::vector<OpaqueMessage> &can_rx = can_receive_.at(receiver);
+        if (!can_rx.empty()) {
+          OpaqueMessage message = std::move(can_rx.back());
+          can_rx.pop_back();
+
+          // TODO(tyler) search for item in can_receive_ that matches the desired types, rather
+          // than asserting that the last item in can_rx matches.
+          auto m_opt = std::move(message).Take<Ms...>();
+
+          blocked_on_receive_ -= 1;
+
+          return std::move(m_opt).value();
+        }
+      }
+
+      // MaybeTickSimulator is called with mu_ released and the lock is
+      // re-acquired before the shared state is inspected again.
+      lock.unlock();
+      bool made_progress = MaybeTickSimulator();
+      lock.lock();
+      if (!should_shut_down_ && !made_progress) {
+        cv_.wait(lock);
+      }
+    }
+
+    blocked_on_receive_ -= 1;
+
+    return TimedOut{};
+  }
+
+  /// Queues `message` as an in-flight message from `from_address` to
+  /// `to_address`, counts it in stats_.total_messages, and wakes every thread
+  /// waiting on cv_. No promise is registered for it.
+  template <Message M>
+  void Send(Address to_address, Address from_address, RequestId request_id, M message) {
+    std::unique_lock lock(mu_);
+    std::any message_any(std::move(message));
+    OpaqueMessage om{.to_address = to_address,
+                     .from_address = from_address,
+                     .request_id = request_id,
+                     .message = std::move(message_any)};
+    in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
+
+    stats_.total_messages++;
+
+    cv_.notify_all();
+  }
+
+  Time Now() const;
+
+  /// Draws one value from `distrib` using the handle's random number generator
+  /// while holding mu_.
+  template <class D = std::poisson_distribution<>, class Return = uint64_t>
+  Return Rand(D distrib) {
+    std::unique_lock lock(mu_);
+    return distrib(rng_);
+  }
+
+  SimulatorStats Stats();
+};
+};  // namespace memgraph::io::simulator
diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp
new file mode 100644
index 000000000..7f529a456
--- /dev/null
+++ b/src/io/simulator/simulator_stats.hpp
@@ -0,0 +1,25 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <cstdint>
+
+namespace memgraph::io::simulator {
+/// Counters maintained by SimulatorHandle; every counter starts at zero.
+struct SimulatorStats {
+  // Incremented once for every submitted request and every sent message.
+  uint64_t total_messages = 0;
+  // Incremented when the simulator decides to drop a message.
+  uint64_t dropped_messages = 0;
+  // Incremented when a pending request's promise is timed out.
+  uint64_t timed_out_requests = 0;
+  // Incremented once for every submitted request.
+  uint64_t total_requests = 0;
+  // Incremented when a pending request's promise is filled with a response.
+  uint64_t total_responses = 0;
+  // NOTE(review): presumably incremented once per simulator tick -- the update site is not visible here.
+  uint64_t simulator_ticks = 0;
+};
+};  // namespace memgraph::io::simulator
diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp
new file mode 100644
index 000000000..2706f798c
--- /dev/null
+++ b/src/io/simulator/simulator_transport.hpp
@@ -0,0 +1,65 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+ +#pragma once + +#include +#include + +#include "io/address.hpp" +#include "io/simulator/simulator_handle.hpp" +#include "io/time.hpp" + +namespace memgraph::io::simulator { + +using memgraph::io::Duration; +using memgraph::io::Time; + +class SimulatorTransport { + std::shared_ptr simulator_handle_; + const Address address_; + std::mt19937 rng_; + + public: + SimulatorTransport(std::shared_ptr simulator_handle, Address address, uint64_t seed) + : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {} + + template + ResponseFuture Request(Address address, uint64_t request_id, Request request, Duration timeout) { + std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; + auto [future, promise] = + memgraph::io::FuturePromisePairWithNotifier>(maybe_tick_simulator); + + simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise)); + + return std::move(future); + } + + template + requires(sizeof...(Ms) > 0) RequestResult Receive(Duration timeout) { + return simulator_handle_->template Receive(address_, timeout); + } + + template + void Send(Address to_address, Address from_address, uint64_t request_id, M message) { + return simulator_handle_->template Send(to_address, from_address, request_id, message); + } + + Time Now() const { return simulator_handle_->Now(); } + + bool ShouldShutDown() const { return simulator_handle_->ShouldShutDown(); } + + template , class Return = uint64_t> + Return Rand(D distrib) { + return distrib(rng_); + } +}; +}; // namespace memgraph::io::simulator diff --git a/src/io/time.hpp b/src/io/time.hpp new file mode 100644 index 000000000..b07f90154 --- /dev/null +++ b/src/io/time.hpp @@ -0,0 +1,21 @@ +// Copyright 2022 Memgraph Ltd. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include + +namespace memgraph::io { + +using Duration = std::chrono::microseconds; +using Time = std::chrono::time_point; + +} // namespace memgraph::io diff --git a/src/io/transport.hpp b/src/io/transport.hpp new file mode 100644 index 000000000..31592c9c3 --- /dev/null +++ b/src/io/transport.hpp @@ -0,0 +1,135 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include + +#include "io/address.hpp" +#include "io/errors.hpp" +#include "io/future.hpp" +#include "io/time.hpp" +#include "utils/result.hpp" + +namespace memgraph::io { + +using memgraph::utils::BasicResult; + +// TODO(tyler) ensure that Message continues to represent +// reasonable constraints around message types over time, +// as we adapt things to use Thrift-generated message types. 
+template <typename T>
+concept Message = std::same_as<T, std::decay_t<T>>;
+
+using RequestId = uint64_t;
+
+/// A response message together with the request id and the addresses it travelled between.
+template <Message M>
+struct ResponseEnvelope {
+  M message;
+  RequestId request_id;
+  Address to_address;
+  Address from_address;
+};
+
+template <Message M>
+using ResponseResult = BasicResult<TimedOut, ResponseEnvelope<M>>;
+
+template <Message M>
+using ResponseFuture = memgraph::io::Future<ResponseResult<M>>;
+
+template <Message M>
+using ResponsePromise = memgraph::io::Promise<ResponseResult<M>>;
+
+/// A received message, holding exactly one of the types `Ms`, together with the
+/// request id and the addresses it travelled between.
+template <Message... Ms>
+struct RequestEnvelope {
+  std::variant<Ms...> message;
+  RequestId request_id;
+  Address to_address;
+  Address from_address;
+};
+
+template <Message... Ms>
+using RequestResult = BasicResult<TimedOut, RequestEnvelope<Ms...>>;
+
+/// Front end over a transport implementation `I`. It owns the local address,
+/// hands out increasing request ids, and applies a default timeout to calls
+/// that do not specify one.
+template <typename I>
+class Io {
+  I implementation_;
+  Address address_;
+  RequestId request_id_counter_ = 0;
+  Duration default_timeout_ = std::chrono::microseconds{50000};
+
+ public:
+  Io(I io, Address address) : implementation_(std::move(io)), address_(address) {}
+
+  /// Set the default timeout for all requests that are issued
+  /// without an explicit timeout set.
+  void SetDefaultTimeout(Duration timeout) { default_timeout_ = timeout; }
+
+  /// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
+  template <Message Request, Message Response>
+  ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
+    const RequestId request_id = ++request_id_counter_;
+    // Move the by-value request on, matching what Request() does.
+    return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
+  }
+
+  /// Issue a request that times out after the default timeout. This tends
+  /// to be used by clients.
+  template <Message Request, Message Response>
+  ResponseFuture<Response> Request(Address address, Request request) {
+    const RequestId request_id = ++request_id_counter_;
+    const Duration timeout = default_timeout_;
+    return implementation_.template Request<Request, Response>(address, request_id, std::move(request), timeout);
+  }
+
+  /// Wait for an explicit number of microseconds for a request of one of the
+  /// provided types to arrive. This tends to be used by servers.
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) RequestResult<Ms...> ReceiveWithTimeout(Duration timeout) {
+    return implementation_.template Receive<Ms...>(timeout);
+  }
+
+  /// Wait the default number of microseconds for a request of one of the
+  /// provided types to arrive. This tends to be used by servers.
+  template <Message... Ms>
+  requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive() {
+    const Duration timeout = default_timeout_;
+    return implementation_.template Receive<Ms...>(timeout);
+  }
+
+  /// Send a message in a best-effort fashion. This is used for messaging where
+  /// responses are not necessarily expected, and for servers to respond to requests.
+  /// If you need reliable delivery, this must be built on-top. TCP is not enough for most use cases.
+  template <Message M>
+  void Send(Address to_address, RequestId request_id, M message) {
+    Address from_address = address_;
+    return implementation_.template Send<M>(to_address, from_address, request_id, std::move(message));
+  }
+
+  /// The current system time. This time source should be preferred over any other,
+  /// because it lets us deterministically control clocks from tests for making
+  /// things like timeouts deterministic.
+  Time Now() const { return implementation_.Now(); }
+
+  /// Returns true if the system should shut-down.
+  bool ShouldShutDown() const { return implementation_.ShouldShutDown(); }
+
+  /// Returns a random number within the specified distribution.
+  template <class D = std::poisson_distribution<>, class Return = uint64_t>
+  Return Rand(D distrib) {
+    return implementation_.template Rand<D, Return>(distrib);
+  }
+
+  /// Returns a copy of the address this Io instance was constructed with.
+  Address GetAddress() const { return address_; }
+};
+};  // namespace memgraph::io
diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp
new file mode 100644
index 000000000..421a7fd54
--- /dev/null
+++ b/src/query/v2/requests.hpp
@@ -0,0 +1,223 @@
+// Copyright 2022 Memgraph Ltd.
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "storage/v3/id_types.hpp" +#include "storage/v3/property_value.hpp" + +/// Hybrid-logical clock +struct Hlc { + uint64_t logical_id; + using Duration = std::chrono::microseconds; + using Time = std::chrono::time_point; + Time coordinator_wall_clock; + + bool operator==(const Hlc &other) const = default; +}; + +struct Label { + size_t id; +}; + +// TODO(kostasrim) update this with CompoundKey, same for the rest of the file. +using PrimaryKey = std::vector; +using VertexId = std::pair; +using Gid = size_t; +using PropertyId = memgraph::storage::v3::PropertyId; + +struct EdgeType { + std::string name; +}; + +struct EdgeId { + VertexId id; + Gid gid; +}; + +struct Vertex { + VertexId id; + std::vector