From cf7190ecc6830edca0043c39e39ac8ca79e8c5c1 Mon Sep 17 00:00:00 2001 From: Mislav Bradac Date: Tue, 19 Sep 2017 15:31:44 +0200 Subject: [PATCH] Refactor long running benchmark Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D801 --- .../clients/long_running_client.cpp | 418 ++++++++++++------ tests/macro_benchmark/long_running_suite.py | 11 +- tools/plot_througput | 80 ++++ tools/requirements.txt | 8 + 4 files changed, 371 insertions(+), 146 deletions(-) create mode 100755 tools/plot_througput create mode 100644 tools/requirements.txt diff --git a/tests/macro_benchmark/clients/long_running_client.cpp b/tests/macro_benchmark/clients/long_running_client.cpp index d8cc84800..4ecdeae21 100644 --- a/tests/macro_benchmark/clients/long_running_client.cpp +++ b/tests/macro_benchmark/clients/long_running_client.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -44,52 +45,66 @@ struct VertexAndEdges { std::vector vertices; }; -std::pair DetachDeleteVertex(BoltClient &client, - const std::string &label, - int64_t id) { - auto vertex_record = - ExecuteNTimesTillSuccess( - client, "MATCH (n :" + label + " {id : $id}) RETURN n", - std::map{{"id", id}}, MAX_RETRIES) - .records; - CHECK(vertex_record.size() == 1U) << "id : " << id << " " - << vertex_record.size(); +std::atomic executed_queries; - auto records = - ExecuteNTimesTillSuccess( - client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m", - std::map{{"id", id}}, MAX_RETRIES) - .records; +class Session { + public: + Session(const nlohmann::json &config, const std::string &address, + const std::string &port, const std::string &username, + const std::string &password) + : config_(config), client_(address, port, username, password) {} - ExecuteNTimesTillSuccess( - client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n", - std::map{{"id", id}}, MAX_RETRIES); + private: + const nlohmann::json &config_; + BoltClient client_; + std::unordered_map>> + stats_; + SpinLock lock_; - std::vector edges; - edges.reserve(records.size()); - for (const auto &record : records) { - edges.push_back(record[1].ValueEdge()); + auto Execute(const std::string &query, + const std::map ¶ms, + const std::string &query_name = "") { + utils::Timer timer; + auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES); + ++executed_queries; + auto wall_time = timer.Elapsed(); + auto metadata = result.metadata; + metadata["wall_time"] = wall_time.count(); + { + std::unique_lock guard(lock_); + if (query_name != "") { + stats_[query_name].push_back(std::move(metadata)); + } else { + stats_[query].push_back(std::move(metadata)); + } + } + return result; } - std::vector vertices; - vertices.reserve(records.size()); - for (const auto &record : records) { - vertices.push_back(record[2].ValueVertex()); + auto MatchVertex(const std::string &label, int64_t id) { + return Execute(fmt::format("MATCH (n :{} {{id : $id}}) RETURN n", label), + {{"id", id}}); } - return {{vertex_record[0][0].ValueVertex(), edges, vertices}, 3}; -} + auto MatchNeighbours(const std::string &label, int64_t id) { + return Execute( + fmt::format("MATCH (n :{} {{id : $id}})-[e]-(m) RETURN n, e, m", label), + {{"id", id}}); + } -int ReturnVertexAndEdges(BoltClient &client, - const VertexAndEdges &vertex_and_edges, - const std::string &independent_label) { - int num_queries = 0; - { + auto DetachDeleteVertex(const std::string &label, int64_t id) { + return Execute( + fmt::format("MATCH (n :{} {{id : $id}}) DETACH DELETE n", label), + {{"id", id}}); + } + + auto CreateVertex(const DecodedVertex &vertex) { std::stringstream os; os << "CREATE (n :"; - PrintIterable(os, vertex_and_edges.vertex.labels, ":"); + PrintIterable(os, vertex.labels, ":"); os << " {"; - PrintIterable(os, vertex_and_edges.vertex.properties, ", ", + PrintIterable(os, vertex.properties, ", ", [&](auto &stream, const auto &pair) { if (pair.second.type() == DecodedValue::Type::String) { stream << pair.first << ": \"" << pair.second << "\""; @@ -98,19 +113,17 @@ int ReturnVertexAndEdges(BoltClient &client, } }); os << "})"; - ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES); - ++num_queries; + return Execute(os.str(), {}, "CREATE (n :labels... {...})"); } - for (int i = 0; i < static_cast(vertex_and_edges.vertices.size()); ++i) { + auto CreateEdge(const DecodedVertex &from, const std::string &from_label, + int64_t from_id, const std::string &to_label, int64_t to_id, + const DecodedEdge &edge) { std::stringstream os; - os << "MATCH (n :" << independent_label - << " {id: " << vertex_and_edges.vertex.properties.at("id") << "}) "; - os << "MATCH (m :" << independent_label - << " {id: " << vertex_and_edges.vertices[i].properties.at("id") << "}) "; - const auto &edge = vertex_and_edges.edges[i]; + os << fmt::format("MATCH (n :{} {{id : {}}}) ", from_label, from_id); + os << fmt::format("MATCH (m :{} {{id : {}}}) ", to_label, to_id); os << "CREATE (n)"; - if (edge.to == vertex_and_edges.vertex.id) { + if (edge.to == from.id) { os << "<-"; } else { os << "-"; @@ -125,27 +138,128 @@ int ReturnVertexAndEdges(BoltClient &client, } }); os << "}]"; - if (edge.from == vertex_and_edges.vertex.id) { + if (edge.from == from.id) { os << "->"; } else { os << "-"; } - os << "(m)"; - os << " RETURN n.id"; - auto ret = ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES); - auto x = ret.metadata["plan_execution_time"]; - auto y = ret.metadata["planning_time"]; - if (x.type() == DecodedValue::Type::Double) { - LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning " - << y.ValueDouble(); - } + os << "(m) "; + os << "RETURN n.id"; + auto ret = Execute(os.str(), {}, + "MATCH (n :label {id: ...}) MATCH (m :label {id: ...}) " + "CREATE (n)-[:type ...]-(m)"); CHECK(ret.records.size() == 1U) - << "Graph in invalid state " - << vertex_and_edges.vertex.properties.at("id"); - ++num_queries; + << "from_id: " << from_id << " " + << "to_id: " << to_id << " " + << "ret.records.size(): " << ret.records.size(); + return ret; } - return num_queries; -} + + VertexAndEdges RetrieveAndDeleteVertex(const std::string &label, int64_t id) { + auto vertex_record = MatchVertex(label, id).records; + + CHECK(vertex_record.size() == 1U) + << "id: " << id << " " + << "vertex_record.size(): " << vertex_record.size(); + + auto records = MatchNeighbours(label, id).records; + + DetachDeleteVertex(label, id); + + std::vector edges; + edges.reserve(records.size()); + for (const auto &record : records) { + edges.push_back(record[1].ValueEdge()); + } + + std::vector vertices; + vertices.reserve(records.size()); + for (const auto &record : records) { + vertices.push_back(record[2].ValueVertex()); + } + + return {vertex_record[0][0].ValueVertex(), edges, vertices}; + } + + void ReturnVertexAndEdges(const VertexAndEdges &vertex_and_edges, + const std::string &label) { + int num_queries = 0; + CreateVertex(vertex_and_edges.vertex); + ++num_queries; + + for (int i = 0; i < static_cast(vertex_and_edges.vertices.size()); + ++i) { + auto records = + CreateEdge( + vertex_and_edges.vertex, label, + vertex_and_edges.vertex.properties.at("id").ValueInt(), label, + vertex_and_edges.vertices[i].properties.at("id").ValueInt(), + vertex_and_edges.edges[i]) + .records; + CHECK(records.size() == 1U) + << "Graph in invalid state " + << vertex_and_edges.vertex.properties.at("id"); + ++num_queries; + } + } + + public: + void Run(int id, std::vector to_remove, + std::atomic &keep_running) { + std::mt19937 rg(id); + std::vector removed; + + const auto &queries = config_["queries"]; + const double read_probability = config_["read_probability"]; + const std::string independent_label = config_["independent_label"]; + + while (keep_running) { + std::uniform_real_distribution<> real_dist(0.0, 1.0); + + // Read query. + if (real_dist(rg) < read_probability) { + CHECK(queries.size()) + << "Specify at least one read query or set read_probability to 0"; + std::uniform_int_distribution<> read_query_dist(0, queries.size() - 1); + const auto &query = queries[read_query_dist(rg)]; + std::map params; + for (const auto ¶m : query["params"]) { + std::uniform_int_distribution param_value_dist( + param["low"], param["high"]); + params[param["name"]] = param_value_dist(rg); + } + Execute(query["query"], params); + } else { + auto remove_random = [&](auto &v) { + CHECK(v.size()); + std::uniform_int_distribution<> int_dist(0, v.size() - 1); + std::swap(v.back(), v[int_dist(rg)]); + auto ret = v.back(); + v.pop_back(); + return ret; + }; + if (real_dist(rg) < static_cast(removed.size()) / + (removed.size() + to_remove.size())) { + auto vertices_and_edges = remove_random(removed); + ReturnVertexAndEdges(vertices_and_edges, independent_label); + to_remove.push_back( + vertices_and_edges.vertex.properties["id"].ValueInt()); + } else { + auto node_id = remove_random(to_remove); + auto ret = RetrieveAndDeleteVertex(independent_label, node_id); + removed.push_back(ret); + } + } + } + } + + auto ConsumeStats() { + std::unique_lock guard(lock_); + auto stats = stats_; + stats_.clear(); + return stats; + } +}; int64_t NumNodes(BoltClient &client, const std::string &label) { auto result = ExecuteNTimesTillSuccess( @@ -167,56 +281,66 @@ std::vector Neighbours(BoltClient &client, const std::string &label, return ret; } +std::vector IndependentSet(BoltClient &client, + const std::string &label) { + const int64_t num_nodes = NumNodes(client, label); + std::vector independent_nodes_ids; + std::vector ids; + std::unordered_set independent; + for (int64_t i = 1; i <= num_nodes; ++i) { + ids.push_back(i); + independent.insert(i); + } + { + std::mt19937 mt; + std::shuffle(ids.begin(), ids.end(), mt); + } + + for (auto i : ids) { + if (independent.find(i) == independent.end()) continue; + independent.erase(i); + std::vector neighbour_ids = Neighbours(client, label, i); + independent_nodes_ids.push_back(i); + for (auto j : neighbour_ids) { + independent.erase(j); + } + } + LOG(INFO) << "Number of nodes nodes: " << num_nodes << "\n" + << "Number of independent nodes: " << independent_nodes_ids.size(); + + return independent_nodes_ids; +} + int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); nlohmann::json config; std::cin >> config; - const auto &queries = config["queries"]; - const double read_probability = config["read_probability"]; const std::string independent_label = config["independent_label"]; - std::vector independent_nodes_ids; - BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password); - const int64_t num_nodes = NumNodes(client, independent_label); - { - std::vector ids; - std::unordered_set independent; - for (int64_t i = 1; i <= num_nodes; ++i) { - ids.push_back(i); - independent.insert(i); - } - { - std::mt19937 mt; - std::shuffle(ids.begin(), ids.end(), mt); - } - - for (auto i : ids) { - if (independent.find(i) == independent.end()) continue; - independent.erase(i); - std::vector neighbour_ids = - Neighbours(client, independent_label, i); - independent_nodes_ids.push_back(i); - for (auto j : neighbour_ids) { - independent.erase(j); - } - } - } + auto independent_nodes_ids = [&] { + BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, + FLAGS_password); + return IndependentSet(client, independent_label); + }(); utils::Timer timer; std::vector threads; - std::atomic executed_queries{0}; std::atomic keep_running{true}; - LOG(INFO) << "nodes " << num_nodes << " independent " - << independent_nodes_ids.size(); - int64_t next_to_assign = 0; + std::vector> sessions; + sessions.reserve(FLAGS_num_workers); + for (int i = 0; i < FLAGS_num_workers; ++i) { int64_t size = independent_nodes_ids.size(); int64_t next_next_to_assign = next_to_assign + size / FLAGS_num_workers + (i < size % FLAGS_num_workers); + + sessions.push_back(std::make_unique( + config, FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password)); + std::vector to_remove( independent_nodes_ids.begin() + next_to_assign, independent_nodes_ids.begin() + next_next_to_assign); @@ -224,57 +348,9 @@ int main(int argc, char **argv) { next_to_assign = next_next_to_assign; threads.emplace_back( - [&](int thread_id, std::vector to_remove) { - BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, - FLAGS_password); - - std::mt19937 random_gen(thread_id); - std::vector removed; - - while (keep_running) { - std::uniform_real_distribution<> real_dist(0.0, 1.0); - - // Read query. - if (real_dist(random_gen) < read_probability) { - std::uniform_int_distribution<> read_query_dist( - 0, static_cast(queries.size()) - 1); - const auto &query = queries[read_query_dist(random_gen)]; - std::map params; - for (const auto ¶m : query["params"]) { - std::uniform_int_distribution param_value_dist( - param["low"], param["high"]); - params[param["name"]] = param_value_dist(random_gen); - } - ExecuteNTimesTillSuccess(client, query["query"], params, - MAX_RETRIES); - ++executed_queries; - } else { - if (real_dist(random_gen) < - static_cast(removed.size()) / - (removed.size() + to_remove.size())) { - CHECK(removed.size()); - std::uniform_int_distribution<> int_dist(0, removed.size() - 1); - std::swap(removed.back(), removed[int_dist(random_gen)]); - executed_queries += ReturnVertexAndEdges(client, removed.back(), - independent_label); - to_remove.push_back( - removed.back().vertex.properties["id"].ValueInt()); - removed.pop_back(); - } else { - CHECK(to_remove.size()); - std::uniform_int_distribution<> int_dist(0, - to_remove.size() - 1); - std::swap(to_remove.back(), to_remove[int_dist(random_gen)]); - auto ret = DetachDeleteVertex(client, independent_label, - to_remove.back()); - removed.push_back(ret.first); - to_remove.pop_back(); - executed_queries += ret.second; - } - } - } - - client.Close(); + [&](int thread_id, const std::vector to_remove) { + sessions[thread_id]->Run(thread_id, std::move(to_remove), + keep_running); }, i, std::move(to_remove)); } @@ -291,9 +367,63 @@ int main(int argc, char **argv) { std::ostream out(buf); while (timer.Elapsed().count() < FLAGS_duration) { + std::unordered_map> + aggregated_stats; + using namespace std::chrono_literals; - out << "{ \"num_executed_queries\": " << executed_queries << ", " - << "\"elapsed_time\": " << timer.Elapsed().count() << "}" << std::endl; + std::unordered_map>> + stats; + for (const auto &session : sessions) { + auto session_stats = session->ConsumeStats(); + for (const auto &session_query_stats : session_stats) { + auto &query_stats = stats[session_query_stats.first]; + query_stats.insert(query_stats.end(), + session_query_stats.second.begin(), + session_query_stats.second.end()); + } + } + + // TODO: Here we combine pure values, json and DecodedValue which is a + // little bit chaotic. Think about refactoring this part to only use json + // and write DecodedValue to json converter. + const std::vector fields = { + "wall_time", "parsing_time", "planning_time", "plan_execution_time", + }; + for (const auto &query_stats : stats) { + std::map new_aggregated_query_stats; + for (const auto &stats : query_stats.second) { + for (const auto &field : fields) { + auto it = stats.find(field); + if (it != stats.end()) { + new_aggregated_query_stats[field] += it->second.ValueDouble(); + } + } + } + int64_t new_count = query_stats.second.size(); + + auto &aggregated_query_stats = aggregated_stats[query_stats.first]; + aggregated_query_stats.insert({"count", DecodedValue(0)}); + auto old_count = aggregated_query_stats["count"].ValueInt(); + aggregated_query_stats["count"].ValueInt() += new_count; + for (const auto &stat : new_aggregated_query_stats) { + auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)}) + .first; + it->second = + (it->second.ValueDouble() * old_count + stat.second * new_count) / + (old_count + new_count); + } + } + + out << "{\"num_executed_queries\": " << executed_queries << ", " + << "\"elapsed_time\": " << timer.Elapsed().count() + << ", \"queries\": ["; + PrintIterable(out, aggregated_stats, ", ", [](auto &stream, const auto &x) { + stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": "; + PrintJsonDecodedValue(stream, DecodedValue(x.second)); + stream << "}"; + }); + out << "]}" << std::endl; out.flush(); std::this_thread::sleep_for(1s); } diff --git a/tests/macro_benchmark/long_running_suite.py b/tests/macro_benchmark/long_running_suite.py index 4f988e666..f3ca6413a 100644 --- a/tests/macro_benchmark/long_running_suite.py +++ b/tests/macro_benchmark/long_running_suite.py @@ -19,6 +19,7 @@ class LongRunningSuite: def __init__(self, args): argp = ArgumentParser("LongRunningSuiteArgumentParser") argp.add_argument("--num-client-workers", default=4) + argp.add_argument("--duration", type=int) self.args, _ = argp.parse_known_args(args) pass @@ -30,6 +31,8 @@ class LongRunningSuite: config = next(scenario.get("config")()) duration = config["duration"] + if self.args.duration: + duration = self.args.duration log.info("Executing run for {} seconds with {} client workers".format( duration, self.args.num_client_workers)) results = runner.run(next(scenario.get("run")()), duration, @@ -38,15 +41,19 @@ class LongRunningSuite: runner.stop() measurements = [] + summary_format = "{:>15} {:>22}\n" + self.summary = summary_format.format( + "elapsed_time", "num_executed_queries") for result in results: - print(result["num_executed_queries"], result["elapsed_time"]) + self.summary += summary_format.format( + result["elapsed_time"], result["num_executed_queries"]) # TODO: Revise this. measurements.append({ "target": "throughput", "value": result["num_executed_queries"] / result["elapsed_time"], "unit": "queries per second", "type": "throughput"}) - self.summary = "Throughtput: " + str(measurements[-1]["value"]) + self.summary += "\n\nThroughtput: " + str(measurements[-1]["value"]) return measurements def runners(self): diff --git a/tools/plot_througput b/tools/plot_througput new file mode 100755 index 000000000..805b0bb41 --- /dev/null +++ b/tools/plot_througput @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import json +import os +import matplotlib.pyplot as plt +from matplotlib.cbook import get_sample_data +from argparse import ArgumentParser + +COLORS = { + 'memgraph': '#ff7300', + 'neo4j': '#008cc2' +} + + +def parse_args(): + argp = ArgumentParser(description=__doc__) + argp.add_argument("--vendor-references", nargs="+", + help="Short references that represent all the " + "vendors that are going to be " + "visualized on the plot.") + argp.add_argument("--vendor-titles", nargs="+", + help="Vender titles that are going to appear " + "on the plot, e.g. legend titles.") + argp.add_argument("--results", nargs="+", + help="Result files for each vendor") + argp.add_argument("--plot-title", default="{{Plot title placeholder}}", + help="Plot title.") + #argp.add_argument("--max-label-width", default=11, type=int, + # help="Maximum length of the x-axis labels (-1 is unlimited)") + return argp.parse_args() + + +def main(): + # Read the arguments. + args = parse_args() + + # Prepare the datastructure. + vendors = {} + for vendor_reference, vendor_title, vendor_results in \ + zip(args.vendor_references, args.vendor_titles, args.results): + vendors[vendor_reference] = {} + vendors[vendor_reference]['title'] = vendor_title + vendors[vendor_reference]['results_path'] = vendor_results + vendors[vendor_reference]['color'] = COLORS[vendor_reference] + vendors[vendor_reference]['t'] = [] + vendors[vendor_reference]['dq/dt'] = [] + + fig, ax = plt.subplots() + ax.set_ylabel('Throughput (queries per second)') + ax.set_xlabel('Time (seconds)') + ax.set_title(args.plot_title) + ax.set_aspect(0.01) + + # Collect the benchmark data and plot lines. + print("Pokec throughput") + for vendor_reference, vendor_data in vendors.items(): + print("Vendor: %s" % vendor_reference) + with open(vendor_data['results_path']) as results_file: + # Skip first line which contains titles. + prev_time, prev_num_queries = 0.0, 0 + for line in results_file.readlines()[1:]: + data = line.split() + if data == []: break + assert len(data) == 2, "Invalid data" + new_time = float(data[0]) + new_num_quries = int(data[1]) + dt, dq = new_time - prev_time, new_num_quries - prev_num_queries + prev_time, prev_num_queries = new_time, new_num_quries + vendor_data['t'].append(new_time) + vendor_data['dq/dt'].append(dq / dt) + + line1, = ax.plot(vendor_data['t'], vendor_data['dq/dt'], '-', linewidth=2, + label=vendor_data['title'], color=vendor_data['color']) + + ax.legend(loc='lower right') + plt.show() + +if __name__ == '__main__': + main() diff --git a/tools/requirements.txt b/tools/requirements.txt new file mode 100644 index 000000000..e86db6d17 --- /dev/null +++ b/tools/requirements.txt @@ -0,0 +1,8 @@ +cycler==0.10.0 +matplotlib==2.0.2 +numpy==1.13.1 +pkg-resources==0.0.0 +pyparsing==2.2.0 +python-dateutil==2.6.1 +pytz==2017.2 +six==1.11.0