Handle Coordinator work on a separate thread, unblocking the MachineManager to route additional messages to Shards

This commit is contained in:
Tyler Neely 2022-11-02 17:15:52 +00:00
parent 78528bd609
commit a815ec9617
2 changed files with 224 additions and 38 deletions

View File

@ -0,0 +1,157 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <variant>
#include <boost/uuid/uuid.hpp>
#include "coordinator/coordinator.hpp"
#include "coordinator/shard_map.hpp"
#include "io/address.hpp"
#include "io/future.hpp"
#include "io/messages.hpp"
#include "io/rsm/raft.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
#include "query/v2/requests.hpp"
namespace memgraph::coordinator::coordinator_worker {
/// Obligations:
/// * ShutDown
/// * Cron
/// * RouteMessage
using boost::uuids::uuid;
using coordinator::Coordinator;
using coordinator::CoordinatorRsm;
using coordinator::ShardToInitialize;
using io::Address;
using io::RequestId;
using io::Time;
using io::messages::CoordinatorMessages;
using io::rsm::Raft;
using msgs::ReadRequests;
using msgs::ReadResponses;
using msgs::WriteRequests;
using msgs::WriteResponses;
/// Queue message for which CoordinatorWorker::Process returns false, ending the worker's Run() loop.
struct ShutDown {};
/// Queue message that makes the worker call Cron() on its coordinator.
struct Cron {};
/// Queue message carrying one coordinator-bound message together with its routing metadata.
/// NOTE: this is an aggregate that the MachineManager fills with designated initializers,
/// so the member names and their declaration order are part of its interface.
struct RouteMessage {
// Payload that is handed to the coordinator.
CoordinatorMessages message;
// Request id of the envelope; passed through to the coordinator's Handle call.
RequestId request_id;
// Destination address of the envelope the message arrived in.
Address to;
// Sender address of the envelope; passed through to the coordinator's Handle call.
Address from;
};
/// Every kind of message that can be placed on the worker's Queue.
using Message = std::variant<RouteMessage, Cron, ShutDown>;
/// State behind a Queue: the pending messages plus the mutex / condition variable pair
/// that Queue::Push and Queue::Pop use to coordinate access to them.
struct QueueInner {
  // Guards `queue`.
  std::mutex mu;
  // Notified by Push after a message has been appended; waited on by Pop while `queue` is empty.
  std::condition_variable cv;

  // TODO(tyler) handle simulator communication std::shared_ptr<std::atomic<int>> blocked;

  // TODO(tyler) investigate using a priority queue that prioritizes messages in a way that
  //             improves overall QoS. For example, maybe we want to schedule raft Append messages
  //             ahead of Read messages or generally writes before reads for lowering the load on the
  //             overall system faster etc... When we do this, we need to make sure to avoid
  //             starvation by sometimes randomizing priorities, rather than following a strict
  //             prioritization.

  // Pending messages in FIFO order: pushed at the back, popped from the front.
  std::deque<Message> queue;
};
/// There are two reasons to implement our own Queue instead of using
/// one off-the-shelf:
/// 1. we will need to know in the simulator when all threads are waiting
/// 2. we will want to implement our own priority queue within this for QoS
class Queue {
  // Held through a shared_ptr so that every copy of a Queue refers to the same QueueInner.
  std::shared_ptr<QueueInner> inner_ = std::make_shared<QueueInner>();

 public:
  /// Appends `message` to the back of the queue and notifies threads waiting in Pop().
  void Push(Message &&message) {
    {
      // Push never waits on the condition variable, so a plain lock_guard is sufficient.
      const std::lock_guard<std::mutex> lock(inner_->mu);

      // `message` is an rvalue reference to a concrete type (not a forwarding reference),
      // so std::move is the appropriate cast here.
      inner_->queue.emplace_back(std::move(message));
    }  // lock dropped before notifying condition variable

    inner_->cv.notify_all();
  }

  /// Blocks until at least one message is queued, then removes and returns the front message.
  Message Pop() {
    std::unique_lock<std::mutex> lock(inner_->mu);

    // The predicate overload re-checks emptiness after every wakeup, which covers spurious wakeups.
    inner_->cv.wait(lock, [this] { return !inner_->queue.empty(); });

    Message message = std::move(inner_->queue.front());
    inner_->queue.pop_front();

    return message;
  }
};
/// A CoordinatorWorker owns a CoordinatorRsm instance and receives messages from the MachineManager through its Queue.
template <class IoImpl>
class CoordinatorWorker {
  io::Io<IoImpl> io_;
  // Source of work: Run() blocks on this queue until a message is available.
  Queue queue_;
  // The coordinator state machine that every popped message is applied to.
  CoordinatorRsm<IoImpl> coordinator_;

