Complete migration from single-threaded ShardManager to multi-threaded ShardWorker processing
This commit is contained in:
parent
d0cad6e6ba
commit
951b058116
src/storage/v3
@ -14,6 +14,7 @@
|
||||
#include <queue>
|
||||
#include <set>
|
||||
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
|
||||
#include "coordinator/coordinator.hpp"
|
||||
@ -116,39 +117,43 @@ class ShardManager {
|
||||
// blocked on implicitly when worker_handles_ is destroyed.
|
||||
}
|
||||
|
||||
size_t UuidToWorkerIndex(const uuid &to) {
|
||||
size_t hash = boost::hash<boost::uuids::uuid>()(to);
|
||||
return hash % workers_.size();
|
||||
}
|
||||
|
||||
/// Pushes `message` onto the queue at position `worker_index` in workers_.
///
/// @param worker_index index into workers_ (not range-checked here)
/// @param message      message whose ownership is handed to the queue
void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) {
  // `message` is a plain rvalue reference, not a forwarding reference, so
  // std::move is the correct way to pass it on.
  workers_[worker_index].Push(std::move(message));
}
|
||||
|
||||
void SendToWorkerByUuid(const uuid &to, shard_worker::Message &&message) {
|
||||
size_t worker_index = UuidToWorkerIndex(to);
|
||||
SendToWorkerByIndex(worker_index, std::forward<shard_worker::Message>(message));
|
||||
}
|
||||
|
||||
/// Periodic protocol maintenance. Returns the time that Cron should be called again
|
||||
/// in the future.
|
||||
Time Cron() {
|
||||
spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
|
||||
Time now = io_.Now();
|
||||
|
||||
if (now >= next_cron_) {
|
||||
if (now >= next_reconciliation_) {
|
||||
Reconciliation();
|
||||
|
||||
std::uniform_int_distribution time_distrib(kMinimumCronInterval.count(), kMaximumCronInterval.count());
|
||||
|
||||
const auto rand = io_.Rand(time_distrib);
|
||||
|
||||
next_cron_ = now + Duration{rand};
|
||||
next_reconciliation_ = now + Duration{rand};
|
||||
}
|
||||
|
||||
if (!cron_schedule_.empty()) {
|
||||
const auto &[time, uuid] = cron_schedule_.top();
|
||||
|
||||
if (time <= now) {
|
||||
auto &rsm = rsm_map_.at(uuid);
|
||||
Time next_for_uuid = rsm.Cron();
|
||||
|
||||
cron_schedule_.pop();
|
||||
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
|
||||
|
||||
const auto &[next_time, _uuid] = cron_schedule_.top();
|
||||
|
||||
return std::min(next_cron_, next_time);
|
||||
}
|
||||
for (auto &worker : workers_) {
|
||||
worker.Push(shard_worker::Cron{});
|
||||
}
|
||||
|
||||
return next_cron_;
|
||||
Time next_worker_cron = now + std::chrono::milliseconds(500);
|
||||
|
||||
return std::min(next_worker_cron, next_reconciliation_);
|
||||
}
|
||||
|
||||
/// Returns the Address for our underlying Io implementation
|
||||
@ -162,18 +167,20 @@ class ShardManager {
|
||||
MG_ASSERT(address.last_known_port == to.last_known_port);
|
||||
MG_ASSERT(address.last_known_ip == to.last_known_ip);
|
||||
|
||||
auto &rsm = rsm_map_.at(to.unique_id);
|
||||
|
||||
rsm.Handle(std::forward<ShardMessages>(sm), request_id, from);
|
||||
SendToWorkerByUuid(to.unique_id, shard_worker::RouteMessage{
|
||||
.message = std::forward<ShardMessages>(sm),
|
||||
.request_id = request_id,
|
||||
.to = to,
|
||||
.from = from,
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
io::Io<IoImpl> io_;
|
||||
std::vector<shard_worker::Queue> workers_;
|
||||
std::vector<std::jthread> worker_handles_;
|
||||
std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
|
||||
std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
|
||||
Time next_cron_ = Time::min();
|
||||
std::set<uuid> rsm_set_;
|
||||
Time next_reconciliation_ = Time::min();
|
||||
Address coordinator_leader_;
|
||||
std::optional<ResponseFuture<WriteResponse<CoordinatorWriteResponses>>> heartbeat_res_;
|
||||
|
||||
@ -227,39 +234,20 @@ class ShardManager {
|
||||
}
|
||||
|
||||
/// Forwards every not-yet-known shard listed in `hr.shards_to_initialize` to
/// the worker selected by its UUID, and records the UUID in both rsm_set_ and
/// initialized_but_not_confirmed_rsm_.
///
/// @param hr heartbeat response carrying the shards to create
void EnsureShardsInitialized(HeartbeatResponse hr) {
  for (const auto &to_init : hr.shards_to_initialize) {
    if (rsm_set_.contains(to_init.uuid)) {
      // it's not a bug for the coordinator to send us UUIDs that we have
      // already created, because there may have been lag that caused
      // the coordinator not to hear back from us.
      //
      // Skip only this entry: returning here would silently drop every
      // remaining shard in the same response.
      continue;
    }

    SendToWorkerByUuid(to_init.uuid, to_init);

    rsm_set_.emplace(to_init.uuid);
    initialized_but_not_confirmed_rsm_.emplace(to_init.uuid);
  }
}
|
||||
|
||||
/// Creates the replicated state machine for the shard described by `to_init`
/// and stores it in rsm_map_ under the shard's UUID. Does nothing if rsm_map_
/// already holds an entry for that UUID.
///
/// @param to_init shard description (uuid, label id, key range, schema,
///                config and id-to-name mapping) used to build the Shard
void InitializeRsm(coordinator::ShardToInitialize to_init) {
  const bool already_created = rsm_map_.contains(to_init.uuid);
  if (already_created) {
    // it's not a bug for the coordinator to send us UUIDs that we have
    // already created, because there may have been lag that caused
    // the coordinator not to hear back from us.
    return;
  }

  // Take an Io handle from io_.ForkLocal() and stamp the shard UUID into
  // its address.
  auto shard_io = io_.ForkLocal();
  auto shard_address = shard_io.GetAddress();
  shard_address.unique_id = to_init.uuid;
  shard_io.SetAddress(shard_address);

  // TODO(tyler) get peers from Coordinator in HeartbeatResponse
  std::vector<Address> peers = {};

  std::unique_ptr<Shard> new_shard = std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key,
                                                             to_init.schema, to_init.config, to_init.id_to_names);

  ShardRsm shard_rsm{std::move(new_shard)};

  ShardRaft<IoImpl> raft{std::move(shard_io), peers, std::move(shard_rsm)};

  spdlog::info("SM created a new shard with UUID {}", to_init.uuid);

  rsm_map_.emplace(to_init.uuid, std::move(raft));
}
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
@ -58,9 +59,7 @@ struct ShutDown {
|
||||
io::Promise<bool> acknowledge_shutdown;
|
||||
};
|
||||
|
||||
/// Message telling a worker to run its periodic Cron routine. It carries no
/// payload: the sender constructs it as `Cron{}` and does not wait for a reply.
struct Cron {};
|
||||
|
||||
struct RouteMessage {
|
||||
ShardMessages message;
|
||||
@ -132,8 +131,7 @@ class ShardWorker {
|
||||
}
|
||||
|
||||
/// Handles a Cron message by running this worker's Cron() routine once.
/// The message itself carries no data, so `cron` is not read.
///
/// @return always true (presumably "keep processing" — confirm against the
///         other Process overloads, which are not visible here)
bool Process(Cron &&cron) {
  Cron();
  return true;
}
|
||||
|
||||
@ -155,21 +153,23 @@ class ShardWorker {
|
||||
spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
|
||||
Time now = io_.Now();
|
||||
|
||||
if (!cron_schedule_.empty()) {
|
||||
while (!cron_schedule_.empty()) {
|
||||
const auto &[time, uuid] = cron_schedule_.top();
|
||||
|
||||
auto &rsm = rsm_map_.at(uuid);
|
||||
Time next_for_uuid = rsm.Cron();
|
||||
if (time <= now) {
|
||||
auto &rsm = rsm_map_.at(uuid);
|
||||
Time next_for_uuid = rsm.Cron();
|
||||
|
||||
cron_schedule_.pop();
|
||||
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
|
||||
cron_schedule_.pop();
|
||||
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
|
||||
|
||||
const auto &[next_time, _uuid] = cron_schedule_.top();
|
||||
|
||||
return std::min(next_cron_, next_time);
|
||||
const auto &[next_time, _uuid] = cron_schedule_.top();
|
||||
} else {
|
||||
return time;
|
||||
}
|
||||
}
|
||||
|
||||
return next_cron_;
|
||||
return now + std::chrono::microseconds(1000);
|
||||
}
|
||||
|
||||
void InitializeRsm(ShardToInitialize to_init) {
|
||||
@ -197,6 +197,10 @@ class ShardWorker {
|
||||
|
||||
spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
|
||||
|
||||
// perform an initial Cron call for the new RSM
|
||||
Time next_cron = rsm.Cron();
|
||||
cron_schedule_.push(std::make_pair(next_cron, to_init.uuid));
|
||||
|
||||
rsm_map_.emplace(to_init.uuid, std::move(rsm));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user