Implement skeleton worker threadpool for the ShardManager

This commit is contained in:
Tyler Neely 2022-10-31 16:03:47 +00:00
parent 9448e23dc9
commit cebe6f62fa
5 changed files with 252 additions and 80 deletions

View File

@ -11,7 +11,11 @@
#pragma once
#include <algorithm>
#include <thread>
#include <boost/asio/ip/tcp.hpp>
#include "io/address.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
@ -37,6 +41,7 @@ struct MachineConfig {
bool is_query_engine;
boost::asio::ip::address listen_ip;
uint16_t listen_port;
size_t n_shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
};
} // namespace memgraph::machine_manager
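Worth noting about the new default above: std::thread::hardware_concurrency() is allowed to return 0 when the value is not computable, which is why the config clamps it to at least one worker thread. A minimal standalone sketch of the same computation (not part of the commit):

#include <algorithm>
#include <cstddef>
#include <thread>

// Falls back to a single worker thread when hardware_concurrency()
// reports 0, which the standard permits.
size_t DefaultShardWorkerThreads() {
  return std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
}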

View File

@ -70,11 +70,11 @@ class MachineManager {
public:
// TODO initialize ShardManager with "real" coordinator addresses instead of io.GetAddress
// which is only true for single-machine config.
MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator, coordinator::ShardMap &shard_map)
MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator, coordinator::ShardMap shard_map)
: io_(io),
config_(config),
coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
shard_manager_{io.ForkLocal(), coordinator_.GetAddress(), shard_map} {}
shard_manager_{io.ForkLocal(), config.n_shard_worker_threads, coordinator_.GetAddress(), shard_map} {}
Address CoordinatorAddress() { return coordinator_.GetAddress(); }
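The signature change above (coordinator::ShardMap & to coordinator::ShardMap) turns shard_map into a by-value sink parameter: callers can pass an lvalue to copy or an rvalue to move, and the member is then move-constructed from the parameter. A minimal sketch of the idiom, with a hypothetical stand-in type:

#include <utility>
#include <vector>

struct ShardMapLike {  // hypothetical stand-in for coordinator::ShardMap
  std::vector<int> shards;
};

class OwnerLike {
  ShardMapLike shard_map_;

 public:
  // By-value sink: exactly one move from the parameter into the member,
  // however the argument was passed.
  explicit OwnerLike(ShardMapLike shard_map) : shard_map_{std::move(shard_map)} {}
};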

View File

@ -16,45 +16,45 @@
#include <boost/uuid/uuid.hpp>
#include <coordinator/coordinator.hpp>
#include <io/address.hpp>
#include <io/message_conversion.hpp>
#include <io/messages.hpp>
#include <io/rsm/raft.hpp>
#include <io/time.hpp>
#include <io/transport.hpp>
#include <query/v2/requests.hpp>
#include <storage/v3/shard.hpp>
#include <storage/v3/shard_rsm.hpp>
#include "coordinator/coordinator.hpp"
#include "coordinator/shard_map.hpp"
#include "io/address.hpp"
#include "io/message_conversion.hpp"
#include "io/messages.hpp"
#include "io/rsm/raft.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/config.hpp"
#include "storage/v3/shard_scheduler.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/shard_rsm.hpp"
#include "storage/v3/shard_worker.hpp"
namespace memgraph::storage::v3 {
using boost::uuids::uuid;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
using memgraph::coordinator::HeartbeatRequest;
using memgraph::coordinator::HeartbeatResponse;
using memgraph::io::Address;
using memgraph::io::Duration;
using memgraph::io::Message;
using memgraph::io::RequestId;
using memgraph::io::ResponseFuture;
using memgraph::io::Time;
using memgraph::io::messages::CoordinatorMessages;
using memgraph::io::messages::ShardManagerMessages;
using memgraph::io::messages::ShardMessages;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::msgs::ReadRequests;
using memgraph::msgs::ReadResponses;
using memgraph::msgs::WriteRequests;
using memgraph::msgs::WriteResponses;
using memgraph::storage::v3::ShardRsm;
using coordinator::CoordinatorWriteRequests;
using coordinator::CoordinatorWriteResponses;
using coordinator::HeartbeatRequest;
using coordinator::HeartbeatResponse;
using io::Address;
using io::Duration;
using io::Message;
using io::RequestId;
using io::ResponseFuture;
using io::Time;
using io::messages::CoordinatorMessages;
using io::messages::ShardManagerMessages;
using io::messages::ShardMessages;
using io::rsm::Raft;
using io::rsm::WriteRequest;
using io::rsm::WriteResponse;
using msgs::ReadRequests;
using msgs::ReadResponses;
using msgs::WriteRequests;
using msgs::WriteResponses;
using storage::v3::ShardRsm;
using ShardManagerOrRsmMessage = std::variant<ShardMessages, ShardManagerMessages>;
using TimeUuidPair = std::pair<Time, uuid>;
@ -78,8 +78,44 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
template <typename IoImpl>
class ShardManager {
public:
ShardManager(io::Io<IoImpl> io, Address coordinator_leader, coordinator::ShardMap shard_map)
: io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)}, shard_scheduler_(io) {}
ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader,
coordinator::ShardMap shard_map)
: io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)} {
MG_ASSERT(n_shard_worker_threads >= 1);
for (size_t i = 0; i < n_shard_worker_threads; i++) {
shard_worker::Queue queue;
shard_worker::ShardWorker worker{io, queue};
auto worker_handle = std::jthread([worker = std::move(worker)]() mutable { worker.Run(); });
workers_.emplace_back(queue);
worker_handles_.emplace_back(std::move(worker_handle));
}
}
ShardManager(ShardManager &&) = default;
ShardManager &operator=(ShardManager &&) = default;
ShardManager(const ShardManager &) = delete;
ShardManager &operator=(const ShardManager &) = delete;
~ShardManager() {
auto shutdown_acks = std::vector<io::Future<bool>>{};
for (auto worker : workers_) {
auto [future, promise] = io::FuturePromisePair<bool>();
worker.Push(shard_worker::ShutDown{.acknowledge_shutdown = std::move(promise)});
shutdown_acks.emplace_back(std::move(future));
}
for (auto &&ack : shutdown_acks) {
bool acked = std::move(ack).Wait();
MG_ASSERT(acked);
}
// The jthread handles for our shard worker threads are joined
// implicitly when worker_handles_ is destroyed.
}
/// Periodic protocol maintenance. Returns the time that Cron should be called again
/// in the future.
@ -134,7 +170,8 @@ class ShardManager {
private:
io::Io<IoImpl> io_;
ShardScheduler<IoImpl> shard_scheduler_;
std::vector<shard_worker::Queue> workers_;
std::vector<std::jthread> worker_handles_;
std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
Time next_cron_ = Time::min();
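The cron_schedule_ member above (mirrored in ShardWorker below) uses std::greater<> to flip std::priority_queue into a min-heap, so top() is always the (Time, uuid) pair that is due next. A self-contained sketch of that scheduling shape, with int standing in for boost::uuids::uuid:

#include <chrono>
#include <cstdio>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

using Time = std::chrono::steady_clock::time_point;
using Entry = std::pair<Time, int>;  // int stands in for boost::uuids::uuid

int main() {
  // std::greater<> turns the default max-heap into a min-heap:
  // top() is the entry with the earliest deadline.
  std::priority_queue<Entry, std::vector<Entry>, std::greater<>> cron_schedule;

  const Time now = std::chrono::steady_clock::now();
  cron_schedule.push({now + std::chrono::seconds(5), 1});
  cron_schedule.push({now + std::chrono::seconds(1), 2});

  // Pop the soonest entry and reschedule it, as the Cron methods do.
  const auto [due, id] = cron_schedule.top();
  cron_schedule.pop();
  std::printf("rsm %d due first\n", id);
  cron_schedule.push({due + std::chrono::seconds(1), id});
}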

View File

@ -1,43 +0,0 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <boost/asio/thread_pool.hpp>
#include <boost/uuid/uuid.hpp>
#include "io/rsm/raft.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/shard_manager.hpp"
namespace memgraph::storage::v3 {
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::msgs::ReadRequests;
using memgraph::msgs::ReadResponses;
using memgraph::msgs::WriteRequests;
using memgraph::msgs::WriteResponses;
template <typename IoImpl>
using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
template <class IoImpl>
class ShardScheduler {
std::map<boost::uuids::uuid, ShardRaft<IoImpl>> rsm_map_;
io::Io<IoImpl> io_;
public:
ShardScheduler(io::Io<IoImpl> io) : io_(io) {}
};
} // namespace memgraph::storage::v3
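The deleted ShardScheduler above is superseded by the worker pool that ShardManager now owns directly. Its destructor coordinates shutdown with a promise-per-worker handshake before the std::jthread handles join on destruction. A standalone sketch of that pattern, with std::promise/std::future standing in for io::Promise/io::Future:

#include <condition_variable>
#include <cstdio>
#include <deque>
#include <future>
#include <mutex>
#include <thread>

// A shutdown request carrying an acknowledgement promise, as in
// shard_worker::ShutDown below.
struct ShutDownMsg {
  std::promise<bool> ack;
};

int main() {
  std::mutex mu;
  std::condition_variable cv;
  std::deque<ShutDownMsg> queue;

  auto worker = std::jthread([&] {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&] { return !queue.empty(); });
    ShutDownMsg msg = std::move(queue.front());
    queue.pop_front();
    msg.ack.set_value(true);  // acknowledge, then let the thread exit
  });

  std::promise<bool> ack;
  std::future<bool> acked = ack.get_future();
  {
    std::lock_guard<std::mutex> lock(mu);
    queue.push_back(ShutDownMsg{std::move(ack)});
  }  // lock dropped before notifying, as in Queue::Push below
  cv.notify_all();

  std::printf("worker acknowledged shutdown: %d\n", static_cast<int>(acked.get()));
  // worker joins implicitly when the std::jthread goes out of scope
}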

View File

@ -0,0 +1,173 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <deque>
#include <memory>
#include <queue>
#include <variant>
#include <boost/uuid/uuid.hpp>
#include "io/future.hpp"
#include "io/rsm/raft.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/shard_rsm.hpp"
namespace memgraph::storage::v3::shard_worker {
/// Obligations:
/// * ShutDown
/// * Cron
/// * Handle
/// * InitializeRsm
using boost::uuids::uuid;
using io::Time;
using io::rsm::Raft;
using msgs::ReadRequests;
using msgs::ReadResponses;
using msgs::WriteRequests;
using msgs::WriteResponses;
using storage::v3::ShardRsm;
template <typename IoImpl>
using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
struct ShutDown {
io::Promise<bool> acknowledge_shutdown;
};
struct Cron {
io::Promise<io::Time> request_next_cron_at;
};
struct InitializeRsm {};
struct Handle {};
using Message = std::variant<ShutDown, Cron, InitializeRsm, Handle>;
struct QueueInner {
std::mutex mu{};
std::condition_variable cv;
// TODO(tyler) handle simulator communication std::shared_ptr<std::atomic<int>> blocked;
// TODO(tyler) investigate using a priority queue that prioritizes messages in a way that
// improves overall QoS. For example, maybe we want to schedule raft Append messages
// ahead of Read messages or generally writes before reads for lowering the load on the
// overall system faster etc... When we do this, we need to make sure to avoid
// starvation by sometimes randomizing priorities, rather than following a strict
// prioritization.
std::deque<Message> queue;
};
/// There are two reasons to implement our own Queue instead of using
/// one off-the-shelf:
/// 1. we will need to know in the simulator when all threads are waiting
/// 2. we will want to implement our own priority queue within this for QoS
class Queue {
std::shared_ptr<QueueInner> inner_ = std::make_shared<QueueInner>();
public:
void Push(Message &&message) {
{
std::unique_lock<std::mutex> lock(inner_->mu);
inner_->queue.push_back(std::move(message));
} // lock dropped before notifying condition variable
inner_->cv.notify_all();
}
Message Pop() {
std::unique_lock<std::mutex> lock(inner_->mu);
while (inner_->queue.empty()) {
inner_->cv.wait(lock);
}
Message message = std::move(inner_->queue.front());
inner_->queue.pop_front();
return message;
}
};
/// A ShardWorker owns Raft<ShardRsm> instances and receives messages from the ShardManager.
template <class IoImpl>
class ShardWorker {
io::Io<IoImpl> io_;
Queue queue_;
std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
Time next_cron_ = Time::min();
std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
bool Process(ShutDown &&shut_down) {
shut_down.acknowledge_shutdown.Fill(true);
return false;
}
bool Process(Cron &&cron) {
Time next_cron_at = Cron();
// Report back when this worker's next Cron call is due.
cron.request_next_cron_at.Fill(next_cron_at);
return true;
}
bool Process(InitializeRsm &&initialize_rsm) { return true; }
bool Process(Handle &&handle) { return true; }
Time Cron() {
spdlog::info("running ShardWorker::Cron, address {}", io_.GetAddress().ToString());
Time now = io_.Now();
if (!cron_schedule_.empty()) {
const auto &[time, uuid] = cron_schedule_.top();
// Only drive the Rsm whose deadline has actually passed.
if (time <= now) {
auto &rsm = rsm_map_.at(uuid);
Time next_for_uuid = rsm.Cron();
cron_schedule_.pop();
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
const auto &[next_time, _uuid] = cron_schedule_.top();
return std::min(next_cron_, next_time);
}
}
return next_cron_;
}
public:
ShardWorker(io::Io<IoImpl> io, Queue queue) : io_(io), queue_(queue) {}
ShardWorker(ShardWorker &&) = default;
ShardWorker &operator=(ShardWorker &&) = default;
ShardWorker(const ShardWorker &) = delete;
ShardWorker &operator=(const ShardWorker &) = delete;
~ShardWorker() = default;
void Run() {
while (true) {
Message message = queue_.Pop();
const bool should_continue =
std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
if (!should_continue) {
return;
}
}
}
};
} // namespace memgraph::storage::v3::shard_worker
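For reference, Run() above is the classic variant-visitor worker loop: each Process overload reports whether the loop should continue, and ShutDown is the only message type that stops it. A self-contained sketch of the same dispatch shape with simplified message types (no io:: dependencies):

#include <cstdio>
#include <deque>
#include <utility>
#include <variant>

struct ShutDown {};
struct Handle {
  int payload;
};
using Message = std::variant<ShutDown, Handle>;

struct Worker {
  bool Process(ShutDown &&) { return false; }  // the only message that stops the loop
  bool Process(Handle &&handle) {
    std::printf("handled %d\n", handle.payload);
    return true;
  }

  void Run(std::deque<Message> &queue) {
    while (!queue.empty()) {
      Message message = std::move(queue.front());
      queue.pop_front();
      // Dispatch to the matching Process overload, as ShardWorker::Run does.
      const bool should_continue =
          std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
      if (!should_continue) {
        return;
      }
    }
  }
};

int main() {
  std::deque<Message> queue;
  queue.push_back(Handle{1});
  queue.push_back(ShutDown{});
  Worker{}.Run(queue);
}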