  // Each Process overload handles one alternative of Message and returns whether
  // Run() should keep looping (true) or return (false).

  /// ShutDown: stop the worker loop.
  bool Process(ShutDown && /*shut_down*/) { return false; }

  /// Cron: let the coordinator perform its periodic work.
  bool Process(Cron && /*cron*/) {
    coordinator_.Cron();
    return true;
  }

  /// RouteMessage: hand the payload, request id and sender address to the coordinator.
  /// The `to` field of the message is not consulted here.
  bool Process(RouteMessage &&route_message) {
    coordinator_.Handle(std::move(route_message.message), route_message.request_id, route_message.from);
    return true;
  }

 public:
  /// @param io          transport handle; a copy is kept in io_ and a ForkLocal() of it is given to the coordinator
  /// @param queue       queue handle this worker pops from
  /// @param coordinator initial coordinator state, moved into the CoordinatorRsm
  ///
  /// NOTE(review): the coordinator is constructed from io.ForkLocal() rather than from `io` itself.
  /// Whether the fork keeps the address of `io` is not visible from this file — confirm that the
  /// coordinator ends up with the address the caller configured on `io`.
  CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
      // Members are initialized in declaration order (io_, queue_, coordinator_); `io` is copied
      // rather than moved into io_ because it is used again below to create the coordinator's fork.
      : io_(io), queue_(std::move(queue)), coordinator_{io.ForkLocal(), {}, std::move(coordinator)} {}

  CoordinatorWorker(CoordinatorWorker &&) = default;
  CoordinatorWorker &operator=(CoordinatorWorker &&) = default;
  CoordinatorWorker(const CoordinatorWorker &) = delete;
  CoordinatorWorker &operator=(const CoordinatorWorker &) = delete;

  ~CoordinatorWorker() = default;

  /// Pops messages one at a time and dispatches each to the matching Process overload.
  /// Returns once a Process overload reports false, i.e. after a ShutDown message.
  void Run() {
    bool keep_running = true;

    while (keep_running) {
      Message message = queue_.Pop();

      keep_running = std::visit([this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); },
                                std::move(message));
    }
  }
};
} // namespace memgraph::coordinator::coordinator_worker

View File

