diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index a849c8c17..4d7507e87 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -92,6 +93,7 @@ class ShardManager { workers_.emplace_back(queue); worker_handles_.emplace_back(std::move(worker_handle)); + worker_rsm_counts_.emplace_back(0); } } @@ -118,8 +120,29 @@ class ShardManager { } size_t UuidToWorkerIndex(const uuid &to) { - size_t hash = boost::hash()(to); - return hash % workers_.size(); + if (rsm_worker_mapping_.contains(to)) { + return rsm_worker_mapping_.at(to); + } + + // We will now create a mapping for this (probably new) shard + // by choosing the worker with the lowest number of existing + // mappings. + + size_t min_index = 0; + size_t min_count = worker_rsm_counts_.at(min_index); + + for (int i = 0; i < worker_rsm_counts_.size(); i++) { + size_t worker_count = worker_rsm_counts_.at(i); + if (worker_count <= min_count) { + min_count = worker_count; + min_index = i; + } + } + + worker_rsm_counts_[min_index]++; + rsm_worker_mapping_.emplace(to, min_index); + + return min_index; } void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) { @@ -179,7 +202,8 @@ class ShardManager { io::Io io_; std::vector workers_; std::vector worker_handles_; - std::set rsm_set_; + std::vector worker_rsm_counts_; + std::unordered_map> rsm_worker_mapping_; Time next_reconciliation_ = Time::min(); Address coordinator_leader_; std::optional>> heartbeat_res_; @@ -235,17 +259,20 @@ class ShardManager { void EnsureShardsInitialized(HeartbeatResponse hr) { for (const auto &to_init : hr.shards_to_initialize) { - if (rsm_set_.contains(to_init.uuid)) { + initialized_but_not_confirmed_rsm_.emplace(to_init.uuid); + + if (rsm_worker_mapping_.contains(to_init.uuid)) { // it's not a bug for the coordinator to send us UUIDs that we have // already created, because there may have been lag that caused // the coordinator not to hear back from us. return; } - SendToWorkerByUuid(to_init.uuid, to_init); + size_t worker_index = UuidToWorkerIndex(to_init.uuid); - rsm_set_.emplace(to_init.uuid); - initialized_but_not_confirmed_rsm_.emplace(to_init.uuid); + SendToWorkerByIndex(worker_index, to_init); + + rsm_worker_mapping_.emplace(to_init.uuid, worker_index); } } }; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 52e3a7eef..7cf045443 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -443,6 +443,6 @@ target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test add_unit_test(coordinator_shard_map.cpp) target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator) -# Tests for 1000 shards, 1000 creates, scan -add_unit_test(1k_shards_1k_create_scanall.cpp) -target_link_libraries(${test_prefix}1k_shards_1k_create_scanall mg-io mg-coordinator mg-storage-v3 mg-query-v2) +# Tests for many shards, many creates, scan +add_unit_test(high_density_shard_create_scan.cpp) +target_link_libraries(${test_prefix}high_density_shard_create_scan mg-io mg-coordinator mg-storage-v3 mg-query-v2)