diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp new file mode 100644 index 000000000..46bc546cd --- /dev/null +++ b/src/coordinator/coordinator_worker.hpp @@ -0,0 +1,156 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include +#include + +#include "coordinator/coordinator.hpp" +#include "coordinator/coordinator_rsm.hpp" +#include "coordinator/shard_map.hpp" +#include "io/address.hpp" +#include "io/future.hpp" +#include "io/messages.hpp" +#include "io/rsm/raft.hpp" +#include "io/time.hpp" +#include "io/transport.hpp" +#include "query/v2/requests.hpp" + +namespace memgraph::coordinator::coordinator_worker { + +/// Obligations: +/// * ShutDown +/// * Cron +/// * RouteMessage + +using coordinator::Coordinator; +using coordinator::CoordinatorRsm; +using io::Address; +using io::RequestId; +using io::Time; +using io::messages::CoordinatorMessages; +using msgs::ReadRequests; +using msgs::ReadResponses; +using msgs::WriteRequests; +using msgs::WriteResponses; + +struct ShutDown {}; + +struct Cron {}; + +struct RouteMessage { + CoordinatorMessages message; + RequestId request_id; + Address to; + Address from; +}; + +using Message = std::variant; + +struct QueueInner { + std::mutex mu{}; + std::condition_variable cv; + // TODO(tyler) handle simulator communication std::shared_ptr> blocked; + + // TODO(tyler) investigate using a priority queue that prioritizes messages in a way that + // improves overall QoS. For example, maybe we want to schedule raft Append messages + // ahead of Read messages or generally writes before reads for lowering the load on the + // overall system faster etc... When we do this, we need to make sure to avoid + // starvation by sometimes randomizing priorities, rather than following a strict + // prioritization. + std::deque queue; +}; + +/// There are two reasons to implement our own Queue instead of using +/// one off-the-shelf: +/// 1. we will need to know in the simulator when all threads are waiting +/// 2. we will want to implement our own priority queue within this for QoS +class Queue { + std::shared_ptr inner_ = std::make_shared(); + + public: + void Push(Message &&message) { + { + MG_ASSERT(inner_.use_count() > 0); + std::unique_lock lock(inner_->mu); + + inner_->queue.emplace_back(std::move(message)); + } // lock dropped before notifying condition variable + + inner_->cv.notify_all(); + } + + Message Pop() { + MG_ASSERT(inner_.use_count() > 0); + std::unique_lock lock(inner_->mu); + + while (inner_->queue.empty()) { + inner_->cv.wait(lock); + } + + Message message = std::move(inner_->queue.front()); + inner_->queue.pop_front(); + + return message; + } +}; + +/// A CoordinatorWorker owns Raft instances. receives messages from the MachineManager. +template +class CoordinatorWorker { + io::Io io_; + Queue queue_; + CoordinatorRsm coordinator_; + + bool Process(ShutDown && /*shut_down*/) { return false; } + + bool Process(Cron && /* cron */) { + coordinator_.Cron(); + return true; + } + + bool Process(RouteMessage &&route_message) { + coordinator_.Handle(std::move(route_message.message), route_message.request_id, route_message.from); + + return true; + } + + public: + CoordinatorWorker(io::Io io, Queue queue, Coordinator coordinator) + : io_(std::move(io)), + queue_(std::move(queue)), + coordinator_{std::move(io_.ForkLocal()), {}, std::move(coordinator)} {} + + CoordinatorWorker(CoordinatorWorker &&) noexcept = default; + CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default; + CoordinatorWorker(const CoordinatorWorker &) = delete; + CoordinatorWorker &operator=(const CoordinatorWorker &) = delete; + ~CoordinatorWorker() = default; + + void Run() { + while (true) { + Message message = queue_.Pop(); + + const bool should_continue = std::visit( + [this](auto &&msg) { return this->Process(std::forward(msg)); }, std::move(message)); + + if (!should_continue) { + return; + } + } + } +}; + +} // namespace memgraph::coordinator::coordinator_worker diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp index 4fc6a4361..258df6385 100644 --- a/src/io/local_transport/local_transport.hpp +++ b/src/io/local_transport/local_transport.hpp @@ -31,14 +31,9 @@ class LocalTransport { : local_transport_handle_(std::move(local_transport_handle)) {} template - ResponseFuture Request(Address to_address, Address from_address, RequestId request_id, RequestT request, - Duration timeout) { - auto [future, promise] = memgraph::io::FuturePromisePair>(); - - local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout, - std::move(promise)); - - return std::move(future); + ResponseFuture Request(Address to_address, Address from_address, RequestT request, Duration timeout) { + return local_transport_handle_->template SubmitRequest(to_address, from_address, + std::move(request), timeout); } template @@ -61,8 +56,6 @@ class LocalTransport { return distrib(rng); } - std::unordered_map ResponseLatencies() { - return local_transport_handle_->ResponseLatencies(); - } + LatencyHistogramSummaries ResponseLatencies() { return local_transport_handle_->ResponseLatencies(); } }; }; // namespace memgraph::io::local_transport diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp index ad257c926..2303ae735 100644 --- a/src/io/local_transport/local_transport_handle.hpp +++ b/src/io/local_transport/local_transport_handle.hpp @@ -30,6 +30,7 @@ class LocalTransportHandle { mutable std::condition_variable cv_; bool should_shut_down_ = false; MessageHistogramCollector histograms_; + RequestId request_id_counter_ = 0; // the responses to requests that are being waited on std::map promises_; @@ -56,7 +57,7 @@ class LocalTransportHandle { return should_shut_down_; } - std::unordered_map ResponseLatencies() { + LatencyHistogramSummaries ResponseLatencies() { std::unique_lock lock(mu_); return histograms_.ResponseLatencies(); } @@ -113,8 +114,7 @@ class LocalTransportHandle { .message = std::move(message_any), .type_info = type_info}; - PromiseKey promise_key{ - .requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address}; + PromiseKey promise_key{.requester_address = to_address, .request_id = opaque_message.request_id}; { std::unique_lock lock(mu_); @@ -139,8 +139,10 @@ class LocalTransportHandle { } template - void SubmitRequest(Address to_address, Address from_address, RequestId request_id, RequestT &&request, - Duration timeout, ResponsePromise promise) { + ResponseFuture SubmitRequest(Address to_address, Address from_address, RequestT &&request, + Duration timeout) { + auto [future, promise] = memgraph::io::FuturePromisePair>(); + const bool port_matches = to_address.last_known_port == from_address.last_known_port; const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip; @@ -149,17 +151,22 @@ class LocalTransportHandle { const auto now = Now(); const Time deadline = now + timeout; + RequestId request_id = 0; { std::unique_lock lock(mu_); - PromiseKey promise_key{ - .requester_address = from_address, .request_id = request_id, .replier_address = to_address}; + request_id = ++request_id_counter_; + PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; OpaquePromise opaque_promise(std::move(promise).ToUnique()); DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)}; + + MG_ASSERT(!promises_.contains(promise_key)); promises_.emplace(std::move(promise_key), std::move(dop)); } // lock dropped Send(to_address, from_address, request_id, std::forward(request)); + + return std::move(future); } }; diff --git a/src/io/message_conversion.hpp b/src/io/message_conversion.hpp index 11c045123..1463abc06 100644 --- a/src/io/message_conversion.hpp +++ b/src/io/message_conversion.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "io/transport.hpp" #include "utils/type_info_ref.hpp" @@ -19,9 +21,6 @@ namespace memgraph::io { struct PromiseKey { Address requester_address; uint64_t request_id; - // TODO(tyler) possibly remove replier_address from promise key - // once we want to support DSR. - Address replier_address; public: friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) { @@ -29,11 +28,7 @@ struct PromiseKey { return lhs.requester_address < rhs.requester_address; } - if (lhs.request_id != rhs.request_id) { - return lhs.request_id < rhs.request_id; - } - - return lhs.replier_address < rhs.replier_address; + return lhs.request_id < rhs.request_id; } }; @@ -90,6 +85,10 @@ struct OpaqueMessage { }; } + std::string demangled_name = "\"" + boost::core::demangle(message.type().name()) + "\""; + spdlog::error("failed to cast message of type {} to expected request type (probably in Receive argument types)", + demangled_name); + return std::nullopt; } }; diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp index 4c99f7f1f..e663be988 100644 --- a/src/io/message_histogram_collector.hpp +++ b/src/io/message_histogram_collector.hpp @@ -20,6 +20,7 @@ #include "io/time.hpp" #include "utils/histogram.hpp" #include "utils/logging.hpp" +#include "utils/print_helpers.hpp" #include "utils/type_info_ref.hpp" namespace memgraph::io { @@ -57,6 +58,35 @@ struct LatencyHistogramSummary { } }; +struct LatencyHistogramSummaries { + std::unordered_map latencies; + + std::string SummaryTable() { + std::string output; + + const auto row = [&output](const auto &c1, const auto &c2, const auto &c3, const auto &c4, const auto &c5, + const auto &c6, const auto &c7) { + output += + fmt::format("{: >50} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8}\n", c1, c2, c3, c4, c5, c6, c7); + }; + row("name", "count", "min (μs)", "med (μs)", "p99 (μs)", "max (μs)", "sum (ms)"); + + for (const auto &[name, histo] : latencies) { + row(name, histo.count, histo.p0.count(), histo.p50.count(), histo.p99.count(), histo.p100.count(), + histo.sum.count() / 1000); + } + + output += "\n"; + return output; + } + + friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) { + using memgraph::utils::print_helpers::operator<<; + in << histo.latencies; + return in; + } +}; + class MessageHistogramCollector { std::unordered_map histograms_; @@ -66,7 +96,7 @@ class MessageHistogramCollector { histo.Measure(duration.count()); } - std::unordered_map ResponseLatencies() { + LatencyHistogramSummaries ResponseLatencies() { std::unordered_map ret{}; for (const auto &[type_id, histo] : histograms_) { @@ -90,7 +120,7 @@ class MessageHistogramCollector { ret.emplace(demangled_name, latency_histogram_summary); } - return ret; + return LatencyHistogramSummaries{.latencies = ret}; } }; diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp index 8d825f9e9..eccbf031b 100644 --- a/src/io/rsm/raft.hpp +++ b/src/io/rsm/raft.hpp @@ -22,6 +22,8 @@ #include #include +#include + #include "io/message_conversion.hpp" #include "io/simulator/simulator.hpp" #include "io/transport.hpp" @@ -109,6 +111,16 @@ utils::TypeInfoRef TypeInfoFor(const WriteResponse & /* write_respo return typeid(WriteReturn); } +template +utils::TypeInfoRef TypeInfoFor(const WriteRequest & /* write_request */) { + return typeid(WriteOperation); +} + +template +utils::TypeInfoRef TypeInfoFor(const WriteRequest> &write_request) { + return TypeInfoForVariant(write_request.operation); +} + /// AppendRequest is a raft-level message that the Leader /// periodically broadcasts to all Follower peers. This /// serves three main roles: @@ -569,7 +581,7 @@ class Raft { const Time now = io_.Now(); const Duration broadcast_timeout = RandomTimeout(kMinimumBroadcastTimeout, kMaximumBroadcastTimeout); - if (now - leader.last_broadcast > broadcast_timeout) { + if (now > leader.last_broadcast + broadcast_timeout) { BroadcastAppendEntries(leader.followers); leader.last_broadcast = now; } @@ -918,7 +930,9 @@ class Raft { // only leaders actually handle replication requests from clients std::optional Handle(Leader &leader, WriteRequest &&req, RequestId request_id, Address from_address) { - Log("handling WriteRequest"); + auto type_info = TypeInfoFor(req); + std::string demangled_name = boost::core::demangle(type_info.get().name()); + Log("handling WriteRequest<" + demangled_name + ">"); // we are the leader. add item to log and send Append to peers MG_ASSERT(state_.term >= LastLogTerm()); diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index d71ecd0a9..74925812e 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -31,7 +31,7 @@ bool SimulatorHandle::ShouldShutDown() const { return should_shut_down_; } -std::unordered_map SimulatorHandle::ResponseLatencies() { +LatencyHistogramSummaries SimulatorHandle::ResponseLatencies() { std::unique_lock lock(mu_); return histograms_.ResponseLatencies(); } @@ -108,9 +108,7 @@ bool SimulatorHandle::MaybeTickSimulator() { stats_.dropped_messages++; } - PromiseKey promise_key{.requester_address = to_address, - .request_id = opaque_message.request_id, - .replier_address = opaque_message.from_address}; + PromiseKey promise_key{.requester_address = to_address, .request_id = opaque_message.request_id}; if (promises_.contains(promise_key)) { // complete waiting promise if it's there diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 75bf8fb5e..3420786c7 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -56,14 +56,14 @@ class SimulatorHandle { std::uniform_int_distribution drop_distrib_{0, 99}; SimulatorConfig config_; MessageHistogramCollector histograms_; + RequestId request_id_counter_{0}; void TimeoutPromisesPastDeadline() { const Time now = cluster_wide_time_microseconds_; for (auto it = promises_.begin(); it != promises_.end();) { auto &[promise_key, dop] = *it; if (dop.deadline < now && config_.perform_timeouts) { - spdlog::info("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(), - promise_key.replier_address.ToString()); + spdlog::info("timing out request from requester {}.", promise_key.requester_address.ToString()); std::move(dop).promise.TimeOut(); it = promises_.erase(it); @@ -78,7 +78,7 @@ class SimulatorHandle { explicit SimulatorHandle(SimulatorConfig config) : cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {} - std::unordered_map ResponseLatencies(); + LatencyHistogramSummaries ResponseLatencies(); ~SimulatorHandle() { for (auto it = promises_.begin(); it != promises_.end();) { @@ -101,12 +101,17 @@ class SimulatorHandle { bool ShouldShutDown() const; template - void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request, - Duration timeout, ResponsePromise &&promise) { + ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, + std::function &&maybe_tick_simulator) { auto type_info = TypeInfoFor(request); + auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier>( + std::forward>(maybe_tick_simulator)); + std::unique_lock lock(mu_); + RequestId request_id = ++request_id_counter_; + const Time deadline = cluster_wide_time_microseconds_ + timeout; std::any message(request); @@ -117,19 +122,24 @@ class SimulatorHandle { .type_info = type_info}; in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); - PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address}; + PromiseKey promise_key{.requester_address = from_address, .request_id = request_id}; OpaquePromise opaque_promise(std::move(promise).ToUnique()); DeadlineAndOpaquePromise dop{ .requested_at = cluster_wide_time_microseconds_, .deadline = deadline, .promise = std::move(opaque_promise), }; + + MG_ASSERT(!promises_.contains(promise_key)); + promises_.emplace(std::move(promise_key), std::move(dop)); stats_.total_messages++; stats_.total_requests++; cv_.notify_all(); + + return std::move(future); } template diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index f1c68230d..5e5a24aa9 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -33,16 +33,11 @@ class SimulatorTransport { : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {} template - ResponseFuture Request(Address to_address, Address from_address, uint64_t request_id, RequestT request, - Duration timeout) { + ResponseFuture Request(Address to_address, Address from_address, RequestT request, Duration timeout) { std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; - auto [future, promise] = - memgraph::io::FuturePromisePairWithNotifier>(maybe_tick_simulator); - simulator_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout, - std::move(promise)); - - return std::move(future); + return simulator_handle_->template SubmitRequest(to_address, from_address, std::move(request), + timeout, std::move(maybe_tick_simulator)); } template @@ -64,8 +59,6 @@ class SimulatorTransport { return distrib(rng_); } - std::unordered_map ResponseLatencies() { - return simulator_handle_->ResponseLatencies(); - } + LatencyHistogramSummaries ResponseLatencies() { return simulator_handle_->ResponseLatencies(); } }; }; // namespace memgraph::io::simulator diff --git a/src/io/transport.hpp b/src/io/transport.hpp index a1a337ddb..2abf10af2 100644 --- a/src/io/transport.hpp +++ b/src/io/transport.hpp @@ -68,7 +68,6 @@ template class Io { I implementation_; Address address_; - RequestId request_id_counter_ = 0; Duration default_timeout_ = std::chrono::microseconds{100000}; public: @@ -84,20 +83,17 @@ class Io { /// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients. template ResponseFuture RequestWithTimeout(Address address, RequestT request, Duration timeout) { - const RequestId request_id = ++request_id_counter_; const Address from_address = address_; - return implementation_.template Request(address, from_address, request_id, request, timeout); + return implementation_.template Request(address, from_address, request, timeout); } /// Issue a request that times out after the default timeout. This tends /// to be used by clients. template ResponseFuture Request(Address to_address, RequestT request) { - const RequestId request_id = ++request_id_counter_; const Duration timeout = default_timeout_; const Address from_address = address_; - return implementation_.template Request(to_address, from_address, request_id, - std::move(request), timeout); + return implementation_.template Request(to_address, from_address, std::move(request), timeout); } /// Wait for an explicit number of microseconds for a request of one of the @@ -143,8 +139,6 @@ class Io { Io ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); } - std::unordered_map ResponseLatencies() { - return implementation_.ResponseLatencies(); - } + LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); } }; }; // namespace memgraph::io diff --git a/src/machine_manager/machine_config.hpp b/src/machine_manager/machine_config.hpp index 6e46d2b83..52711642b 100644 --- a/src/machine_manager/machine_config.hpp +++ b/src/machine_manager/machine_config.hpp @@ -11,7 +11,11 @@ #pragma once +#include +#include + #include + #include "io/address.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/schemas.hpp" @@ -37,6 +41,7 @@ struct MachineConfig { bool is_query_engine; boost::asio::ip::address listen_ip; uint16_t listen_port; + size_t shard_worker_threads = std::max(static_cast(1), std::thread::hardware_concurrency()); }; } // namespace memgraph::machine_manager diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp index fc4d903a4..f9ea8ff2a 100644 --- a/src/machine_manager/machine_manager.hpp +++ b/src/machine_manager/machine_manager.hpp @@ -11,39 +11,43 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include +#include "coordinator/coordinator_rsm.hpp" +#include "coordinator/coordinator_worker.hpp" +#include "io/message_conversion.hpp" +#include "io/messages.hpp" +#include "io/rsm/rsm_client.hpp" +#include "io/time.hpp" +#include "machine_manager/machine_config.hpp" +#include "storage/v3/shard_manager.hpp" namespace memgraph::machine_manager { -using memgraph::coordinator::Coordinator; -using memgraph::coordinator::CoordinatorReadRequests; -using memgraph::coordinator::CoordinatorReadResponses; -using memgraph::coordinator::CoordinatorRsm; -using memgraph::coordinator::CoordinatorWriteRequests; -using memgraph::coordinator::CoordinatorWriteResponses; -using memgraph::io::ConvertVariant; -using memgraph::io::Duration; -using memgraph::io::RequestId; -using memgraph::io::Time; -using memgraph::io::messages::CoordinatorMessages; -using memgraph::io::messages::ShardManagerMessages; -using memgraph::io::messages::ShardMessages; -using memgraph::io::messages::StorageReadRequest; -using memgraph::io::messages::StorageWriteRequest; -using memgraph::io::rsm::AppendRequest; -using memgraph::io::rsm::AppendResponse; -using memgraph::io::rsm::ReadRequest; -using memgraph::io::rsm::VoteRequest; -using memgraph::io::rsm::VoteResponse; -using memgraph::io::rsm::WriteRequest; -using memgraph::io::rsm::WriteResponse; -using memgraph::storage::v3::ShardManager; +using coordinator::Coordinator; +using coordinator::CoordinatorReadRequests; +using coordinator::CoordinatorReadResponses; +using coordinator::CoordinatorRsm; +using coordinator::CoordinatorWriteRequests; +using coordinator::CoordinatorWriteResponses; +using coordinator::coordinator_worker::CoordinatorWorker; +using CoordinatorRouteMessage = coordinator::coordinator_worker::RouteMessage; +using CoordinatorQueue = coordinator::coordinator_worker::Queue; +using io::ConvertVariant; +using io::Duration; +using io::RequestId; +using io::Time; +using io::messages::CoordinatorMessages; +using io::messages::ShardManagerMessages; +using io::messages::ShardMessages; +using io::messages::StorageReadRequest; +using io::messages::StorageWriteRequest; +using io::rsm::AppendRequest; +using io::rsm::AppendResponse; +using io::rsm::ReadRequest; +using io::rsm::VoteRequest; +using io::rsm::VoteResponse; +using io::rsm::WriteRequest; +using io::rsm::WriteResponse; +using storage::v3::ShardManager; /// The MachineManager is responsible for: /// * starting the entire system and ensuring that high-level @@ -62,7 +66,9 @@ template class MachineManager { io::Io io_; MachineConfig config_; - CoordinatorRsm coordinator_; + Address coordinator_address_; + CoordinatorQueue coordinator_queue_; + std::jthread coordinator_handle_; ShardManager shard_manager_; Time next_cron_ = Time::min(); @@ -72,10 +78,27 @@ class MachineManager { MachineManager(io::Io io, MachineConfig config, Coordinator coordinator) : io_(io), config_(config), - coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)}, - shard_manager_{io.ForkLocal(), coordinator_.GetAddress()} {} + coordinator_address_(io.GetAddress().ForkUniqueAddress()), + shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} { + auto coordinator_io = io.ForkLocal(); + coordinator_io.SetAddress(coordinator_address_); + CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator}; + coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); }); + } - Address CoordinatorAddress() { return coordinator_.GetAddress(); } + MachineManager(MachineManager &&) noexcept = default; + MachineManager &operator=(MachineManager &&) noexcept = default; + MachineManager(const MachineManager &) = delete; + MachineManager &operator=(const MachineManager &) = delete; + + ~MachineManager() { + if (coordinator_handle_.joinable()) { + coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{}); + coordinator_handle_.join(); + } + } + + Address CoordinatorAddress() { return coordinator_address_; } void Run() { while (!io_.ShouldShutDown()) { @@ -85,7 +108,7 @@ class MachineManager { next_cron_ = Cron(); } - Duration receive_timeout = next_cron_ - now; + Duration receive_timeout = std::max(next_cron_, now) - now; // Note: this parameter pack must be kept in-sync with the ReceiveWithTimeout parameter pack below using AllMessages = @@ -113,7 +136,7 @@ class MachineManager { spdlog::info("MM got message to {}", request_envelope.to_address.ToString()); // If message is for the coordinator, cast it to subset and pass it to the coordinator - bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address; + bool to_coordinator = coordinator_address_ == request_envelope.to_address; if (to_coordinator) { std::optional conversion_attempt = ConvertVariant, AppendRequest, @@ -126,8 +149,13 @@ class MachineManager { CoordinatorMessages &&cm = std::move(conversion_attempt.value()); - coordinator_.Handle(std::forward(cm), request_envelope.request_id, - request_envelope.from_address); + CoordinatorRouteMessage route_message{ + .message = std::move(cm), + .request_id = request_envelope.request_id, + .to = request_envelope.to_address, + .from = request_envelope.from_address, + }; + coordinator_queue_.Push(std::move(route_message)); continue; } @@ -168,6 +196,7 @@ class MachineManager { private: Time Cron() { spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString()); + coordinator_queue_.Push(coordinator::coordinator_worker::Cron{}); return shard_manager_.Cron(); } }; diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index b379bf616..a3dc8b9b6 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -13,47 +13,50 @@ #include #include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "coordinator/coordinator.hpp" #include "coordinator/shard_map.hpp" +#include "io/address.hpp" +#include "io/message_conversion.hpp" +#include "io/messages.hpp" +#include "io/rsm/raft.hpp" +#include "io/time.hpp" +#include "io/transport.hpp" +#include "query/v2/requests.hpp" #include "storage/v3/config.hpp" +#include "storage/v3/shard.hpp" +#include "storage/v3/shard_rsm.hpp" +#include "storage/v3/shard_worker.hpp" namespace memgraph::storage::v3 { using boost::uuids::uuid; -using memgraph::coordinator::CoordinatorWriteRequests; -using memgraph::coordinator::CoordinatorWriteResponses; -using memgraph::coordinator::HeartbeatRequest; -using memgraph::coordinator::HeartbeatResponse; -using memgraph::io::Address; -using memgraph::io::Duration; -using memgraph::io::Message; -using memgraph::io::RequestId; -using memgraph::io::ResponseFuture; -using memgraph::io::Time; -using memgraph::io::messages::CoordinatorMessages; -using memgraph::io::messages::ShardManagerMessages; -using memgraph::io::messages::ShardMessages; -using memgraph::io::rsm::Raft; -using memgraph::io::rsm::WriteRequest; -using memgraph::io::rsm::WriteResponse; -using memgraph::msgs::ReadRequests; -using memgraph::msgs::ReadResponses; -using memgraph::msgs::WriteRequests; -using memgraph::msgs::WriteResponses; -using memgraph::storage::v3::ShardRsm; +using coordinator::CoordinatorWriteRequests; +using coordinator::CoordinatorWriteResponses; +using coordinator::HeartbeatRequest; +using coordinator::HeartbeatResponse; +using io::Address; +using io::Duration; +using io::Message; +using io::RequestId; +using io::ResponseFuture; +using io::Time; +using io::messages::CoordinatorMessages; +using io::messages::ShardManagerMessages; +using io::messages::ShardMessages; +using io::rsm::Raft; +using io::rsm::WriteRequest; +using io::rsm::WriteResponse; +using msgs::ReadRequests; +using msgs::ReadResponses; +using msgs::WriteRequests; +using msgs::WriteResponses; +using storage::v3::ShardRsm; using ShardManagerOrRsmMessage = std::variant; using TimeUuidPair = std::pair; @@ -77,7 +80,71 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval, template class ShardManager { public: - ShardManager(io::Io io, Address coordinator_leader) : io_(io), coordinator_leader_(coordinator_leader) {} + ShardManager(io::Io io, size_t shard_worker_threads, Address coordinator_leader) + : io_(io), coordinator_leader_(coordinator_leader) { + MG_ASSERT(shard_worker_threads >= 1); + + for (int i = 0; i < shard_worker_threads; i++) { + shard_worker::Queue queue; + shard_worker::ShardWorker worker{io, queue}; + auto worker_handle = std::jthread([worker = std::move(worker)]() mutable { worker.Run(); }); + + workers_.emplace_back(queue); + worker_handles_.emplace_back(std::move(worker_handle)); + worker_rsm_counts_.emplace_back(0); + } + } + + ShardManager(ShardManager &&) noexcept = default; + ShardManager &operator=(ShardManager &&) noexcept = default; + ShardManager(const ShardManager &) = delete; + ShardManager &operator=(const ShardManager &) = delete; + + ~ShardManager() { + for (auto worker : workers_) { + worker.Push(shard_worker::ShutDown{}); + } + + workers_.clear(); + + // The jthread handes for our shard worker threads will be + // blocked on implicitly when worker_handles_ is destroyed. + } + + size_t UuidToWorkerIndex(const uuid &to) { + if (rsm_worker_mapping_.contains(to)) { + return rsm_worker_mapping_.at(to); + } + + // We will now create a mapping for this (probably new) shard + // by choosing the worker with the lowest number of existing + // mappings. + + size_t min_index = 0; + size_t min_count = worker_rsm_counts_.at(min_index); + + for (int i = 0; i < worker_rsm_counts_.size(); i++) { + size_t worker_count = worker_rsm_counts_.at(i); + if (worker_count <= min_count) { + min_count = worker_count; + min_index = i; + } + } + + worker_rsm_counts_[min_index]++; + rsm_worker_mapping_.emplace(to, min_index); + + return min_index; + } + + void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) { + workers_[worker_index].Push(std::forward(message)); + } + + void SendToWorkerByUuid(const uuid &to, shard_worker::Message &&message) { + size_t worker_index = UuidToWorkerIndex(to); + SendToWorkerByIndex(worker_index, std::forward(message)); + } /// Periodic protocol maintenance. Returns the time that Cron should be called again /// in the future. @@ -85,33 +152,23 @@ class ShardManager { spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString()); Time now = io_.Now(); - if (now >= next_cron_) { + if (now >= next_reconciliation_) { Reconciliation(); std::uniform_int_distribution time_distrib(kMinimumCronInterval.count(), kMaximumCronInterval.count()); const auto rand = io_.Rand(time_distrib); - next_cron_ = now + Duration{rand}; + next_reconciliation_ = now + Duration{rand}; } - if (!cron_schedule_.empty()) { - const auto &[time, uuid] = cron_schedule_.top(); - - if (time <= now) { - auto &rsm = rsm_map_.at(uuid); - Time next_for_uuid = rsm.Cron(); - - cron_schedule_.pop(); - cron_schedule_.push(std::make_pair(next_for_uuid, uuid)); - - const auto &[next_time, _uuid] = cron_schedule_.top(); - - return std::min(next_cron_, next_time); - } + for (auto &worker : workers_) { + worker.Push(shard_worker::Cron{}); } - return next_cron_; + Time next_worker_cron = now + std::chrono::milliseconds(500); + + return std::min(next_worker_cron, next_reconciliation_); } /// Returns the Address for our underlying Io implementation @@ -125,16 +182,21 @@ class ShardManager { MG_ASSERT(address.last_known_port == to.last_known_port); MG_ASSERT(address.last_known_ip == to.last_known_ip); - auto &rsm = rsm_map_.at(to.unique_id); - - rsm.Handle(std::forward(sm), request_id, from); + SendToWorkerByUuid(to.unique_id, shard_worker::RouteMessage{ + .message = std::move(sm), + .request_id = request_id, + .to = to, + .from = from, + }); } private: io::Io io_; - std::map> rsm_map_; - std::priority_queue, std::vector>, std::greater<>> cron_schedule_; - Time next_cron_ = Time::min(); + std::vector workers_; + std::vector worker_handles_; + std::vector worker_rsm_counts_; + std::unordered_map> rsm_worker_mapping_; + Time next_reconciliation_ = Time::min(); Address coordinator_leader_; std::optional>> heartbeat_res_; @@ -188,40 +250,23 @@ class ShardManager { } void EnsureShardsInitialized(HeartbeatResponse hr) { - for (const auto &shard_to_initialize : hr.shards_to_initialize) { - InitializeRsm(shard_to_initialize); - initialized_but_not_confirmed_rsm_.emplace(shard_to_initialize.uuid); + for (const auto &to_init : hr.shards_to_initialize) { + initialized_but_not_confirmed_rsm_.emplace(to_init.uuid); + + if (rsm_worker_mapping_.contains(to_init.uuid)) { + // it's not a bug for the coordinator to send us UUIDs that we have + // already created, because there may have been lag that caused + // the coordinator not to hear back from us. + return; + } + + size_t worker_index = UuidToWorkerIndex(to_init.uuid); + + SendToWorkerByIndex(worker_index, to_init); + + rsm_worker_mapping_.emplace(to_init.uuid, worker_index); } } - - /// Returns true if the RSM was able to be initialized, and false if it was already initialized - void InitializeRsm(coordinator::ShardToInitialize to_init) { - if (rsm_map_.contains(to_init.uuid)) { - // it's not a bug for the coordinator to send us UUIDs that we have - // already created, because there may have been lag that caused - // the coordinator not to hear back from us. - return; - } - - auto rsm_io = io_.ForkLocal(); - auto io_addr = rsm_io.GetAddress(); - io_addr.unique_id = to_init.uuid; - rsm_io.SetAddress(io_addr); - - // TODO(tyler) get peers from Coordinator in HeartbeatResponse - std::vector
rsm_peers = {}; - - std::unique_ptr shard = std::make_unique(to_init.label_id, to_init.min_key, to_init.max_key, - to_init.schema, to_init.config, to_init.id_to_names); - - ShardRsm rsm_state{std::move(shard)}; - - ShardRaft rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)}; - - spdlog::info("SM created a new shard with UUID {}", to_init.uuid); - - rsm_map_.emplace(to_init.uuid, std::move(rsm)); - } }; } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp new file mode 100644 index 000000000..46e02e6cc --- /dev/null +++ b/src/storage/v3/shard_worker.hpp @@ -0,0 +1,224 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "coordinator/coordinator.hpp" +#include "coordinator/shard_map.hpp" +#include "io/address.hpp" +#include "io/future.hpp" +#include "io/messages.hpp" +#include "io/rsm/raft.hpp" +#include "io/time.hpp" +#include "io/transport.hpp" +#include "query/v2/requests.hpp" +#include "storage/v3/shard_rsm.hpp" + +namespace memgraph::storage::v3::shard_worker { + +/// Obligations: +/// * ShutDown +/// * Cron +/// * RouteMessage +/// * ShardToInitialize + +using boost::uuids::uuid; + +using coordinator::ShardToInitialize; +using io::Address; +using io::RequestId; +using io::Time; +using io::messages::ShardMessages; +using io::rsm::Raft; +using msgs::ReadRequests; +using msgs::ReadResponses; +using msgs::WriteRequests; +using msgs::WriteResponses; +using storage::v3::ShardRsm; + +template +using ShardRaft = Raft; + +struct ShutDown {}; + +struct Cron {}; + +struct RouteMessage { + ShardMessages message; + RequestId request_id; + Address to; + Address from; +}; + +using Message = std::variant; + +struct QueueInner { + std::mutex mu{}; + std::condition_variable cv; + // TODO(tyler) handle simulator communication std::shared_ptr> blocked; + + // TODO(tyler) investigate using a priority queue that prioritizes messages in a way that + // improves overall QoS. For example, maybe we want to schedule raft Append messages + // ahead of Read messages or generally writes before reads for lowering the load on the + // overall system faster etc... When we do this, we need to make sure to avoid + // starvation by sometimes randomizing priorities, rather than following a strict + // prioritization. + std::deque queue; +}; + +/// There are two reasons to implement our own Queue instead of using +/// one off-the-shelf: +/// 1. we will need to know in the simulator when all threads are waiting +/// 2. we will want to implement our own priority queue within this for QoS +class Queue { + std::shared_ptr inner_ = std::make_shared(); + + public: + void Push(Message &&message) { + { + MG_ASSERT(inner_.use_count() > 0); + std::unique_lock lock(inner_->mu); + + inner_->queue.emplace_back(std::forward(message)); + } // lock dropped before notifying condition variable + + inner_->cv.notify_all(); + } + + Message Pop() { + MG_ASSERT(inner_.use_count() > 0); + std::unique_lock lock(inner_->mu); + + while (inner_->queue.empty()) { + inner_->cv.wait(lock); + } + + Message message = std::move(inner_->queue.front()); + inner_->queue.pop_front(); + + return message; + } +}; + +/// A ShardWorker owns Raft instances. receives messages from the ShardManager. +template +class ShardWorker { + io::Io io_; + Queue queue_; + std::priority_queue, std::vector>, std::greater<>> cron_schedule_; + Time next_cron_ = Time::min(); + std::map> rsm_map_; + + bool Process(ShutDown && /* shut_down */) { return false; } + + bool Process(Cron && /* cron */) { + Cron(); + return true; + } + + bool Process(ShardToInitialize &&shard_to_initialize) { + InitializeRsm(std::forward(shard_to_initialize)); + + return true; + } + + bool Process(RouteMessage &&route_message) { + auto &rsm = rsm_map_.at(route_message.to.unique_id); + + rsm.Handle(std::move(route_message.message), route_message.request_id, route_message.from); + + return true; + } + + Time Cron() { + spdlog::info("running ShardWorker::Cron, address {}", io_.GetAddress().ToString()); + Time now = io_.Now(); + + while (!cron_schedule_.empty()) { + const auto &[time, uuid] = cron_schedule_.top(); + + if (time <= now) { + auto &rsm = rsm_map_.at(uuid); + Time next_for_uuid = rsm.Cron(); + + cron_schedule_.pop(); + cron_schedule_.push(std::make_pair(next_for_uuid, uuid)); + } else { + return time; + } + } + + return now + std::chrono::microseconds(1000); + } + + void InitializeRsm(ShardToInitialize to_init) { + if (rsm_map_.contains(to_init.uuid)) { + // it's not a bug for the coordinator to send us UUIDs that we have + // already created, because there may have been lag that caused + // the coordinator not to hear back from us. + return; + } + + auto rsm_io = io_.ForkLocal(); + auto io_addr = rsm_io.GetAddress(); + io_addr.unique_id = to_init.uuid; + rsm_io.SetAddress(io_addr); + + // TODO(tyler) get peers from Coordinator in HeartbeatResponse + std::vector
rsm_peers = {}; + + std::unique_ptr shard = std::make_unique(to_init.label_id, to_init.min_key, to_init.max_key, + to_init.schema, to_init.config, to_init.id_to_names); + + ShardRsm rsm_state{std::move(shard)}; + + ShardRaft rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)}; + + spdlog::info("SM created a new shard with UUID {}", to_init.uuid); + + // perform an initial Cron call for the new RSM + Time next_cron = rsm.Cron(); + cron_schedule_.push(std::make_pair(next_cron, to_init.uuid)); + + rsm_map_.emplace(to_init.uuid, std::move(rsm)); + } + + public: + ShardWorker(io::Io io, Queue queue) : io_(io), queue_(queue) {} + ShardWorker(ShardWorker &&) noexcept = default; + ShardWorker &operator=(ShardWorker &&) noexcept = default; + ShardWorker(const ShardWorker &) = delete; + ShardWorker &operator=(const ShardWorker &) = delete; + ~ShardWorker() = default; + + void Run() { + while (true) { + Message message = queue_.Pop(); + + const bool should_continue = + std::visit([&](auto &&msg) { return Process(std::forward(msg)); }, std::move(message)); + + if (!should_continue) { + return; + } + } + } +}; + +} // namespace memgraph::storage::v3::shard_worker diff --git a/tests/simulation/cluster_property_test.cpp b/tests/simulation/cluster_property_test.cpp index 0de5c21fc..48327be5b 100644 --- a/tests/simulation/cluster_property_test.cpp +++ b/tests/simulation/cluster_property_test.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "generated_operations.hpp" #include "io/simulator/simulator_config.hpp" @@ -35,6 +36,8 @@ using storage::v3::kMaximumCronInterval; RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops)) { // TODO(tyler) set abort_time to something more restrictive than Time::max() + spdlog::cfg::load_env_levels(); + SimulatorConfig sim_config{ .drop_percent = 0, .perform_timeouts = false, diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index b87bda998..6a32a391d 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -194,6 +194,22 @@ void ExecuteOp(msgs::ShardRequestManager &shard_request_mana } } +/// This struct exists as a way of detaching +/// a thread if something causes an uncaught +/// exception - because that thread would not +/// receive a ShutDown message otherwise, and +/// would cause the test to hang forever. +struct DetachIfDropped { + std::jthread &handle; + bool detach = true; + + ~DetachIfDropped() { + if (detach && handle.joinable()) { + handle.detach(); + } + } +}; + void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, const std::vector &ops) { spdlog::info("========================== NEW SIMULATION =========================="); @@ -217,9 +233,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); - // Need to detach this thread so that the destructor does not - // block before we can propagate assertion failures. - mm_thread_1.detach(); + auto detach_on_error = DetachIfDropped{.handle = mm_thread_1}; // TODO(tyler) clarify addresses of coordinator etc... as it's a mess @@ -236,6 +250,11 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig std::visit([&](auto &o) { ExecuteOp(shard_request_manager, correctness_model, o); }, op.inner); } + // We have now completed our workload without failing any assertions, so we can + // disable detaching the worker thread, which will cause the mm_thread_1 jthread + // to be joined when this function returns. + detach_on_error.detach = false; + simulator.ShutDown(); SimulatorStats stats = simulator.Stats(); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 4a0ee2460..0e5824318 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -400,6 +400,6 @@ target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test add_unit_test(coordinator_shard_map.cpp) target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator) -# Tests for 1000 shards, 1000 creates, scan -add_unit_test(1k_shards_1k_create_scanall.cpp) -target_link_libraries(${test_prefix}1k_shards_1k_create_scanall mg-io mg-coordinator mg-storage-v3 mg-query-v2) +# Tests for many shards, many creates, scan +add_unit_test(high_density_shard_create_scan.cpp) +target_link_libraries(${test_prefix}high_density_shard_create_scan mg-io mg-coordinator mg-storage-v3 mg-query-v2) diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/high_density_shard_create_scan.cpp similarity index 74% rename from tests/unit/1k_shards_1k_create_scanall.cpp rename to tests/unit/high_density_shard_create_scan.cpp index eb1f8a5d5..9c2d1cfd7 100644 --- a/tests/unit/1k_shards_1k_create_scanall.cpp +++ b/tests/unit/high_density_shard_create_scan.cpp @@ -31,7 +31,6 @@ #include "machine_manager/machine_manager.hpp" #include "query/v2/requests.hpp" #include "query/v2/shard_request_manager.hpp" -#include "utils/print_helpers.hpp" #include "utils/variant_helpers.hpp" namespace memgraph::tests::simulation { @@ -82,13 +81,14 @@ struct ScanAll { }; MachineManager MkMm(LocalSystem &local_system, std::vector
coordinator_addresses, Address addr, - ShardMap shard_map) { + ShardMap shard_map, size_t shard_worker_threads) { MachineConfig config{ .coordinator_addresses = std::move(coordinator_addresses), .is_storage = true, .is_coordinator = true, .listen_ip = addr.last_known_ip, .listen_port = addr.last_known_port, + .shard_worker_threads = shard_worker_threads, }; Io io = local_system.Register(addr); @@ -124,7 +124,7 @@ void WaitForShardsToInitialize(CoordinatorClient &coordinator_cl } } -ShardMap TestShardMap(int n_splits, int replication_factor) { +ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards) { ShardMap sm{}; const auto label_name = std::string("test_label"); @@ -147,8 +147,8 @@ ShardMap TestShardMap(int n_splits, int replication_factor) { MG_ASSERT(label_id.has_value()); // split the shard at N split points - for (int64_t i = 1; i < n_splits; ++i) { - const auto key1 = memgraph::storage::v3::PropertyValue(i); + for (int64_t i = 1; i < shards; ++i) { + const auto key1 = memgraph::storage::v3::PropertyValue(i * gap_between_shards); const auto key2 = memgraph::storage::v3::PropertyValue(0); const auto split_point = {key1, key2}; @@ -208,7 +208,16 @@ void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, } } -TEST(MachineManager, ManyShards) { +void RunWorkload(int shards, int replication_factor, int create_ops, int scan_ops, int shard_worker_threads, + int gap_between_shards) { + spdlog::info("======================== NEW TEST ========================"); + spdlog::info("shards: ", shards); + spdlog::info("replication factor: ", replication_factor); + spdlog::info("create ops: ", create_ops); + spdlog::info("scan all ops: ", scan_ops); + spdlog::info("shard worker threads: ", shard_worker_threads); + spdlog::info("gap between shards: ", gap_between_shards); + LocalSystem local_system; auto cli_addr = Address::TestAddress(1); @@ -221,19 +230,20 @@ TEST(MachineManager, ManyShards) { machine_1_addr, }; - auto shard_splits = 1024; - auto replication_factor = 1; - auto create_ops = 1000; + auto time_before_shard_map_creation = cli_io_2.Now(); + ShardMap initialization_sm = TestShardMap(shards, replication_factor, gap_between_shards); + auto time_after_shard_map_creation = cli_io_2.Now(); - ShardMap initialization_sm = TestShardMap(shard_splits, replication_factor); - - auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm); + auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm, shard_worker_threads); Address coordinator_address = mm_1.CoordinatorAddress(); auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); + + auto time_before_shard_stabilization = cli_io_2.Now(); WaitForShardsToInitialize(coordinator_client); + auto time_after_shard_stabilization = cli_io_2.Now(); msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); @@ -241,18 +251,54 @@ TEST(MachineManager, ManyShards) { auto correctness_model = std::set{}; + auto time_before_creates = cli_io_2.Now(); + for (int i = 0; i < create_ops; i++) { ExecuteOp(shard_request_manager, correctness_model, CreateVertex{.first = i, .second = i}); } - ExecuteOp(shard_request_manager, correctness_model, ScanAll{}); + auto time_after_creates = cli_io_2.Now(); + + for (int i = 0; i < scan_ops; i++) { + ExecuteOp(shard_request_manager, correctness_model, ScanAll{}); + } + + auto time_after_scan = cli_io_2.Now(); local_system.ShutDown(); - auto histo = cli_io_2.ResponseLatencies(); + auto latencies = cli_io_2.ResponseLatencies(); - using memgraph::utils::print_helpers::operator<<; - std::cout << "response latencies: " << histo << std::endl; + spdlog::info("response latencies: \n{}", latencies.SummaryTable()); + + spdlog::info("serial time break-down: (μs)"); + + spdlog::info("{: >20}: {: >10}", "split shard map", + (time_after_shard_map_creation - time_before_shard_map_creation).count()); + spdlog::info("{: >20}: {: >10}", "shard stabilization", + (time_after_shard_stabilization - time_before_shard_stabilization).count()); + spdlog::info("{: >20}: {: >10}", "create nodes", (time_after_creates - time_before_creates).count()); + spdlog::info("{: >20}: {: >10}", "scan nodes", (time_after_scan - time_after_creates).count()); + + std::cout << fmt::format("{} {} {}\n", shards, shard_worker_threads, (time_after_scan - time_after_creates).count()); +} + +TEST(MachineManager, ManyShards) { + auto shards_attempts = {1, 64}; + auto shard_worker_thread_attempts = {1, 32}; + auto replication_factor = 1; + auto create_ops = 128; + auto scan_ops = 1; + + std::cout << "splits threads scan_all_microseconds\n"; + + for (const auto shards : shards_attempts) { + auto gap_between_shards = create_ops / shards; + + for (const auto shard_worker_threads : shard_worker_thread_attempts) { + RunWorkload(shards, replication_factor, create_ops, scan_ops, shard_worker_threads, gap_between_shards); + } + } } } // namespace memgraph::tests::simulation