Use static RSM partitioning function for achieving a smooth Shard->ShardWorker distribution
This commit is contained in:
parent
5d3eaf6a55
commit
a6add80fc9
@ -13,6 +13,7 @@
|
||||
|
||||
#include <queue>
|
||||
#include <set>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
@ -92,6 +93,7 @@ class ShardManager {
|
||||
|
||||
workers_.emplace_back(queue);
|
||||
worker_handles_.emplace_back(std::move(worker_handle));
|
||||
worker_rsm_counts_.emplace_back(0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,8 +120,29 @@ class ShardManager {
|
||||
}
|
||||
|
||||
/// Returns the index of the worker responsible for the RSM identified by `to`.
///
/// If a mapping for `to` already exists in rsm_worker_mapping_, that index is
/// returned unchanged, so a given uuid always resolves to the same worker.
/// Otherwise the uuid is assigned to the worker that currently has the fewest
/// mappings (ties resolve to the highest index), the assignment is recorded in
/// rsm_worker_mapping_ and that worker's entry in worker_rsm_counts_ is
/// incremented.
///
/// @param to uuid of the RSM to route to.
/// @return index of the chosen worker.
/// @throws std::out_of_range if worker_rsm_counts_ is empty.
size_t UuidToWorkerIndex(const uuid &to) {
  // Single lookup instead of contains() followed by at().
  if (const auto existing = rsm_worker_mapping_.find(to); existing != rsm_worker_mapping_.end()) {
    return existing->second;
  }

  // We will now create a mapping for this (probably new) shard
  // by choosing the worker with the lowest number of existing
  // mappings.
  size_t min_index = 0;
  // .at() is deliberate: it turns "no workers" into an exception rather than
  // an out-of-bounds read.
  size_t min_count = worker_rsm_counts_.at(min_index);

  // size_t index avoids the signed/unsigned comparison against size().
  for (size_t i = 0; i < worker_rsm_counts_.size(); ++i) {
    const size_t worker_count = worker_rsm_counts_[i];
    // `<=` keeps the original tie-breaking: the last least-loaded worker wins.
    if (worker_count <= min_count) {
      min_count = worker_count;
      min_index = i;
    }
  }

  ++worker_rsm_counts_[min_index];
  rsm_worker_mapping_.emplace(to, min_index);

  return min_index;
}
|
||||
|
||||
void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) {
|
||||
@ -179,7 +202,8 @@ class ShardManager {
|
||||
io::Io<IoImpl> io_;
|
||||
std::vector<shard_worker::Queue> workers_;
|
||||
std::vector<std::jthread> worker_handles_;
|
||||
std::set<uuid> rsm_set_;
|
||||
// Number of RSM mappings assigned to each worker, indexed by worker index.
// A zero entry is appended for every worker that is created, and
// UuidToWorkerIndex increments the entry of the worker it selects.
std::vector<size_t> worker_rsm_counts_;
|
||||
// Maps an RSM uuid to the index of the worker it has been assigned to.
// Once an entry exists, UuidToWorkerIndex keeps returning the same index.
std::unordered_map<uuid, size_t, boost::hash<boost::uuids::uuid>> rsm_worker_mapping_;
|
||||
Time next_reconciliation_ = Time::min();
|
||||
Address coordinator_leader_;
|
||||
std::optional<ResponseFuture<WriteResponse<CoordinatorWriteResponses>>> heartbeat_res_;
|
||||
@ -235,17 +259,20 @@ class ShardManager {
|
||||
|
||||
/// Makes sure every shard listed in `hr.shards_to_initialize` has been handed
/// to a worker.
///
/// Every listed uuid is recorded in initialized_but_not_confirmed_rsm_.
/// Shards that already have a worker mapping are skipped; each remaining shard
/// is assigned a worker via UuidToWorkerIndex and the initialization request
/// is forwarded to that worker.
///
/// @param hr heartbeat response whose shards_to_initialize are processed.
void EnsureShardsInitialized(HeartbeatResponse hr) {
  for (const auto &to_init : hr.shards_to_initialize) {
    initialized_but_not_confirmed_rsm_.emplace(to_init.uuid);

    if (rsm_worker_mapping_.contains(to_init.uuid)) {
      // it's not a bug for the coordinator to send us UUIDs that we have
      // already created, because there may have been lag that caused
      // the coordinator not to hear back from us.
      //
      // Skip only this shard: returning here would silently drop every
      // not-yet-created shard that follows it in the same response.
      continue;
    }

    const size_t worker_index = UuidToWorkerIndex(to_init.uuid);

    SendToWorkerByIndex(worker_index, to_init);

    // emplace leaves an existing entry untouched, so this is harmless if the
    // mapping has already been recorded for this uuid.
    rsm_worker_mapping_.emplace(to_init.uuid, worker_index);
  }
}
|
||||
};
|
||||
|
@ -443,6 +443,6 @@ target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test
|
||||
add_unit_test(coordinator_shard_map.cpp)
|
||||
target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator)
|
||||
|
||||
# Tests for 1000 shards, 1000 creates, scan
|
||||
add_unit_test(1k_shards_1k_create_scanall.cpp)
|
||||
target_link_libraries(${test_prefix}1k_shards_1k_create_scanall mg-io mg-coordinator mg-storage-v3 mg-query-v2)
|
||||
# Tests for many shards, many creates, scan
|
||||
add_unit_test(high_density_shard_create_scan.cpp)
|
||||
target_link_libraries(${test_prefix}high_density_shard_create_scan mg-io mg-coordinator mg-storage-v3 mg-query-v2)
|
||||
|
Loading…
Reference in New Issue
Block a user