@ -11,39 +11,43 @@
#pragma once
#include <coordinator/coordinator_rsm.hpp>
#include <io/message_conversion.hpp>
#include <io/messages.hpp>
#include <io/rsm/rsm_client.hpp>
#include <io/time.hpp>
#include <machine_manager/machine_config.hpp>
#include <storage/v3/shard_manager.hpp>
#include "coordinator/coordinator_rsm.hpp"
#include "coordinator/coordinator_worker.hpp"
#include "io/message_conversion.hpp"
#include "io/messages.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/time.hpp"
#include "machine_manager/machine_config.hpp"
#include "storage/v3/shard_manager.hpp"
namespace memgraph::machine_manager {
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorReadRequests;
using memgraph::coordinator::CoordinatorReadResponses;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
using memgraph::io::ConvertVariant;
using memgraph::io::Duration;
using memgraph::io::RequestId;
using memgraph::io::Time;
using memgraph::io::messages::CoordinatorMessages;
using memgraph::io::messages::ShardManagerMessages;
using memgraph::io::messages::ShardMessages;
using memgraph::io::messages::StorageReadRequest;
using memgraph::io::messages::StorageWriteRequest;
using memgraph::io::rsm::AppendRequest;
using memgraph::io::rsm::AppendResponse;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::VoteRequest;
using memgraph::io::rsm::VoteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::storage::v3::ShardManager;
using coordinator::Coordinator;
using coordinator::CoordinatorReadRequests;
using coordinator::CoordinatorReadResponses;
using coordinator::CoordinatorRsm;
using coordinator::CoordinatorWriteRequests;
using coordinator::CoordinatorWriteResponses;
using coordinator::coordinator_worker::CoordinatorWorker;
using CoordinatorRouteMessage = coordinator::coordinator_worker::RouteMessage;
using CoordinatorQueue = coordinator::coordinator_worker::Queue;
using io::ConvertVariant;
using io::Duration;
using io::RequestId;
using io::Time;
using io::messages::CoordinatorMessages;
using io::messages::ShardManagerMessages;
using io::messages::ShardMessages;
using io::messages::StorageReadRequest;
using io::messages::StorageWriteRequest;
using io::rsm::AppendRequest;
using io::rsm::AppendResponse;
using io::rsm::ReadRequest;
using io::rsm::VoteRequest;
using io::rsm::VoteResponse;
using io::rsm::WriteRequest;
using io::rsm::WriteResponse;
using storage::v3::ShardManager;
/// The MachineManager is responsible for:
/// * starting the entire system and ensuring that high-level
@ -62,7 +66,9 @@ template <typename IoImpl>
class MachineManager {
io::Io<IoImpl> io_;
MachineConfig config_;
CoordinatorRsm<IoImpl> coordinator_;
Address coordinator_address_;
CoordinatorQueue coordinator_queue_;
std::jthread coordinator_handle_;
ShardManager<IoImpl> shard_manager_;
Time next_cron_ = Time::min();
@ -72,10 +78,27 @@ class MachineManager {
MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
: io_(io),
config_(config),
coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_.GetAddress()} {}
coordinator_address_(io.GetAddress().ForkUniqueAddress()),
shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} {
auto coordinator_io = io.ForkLocal();
coordinator_io.SetAddress(coordinator_address_);
CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator};
coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
}
Address CoordinatorAddress() { return coordinator_.GetAddress(); }
// Move-only type: the move operations are defaulted and the copy operations are deleted.
// NOTE(review): a defaulted move leaves the source's coordinator_queue_ and coordinator_handle_
// in their moved-from states, and the destructor still runs on that source object.
MachineManager(MachineManager &&) = default;
// NOTE(review): defaulted move assignment replaces coordinator_queue_ before coordinator_handle_
// (declaration order), so a worker thread already running in the assigned-to object can no longer
// be sent ShutDown through this object's queue — confirm this is never used on a running manager.
MachineManager &operator=(MachineManager &&) = default;
MachineManager(const MachineManager &) = delete;
MachineManager &operator=(const MachineManager &) = delete;
/// Asks the coordinator worker thread to shut down and waits for it to finish.
///
/// The handshake is skipped when coordinator_handle_ is not joinable, which is the state of a
/// moved-from MachineManager: its queue handle has been moved out as well, so pushing to it or
/// joining the thread would be invalid.
~MachineManager() {
  if (coordinator_handle_.joinable()) {
    // The worker's Run() loop returns once it processes this message.
    coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{});
    coordinator_handle_.join();
  }
}
/// Returns the address that was assigned to the coordinator when this MachineManager was constructed.
Address CoordinatorAddress() const { return coordinator_address_; }
void Run() {
while (!io_.ShouldShutDown()) {
@ -85,7 +108,7 @@ class MachineManager {
next_cron_ = Cron();
}
Duration receive_timeout = next_cron_ - now;
Duration receive_timeout = std::max(next_cron_, now) - now;
// Note: this parameter pack must be kept in-sync with the ReceiveWithTimeout parameter pack below
using AllMessages =
@ -113,7 +136,7 @@ class MachineManager {
spdlog::info("MM got message to {}", request_envelope.to_address.ToString());
// If message is for the coordinator, cast it to subset and pass it to the coordinator
bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address;
bool to_coordinator = coordinator_address_ == request_envelope.to_address;
if (to_coordinator) {
std::optional<CoordinatorMessages> conversion_attempt =
ConvertVariant<AllMessages, ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>,
@ -126,8 +149,13 @@ class MachineManager {
CoordinatorMessages &&cm = std::move(conversion_attempt.value());
coordinator_.Handle(std::forward<CoordinatorMessages>(cm), request_envelope.request_id,
request_envelope.from_address);
CoordinatorRouteMessage route_message{
.message = std::forward<CoordinatorMessages>(cm),
.request_id = request_envelope.request_id,
.to = request_envelope.to_address,
.from = request_envelope.from_address,
};
coordinator_queue_.Push(std::move(route_message));
continue;
}
@ -168,6 +196,7 @@ class MachineManager {
private:
/// Periodic tick of the MachineManager: logs its address, enqueues a Cron message for the
/// coordinator worker, then runs the ShardManager's Cron and returns the Time it reports
/// (Run() stores that value as the next cron time).
Time Cron() {
  spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());

  // Build the queue message explicitly; the namespace qualification is required because
  // an unqualified `Cron` inside this class names this member function.
  coordinator::coordinator_worker::Message cron_message{coordinator::coordinator_worker::Cron{}};
  coordinator_queue_.Push(std::move(cron_message));

  return shard_manager_.Cron();
}
};