From 6d21e58b09f593ca163ec18eae4e3016177a37c0 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Mon, 12 Nov 2018 11:19:27 +0100 Subject: [PATCH] Add ClientPool and ThreadPool tests to RPC benchmark Summary: Add script for plotting throughput Reviewers: teon.banek, mculinovic, buda Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1721 --- tests/benchmark/rpc.cpp | 69 +++++++++++++++--- tools/plot/rpc_throughput | 148 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 209 insertions(+), 8 deletions(-) create mode 100755 tools/plot/rpc_throughput diff --git a/tests/benchmark/rpc.cpp b/tests/benchmark/rpc.cpp index 5d0cca52b..a8ded6a0a 100644 --- a/tests/benchmark/rpc.cpp +++ b/tests/benchmark/rpc.cpp @@ -44,15 +44,34 @@ DEFINE_bool(run_benchmark, true, "Set to false to only run server"); std::experimental::optional server; std::experimental::optional clients[kThreadsNum]; +std::experimental::optional client_pool; +std::experimental::optional thread_pool; static void BenchmarkRpc(benchmark::State &state) { - std::string data('a', state.range(0)); + std::string data(state.range(0), 'a'); while (state.KeepRunning()) { clients[state.thread_index]->Call(data); } state.SetItemsProcessed(state.iterations()); } +static void BenchmarkRpcPool(benchmark::State &state) { + std::string data(state.range(0), 'a'); + while (state.KeepRunning()) { + client_pool->Call(data); + } + state.SetItemsProcessed(state.iterations()); +} + +static void BenchmarkRpcPoolAsync(benchmark::State &state) { + std::string data(state.range(0), 'a'); + while (state.KeepRunning()) { + auto future = thread_pool->Run([&data] { client_pool->Call(data); }); + future.get(); + } + state.SetItemsProcessed(state.iterations()); +} + BENCHMARK(BenchmarkRpc) ->RangeMultiplier(4) ->Range(4, 1 << 13) @@ -60,10 +79,25 @@ BENCHMARK(BenchmarkRpc) ->Unit(benchmark::kNanosecond) ->UseRealTime(); +BENCHMARK(BenchmarkRpcPool) + ->RangeMultiplier(4) + ->Range(4, 1 << 13) + ->ThreadRange(1, kThreadsNum) + ->Unit(benchmark::kNanosecond) + ->UseRealTime(); + +BENCHMARK(BenchmarkRpcPoolAsync) + ->RangeMultiplier(4) + ->Range(4, 1 << 13) + ->ThreadRange(1, kThreadsNum) + ->Unit(benchmark::kNanosecond) + ->UseRealTime(); + int main(int argc, char **argv) { + ::benchmark::Initialize(&argc, argv); + gflags::AllowCommandLineReparsing(); gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); - ::benchmark::Initialize(&argc, argv); if (FLAGS_run_server) { server.emplace( @@ -81,16 +115,35 @@ int main(int argc, char **argv) { if (FLAGS_run_benchmark) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); + io::network ::Endpoint endpoint; + if (FLAGS_run_server) { + endpoint = server->endpoint(); + } else { + endpoint = io::network::Endpoint(FLAGS_server_address, FLAGS_server_port); + } + for (int i = 0; i < kThreadsNum; ++i) { - if (FLAGS_run_server) { - clients[i].emplace(server->endpoint()); - } else { - clients[i].emplace( - io::network::Endpoint(FLAGS_server_address, FLAGS_server_port)); - } + clients[i].emplace(endpoint); clients[i]->Call("init"); } + // The client pool connects to the server only when there are no leftover + // unused RPC clients (during concurrent execution). To reduce the overhead + // of making connections to the server during the benchmark here we + // simultaneously call the Echo RPC on the client pool to make the client + // pool connect to the server `kThreadsNum` times. + client_pool.emplace(endpoint); + std::thread threads[kThreadsNum]; + for (int i = 0; i < kThreadsNum; ++i) { + threads[i] = + std::thread([] { client_pool->Call(std::string(10000, 'a')); }); + } + for (int i = 0; i < kThreadsNum; ++i) { + threads[i].join(); + } + + thread_pool.emplace(kThreadsNum, "RPC client"); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); ::benchmark::RunSpecifiedBenchmarks(); diff --git a/tools/plot/rpc_throughput b/tools/plot/rpc_throughput new file mode 100755 index 000000000..042b61e5e --- /dev/null +++ b/tools/plot/rpc_throughput @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +RPC throughput barcharts generator (based on RPC benchmark JSON output). +To obtain data used for this script use: +`./tests/benchmark/rpc --benchmark_out=test.json --benchmark_out_format=json` +""" + +import json +import collections +import os +import argparse +import numpy +import shutil + +import matplotlib +# Must set "Agg" backend before importing pyplot +# This is so the script works on headless machines (without X11) +matplotlib.use("Agg") +import matplotlib.pyplot as plt + + +def parse_args(): + argp = argparse.ArgumentParser(description=__doc__) + argp.add_argument("input", help="Load data from file.") + argp.add_argument("output", help="Save plots to this directory.") + argp.add_argument("--plot-width", type=int, default=1920, + help="Pixel width of generated plots.") + argp.add_argument("--plot-height", type=int, default=1080, + help="Pixel height of generated plots.") + argp.add_argument("--plot-dpi", type=int, default=96, + help="DPI of generated plots.") + return argp.parse_args() + + +def humanize(num): + suffix = ["", "k", "M", "G", "T", "P", "E", "Z", "Y"] + pos = 0 + while num >= 1000.0: + if pos == len(suffix) - 1: + break + num /= 1000 + pos += 1 + return str(int(round(num, 0))) + suffix[pos] + + +def dehumanize(num): + pos = -1 + suffix = ["k", "M", "G", "T", "P", "E", "Z", "Y"] + for index, suff in enumerate(suffix): + if num.endswith(suff): + pos = index + num = num[:-1] + break + return int(num) * 1000 ** (pos + 1) + + +def autolabel(ax, rects): + for rect in rects: + height = rect.get_height() + ax.text(rect.get_x() + rect.get_width()/2., 1.00*height, + humanize(height), + ha="center", va="bottom") + + +def generate_plot(size, results, plot_width, plot_height, plot_dpi, + output_file): + # Font size. + plt.rc("font", size=10) + plt.rc("axes", titlesize=24) + plt.rc("axes", labelsize=16) + plt.rc("xtick", labelsize=12) + plt.rc("ytick", labelsize=12) + plt.rc("legend", fontsize=16) + plt.rc("figure", titlesize=24) + + groups = sorted(results.keys()) + categories = list(map(lambda x: x[0], results[groups[0]])) + + # Plot. + ind = numpy.arange(len(groups)) + width = 0.10 + fig, ax = plt.subplots() + fig.set_size_inches(plot_width / plot_dpi, + plot_height / plot_dpi) + ax.set_xlabel("Concurrent threads") + ax.set_ylabel("Throughput (call/s)") + ax.set_facecolor("#dcdcdc") + ax.set_xticks(ind + width / len(categories)) + ax.set_xticklabels(groups) + for line in ax.get_xgridlines(): + line.set_linestyle(" ") + for line in ax.get_ygridlines(): + line.set_linestyle("--") + ax.set_axisbelow(True) + plt.grid(True) + ax.set_title("RPC throughput (size: {})".format(size)) + + # Draw bars. + rects = [] + for index, category in enumerate(categories): + category_results = [results[group][index][1] for group in groups] + rect = ax.bar(ind + index * width, category_results, width) + rects.append(rect) + autolabel(ax, rect) + ax.legend(rects, categories) + + # Plot output. + plt.savefig(output_file, dpi=plot_dpi) + + +def main(): + # Read the arguments. + args = parse_args() + + # Load data. + with open(args.input) as f: + data = json.load(f) + + # Process data. + results = collections.defaultdict(lambda: collections.defaultdict(list)) + for benchmark in data["benchmarks"]: + info = benchmark["name"].split("/") + name, size = info[0:2] + threads = int(info[-1].split(":")[1]) + throughput = benchmark["items_per_second"] + results[size][threads].append((name, throughput)) + + # Cleanup output directory. + if os.path.isdir(args.output): + shutil.rmtree(args.output) + if os.path.exists(args.output): + os.remove(args.output) + os.mkdir(args.output) + + # Generate plots. + sizes = sorted(results.keys(), key=dehumanize) + for prefix, size in enumerate(sizes): + result = results[size] + output_file = os.path.join(args.output, + "{}_{}.png".format(prefix, size)) + generate_plot(size, result, args.plot_width, args.plot_height, + args.plot_dpi, output_file) + + +if __name__ == "__main__": + main()