Parameterize shard worker threads in the MachineConfig and simplify test output

Tyler Neely 2022-11-01 13:57:57 +00:00
parent bb1e8aa164
commit a13f260236
6 changed files with 56 additions and 28 deletions

View File

@@ -69,11 +69,11 @@ struct LatencyHistogramSummaries {
     output +=
         fmt::format("{: >50} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8}\n", c1, c2, c3, c4, c5, c6, c7);
   };
-  row("name", "count", "min (μs)", "med (μs)", "p99 (μs)", "max (μs)", "sum (μs)");
+  row("name", "count", "min (μs)", "med (μs)", "p99 (μs)", "max (μs)", "sum (ms)");
   for (const auto &[name, histo] : latencies) {
     row(name, histo.count, histo.p0.count(), histo.p50.count(), histo.p99.count(), histo.p100.count(),
-        histo.sum.count());
+        histo.sum.count() / 1000);
   }
   output += "\n";
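A side note on the unit change: the `.count()` calls suggest the histogram sums are `std::chrono::microseconds`, and integer division by 1000 reports them as milliseconds so that long totals still fit the eight-character column. A minimal standalone sketch of that conversion, with a hypothetical `Histo` stand-in for the real entry type:

```cpp
#include <chrono>
#include <cstdint>
#include <fmt/core.h>

// Hypothetical stand-in for one latency histogram entry; field types assumed.
struct Histo {
  uint64_t count;
  std::chrono::microseconds sum;
};

int main() {
  const Histo histo{1'000'000, std::chrono::microseconds{123'456'789}};
  fmt::print("{: >8} | {: >8}\n", "count", "sum (ms)");
  // Integer division turns the microsecond total into milliseconds,
  // matching the new "sum (ms)" column header.
  fmt::print("{: >8} | {: >8}\n", histo.count, histo.sum.count() / 1000);
}
```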

View File

@@ -41,7 +41,7 @@ struct MachineConfig {
   bool is_query_engine;
   boost::asio::ip::address listen_ip;
   uint16_t listen_port;
-  size_t n_shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
+  size_t shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
 };
 } // namespace memgraph::machine_manager
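The renamed field keeps its default. The `std::max` is there because `std::thread::hardware_concurrency()` is allowed to return 0 when the hint is unavailable, so the default is clamped to at least one worker. The same expression in isolation (the helper name is illustrative, not from the codebase):

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>

// Illustrative helper, not from the codebase: fall back to one worker
// when hardware_concurrency() cannot report a hint and returns 0.
size_t DefaultShardWorkerThreads() {
  return std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
}
```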

View File

@@ -73,7 +73,7 @@ class MachineManager {
       : io_(io),
         config_(config),
         coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
-        shard_manager_{io.ForkLocal(), config.n_shard_worker_threads, coordinator_.GetAddress()} {}
+        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_.GetAddress()} {}
   Address CoordinatorAddress() { return coordinator_.GetAddress(); }

View File

@@ -79,13 +79,13 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
 template <typename IoImpl>
 class ShardManager {
  public:
-  ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader)
+  ShardManager(io::Io<IoImpl> io, size_t shard_worker_threads, Address coordinator_leader)
       : io_(io), coordinator_leader_(coordinator_leader) {
-    MG_ASSERT(n_shard_worker_threads >= 1);
+    MG_ASSERT(shard_worker_threads >= 1);
-    shard_worker::Queue queue;
-    for (int i = 0; i < n_shard_worker_threads; i++) {
+    for (int i = 0; i < shard_worker_threads; i++) {
+      shard_worker::Queue queue;
       shard_worker::ShardWorker worker{io, queue};
       auto worker_handle = std::jthread([worker = std::move(worker)]() mutable { worker.Run(); });
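Beyond the rename, the `Queue` declaration moves inside the loop, so each worker drains its own queue instead of all workers sharing a single one. A self-contained sketch of that spawn pattern, assuming hypothetical `Queue`/`Worker` stand-ins for the real `shard_worker` types:

```cpp
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-ins for shard_worker::Queue and shard_worker::ShardWorker.
struct Queue { /* handle that senders can later push messages into */ };
struct Worker {
  Queue queue;
  void Run() { /* consume from queue until shutdown */ }
};

int main() {
  const size_t shard_worker_threads = 4;
  std::vector<std::jthread> worker_handles;
  for (size_t i = 0; i < shard_worker_threads; i++) {
    Queue queue;  // one queue per worker, mirroring the moved declaration above
    Worker worker{queue};
    // std::jthread joins automatically when worker_handles goes out of scope.
    worker_handles.emplace_back([worker = std::move(worker)]() mutable { worker.Run(); });
  }
}
```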

View File

@@ -162,8 +162,6 @@ class ShardWorker {
           cron_schedule_.pop();
           cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
           const auto &[next_time, _uuid] = cron_schedule_.top();
-        } else {
-          return time;
         }
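Only a fragment of the cron loop is visible, but the lines above follow the usual min-heap reschedule pattern: pop the due `(time, uuid)` pair, push it back with its next fire time, and peek the new head; the removed `else` branch returned early instead of continuing. A rough sketch of that pattern under those assumptions (all type names here are stand-ins):

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

using Time = std::chrono::steady_clock::time_point;
using Uuid = int;  // stand-in for the real uuid type

// Min-heap keyed on the next fire time (std::greater flips the default max-heap).
using CronSchedule =
    std::priority_queue<std::pair<Time, Uuid>, std::vector<std::pair<Time, Uuid>>, std::greater<>>;

// Fire every due entry, pushing each back with its next scheduled time.
void ProcessCron(CronSchedule &cron_schedule, Time now, std::chrono::milliseconds interval) {
  while (!cron_schedule.empty() && cron_schedule.top().first <= now) {
    auto [time, uuid] = cron_schedule.top();
    cron_schedule.pop();
    cron_schedule.push(std::make_pair(time + interval, uuid));  // reschedule for the next tick
  }
}
```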

View File

@@ -81,13 +81,14 @@ struct ScanAll {
 };
 MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr,
-                                    ShardMap shard_map) {
+                                    ShardMap shard_map, size_t shard_worker_threads) {
   MachineConfig config{
       .coordinator_addresses = coordinator_addresses,
       .is_storage = true,
       .is_coordinator = true,
       .listen_ip = addr.last_known_ip,
       .listen_port = addr.last_known_port,
+      .shard_worker_threads = shard_worker_threads,
   };
   Io<LocalTransport> io = local_system.Register(addr);
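Because `MachineConfig` is built with C++20 designated initializers, the new `.shard_worker_threads` designator has to appear in member declaration order, and callers that omit it keep the field's default. A small sketch of that behavior with a trimmed-down, hypothetical config type:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <thread>

// Trimmed-down, hypothetical stand-in for MachineConfig.
struct Config {
  uint16_t listen_port = 0;
  size_t shard_worker_threads = std::max(1u, std::thread::hardware_concurrency());
};

int main() {
  // Designators must follow declaration order: listen_port before shard_worker_threads.
  const Config tuned{.listen_port = 7687, .shard_worker_threads = 4};
  // Omitting the designator keeps the default member initializer.
  const Config defaulted{.listen_port = 7687};
}
```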
@@ -123,7 +124,7 @@ void WaitForShardsToInitialize(CoordinatorClient<LocalTransport> &coordinator_cl
   }
 }
-ShardMap TestShardMap(int n_splits, int replication_factor) {
+ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards) {
   ShardMap sm{};
   const std::string label_name = std::string("test_label");
@@ -146,8 +147,8 @@ ShardMap TestShardMap(int n_splits, int replication_factor) {
   MG_ASSERT(label_id.has_value());
   // split the shard at N split points
-  for (int64_t i = 1; i < n_splits; ++i) {
-    const auto key1 = memgraph::storage::v3::PropertyValue(i);
+  for (int64_t i = 1; i < shards; ++i) {
+    const auto key1 = memgraph::storage::v3::PropertyValue(i * gap_between_shards);
     const auto key2 = memgraph::storage::v3::PropertyValue(0);
     const auto split_point = {key1, key2};
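The arithmetic behind `gap_between_shards`: the test below computes it as `create_ops / shards`, so with 1024 creates and 4 shards the gap is 256 and the split points land at keys 256, 512, and 768, spreading the sequentially-created vertices evenly at 256 per shard. Had the split points stayed at 1, 2, ..., shards - 1, nearly every key would pile into the last shard. A quick check of the computation:

```cpp
#include <cstdint>
#include <fmt/core.h>

int main() {
  const int create_ops = 1024;  // matches the test harness below
  const int shards = 4;
  const int gap_between_shards = create_ops / shards;  // 256
  // Mirrors the split-point loop above: shards - 1 split points, one every gap keys.
  for (int64_t i = 1; i < shards; ++i) {
    fmt::print("split point at key {}\n", i * gap_between_shards);  // 256, 512, 768
  }
}
```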
@@ -207,7 +208,16 @@ void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
   }
 }
-TEST(MachineManager, ManyShards) {
+void RunWorkload(int shards, int replication_factor, int create_ops, int scan_ops, int shard_worker_threads,
+                 int gap_between_shards) {
+  // std::cout << "======================== NEW TEST ======================== \n";
+  // std::cout << "shards: " << shards << std::endl;
+  // std::cout << "replication factor: " << replication_factor << std::endl;
+  // std::cout << "create ops: " << create_ops << std::endl;
+  // std::cout << "scan all ops: " << scan_ops << std::endl;
+  // std::cout << "shard worker threads: " << shard_worker_threads << std::endl;
+  // std::cout << "gap between shards: " << gap_between_shards << std::endl;
   LocalSystem local_system;
   auto cli_addr = Address::TestAddress(1);
@@ -220,15 +230,11 @@
       machine_1_addr,
   };
-  auto shard_splits = 1024;
-  auto replication_factor = 1;
-  auto create_ops = 1000;
   auto time_before_shard_map_creation = cli_io_2.Now();
-  ShardMap initialization_sm = TestShardMap(shard_splits, replication_factor);
+  ShardMap initialization_sm = TestShardMap(shards, replication_factor, gap_between_shards);
   auto time_after_shard_map_creation = cli_io_2.Now();
-  auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm);
+  auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm, shard_worker_threads);
   Address coordinator_address = mm_1.CoordinatorAddress();
   auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
@@ -253,7 +259,9 @@
   auto time_after_creates = cli_io_2.Now();
-  ExecuteOp(shard_request_manager, correctness_model, ScanAll{});
+  for (int i = 0; i < scan_ops; i++) {
+    ExecuteOp(shard_request_manager, correctness_model, ScanAll{});
+  }
   auto time_after_scan = cli_io_2.Now();
@@ -261,14 +269,36 @@
   auto latencies = cli_io_2.ResponseLatencies();
-  std::cout << "response latencies: \n" << latencies.SummaryTable();
+  // std::cout << "response latencies: \n" << latencies.SummaryTable();
-  std::cout << "split shard map: " << (time_after_shard_map_creation - time_before_shard_map_creation).count()
-            << std::endl;
-  std::cout << "shard stabilization: " << (time_after_shard_stabilization - time_before_shard_stabilization).count()
-            << std::endl;
-  std::cout << "create nodes: " << (time_after_creates - time_before_creates).count() << std::endl;
-  std::cout << "scan nodes: " << (time_after_scan - time_after_creates).count() << std::endl;
+  // std::cout << "serial time break-down: (μs)\n";
+  // std::cout << fmt::format("{: >20}: {: >10}\n", "split shard map", (time_after_shard_map_creation -
+  // time_before_shard_map_creation).count()); std::cout << fmt::format("{: >20}: {: >10}\n", "shard stabilization",
+  // (time_after_shard_stabilization - time_before_shard_stabilization).count()); std::cout << fmt::format("{: >20}: {:
+  // >10}\n", "create nodes", (time_after_creates - time_before_creates).count()); std::cout << fmt::format("{: >20}: {:
+  // >10}\n", "scan nodes", (time_after_scan - time_after_creates).count());
+  std::cout << fmt::format("{} {} {}\n", shards, shard_worker_threads, (time_after_scan - time_after_creates).count());
 }
+TEST(MachineManager, ManyShards) {
+  auto shards_attempts = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
+                          14, 15, 16, 17, 18, 19, 20, 22, 24, 26, 28, 30, 32};
+  auto shard_worker_thread_attempts = {1, 2, 3, 4, 6, 8};
+  auto replication_factor = 1;
+  auto create_ops = 1024;
+  auto scan_ops = 1;
+  std::cout << "splits threads scan_all_microseconds\n";
+  for (const auto shards : shards_attempts) {
+    auto gap_between_shards = create_ops / shards;
+    for (const auto shard_worker_threads : shard_worker_thread_attempts) {
+      RunWorkload(shards, replication_factor, create_ops, scan_ops, shard_worker_threads, gap_between_shards);
+    }
+  }
+}
 } // namespace memgraph::tests::simulation