From d640ca3f1ab23e6771f9d6e1e608be1e958b97c0 Mon Sep 17 00:00:00 2001
From: Mislav Bradac
Date: Wed, 13 Sep 2017 21:20:03 +0200
Subject: [PATCH] Do preprocess in client

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D793
---
 tests/macro_benchmark/harness/clients.py      |   4 +-
 .../harness/clients/bolt_client.hpp           |   5 +-
 .../harness/clients/common.hpp                |   1 +
 .../harness/clients/long_running_client.cpp   | 152 +++++++++++-------
 .../harness/clients/postgres_client.hpp       |   5 +-
 .../harness/clients/query_client.cpp          |  65 ++++----
 .../harness/groups/pokec/pokec_small.run.json |   2 -
 .../harness/long_running_suite.py             |  23 +--
 tests/macro_benchmark/harness/query_suite.py  |   9 +-
 9 files changed, 150 insertions(+), 116 deletions(-)

diff --git a/tests/macro_benchmark/harness/clients.py b/tests/macro_benchmark/harness/clients.py
index 38ca70978..f7a869a39 100644
--- a/tests/macro_benchmark/harness/clients.py
+++ b/tests/macro_benchmark/harness/clients.py
@@ -67,8 +67,10 @@ class QueryClient:
                       str(queries), return_code, stderr)
             raise Exception("BoltClient execution failed")

+        data = {"groups" : []}
         with open(output) as f:
-            data = json.loads(f.read())
+            for line in f:
+                data["groups"].append(json.loads(line))

         data[CPU_TIME] = cpu_time_end - cpu_time_start
         data[MAX_MEMORY] = usage["max_memory"]
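The client now writes its results as newline-delimited JSON, one summary object per query group, and clients.py above folds those lines into a "groups" list. A minimal reader-side sketch of that format, in Python (the helper name parse_group_summaries is illustrative, not part of the harness):

    import json

    def parse_group_summaries(path):
        # One JSON summary per line, as produced by the harness client
        # (PrintSummary in query_client.cpp now ends every summary with a newline).
        data = {"groups": []}
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line:
                    data["groups"].append(json.loads(line))
        return data

Each entry carries the keys the client prints, e.g. data["groups"][0]["wall_time"].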
diff --git a/tests/macro_benchmark/harness/clients/bolt_client.hpp b/tests/macro_benchmark/harness/clients/bolt_client.hpp
index a9bbd99f9..ff0f32451 100644
--- a/tests/macro_benchmark/harness/clients/bolt_client.hpp
+++ b/tests/macro_benchmark/harness/clients/bolt_client.hpp
@@ -16,8 +16,9 @@ using communication::bolt::DecodedValue;

 class BoltClient {
  public:
-  BoltClient(std::string &address, std::string &port, std::string &username,
-             std::string &password, std::string database = "") {
+  BoltClient(const std::string &address, const std::string &port,
+             const std::string &username, const std::string &password,
+             const std::string & = "") {
    SocketT socket;
    EndpointT endpoint;

diff --git a/tests/macro_benchmark/harness/clients/common.hpp b/tests/macro_benchmark/harness/clients/common.hpp
index 8192cd5d2..4b81e200a 100644
--- a/tests/macro_benchmark/harness/clients/common.hpp
+++ b/tests/macro_benchmark/harness/clients/common.hpp
@@ -70,6 +70,7 @@ communication::bolt::QueryData ExecuteNTimesTillSuccess(
           std::chrono::milliseconds(rand_dist_(pseudo_rand_gen_)));
     }
   }
+  LOG(WARNING) << query << " failed " << times << " times";
   throw last_exception;
 }
}
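ExecuteNTimesTillSuccess, used throughout these clients, retries a failing query up to MAX_RETRIES times with a short random sleep and, after this change, logs a warning before rethrowing the last error. The retry shape, sketched in Python purely for illustration (the real helper is the C++ template in common.hpp):

    import logging, random, time

    def execute_n_times_till_success(execute, query, params, max_attempts):
        last_error = None
        for _ in range(max_attempts):
            try:
                return execute(query, params)
            except Exception as error:  # the C++ helper catches the client's query exception
                last_error = error
                time.sleep(random.uniform(0.0, 0.1))  # small randomized pause between attempts
        logging.warning("%s failed %d times", query, max_attempts)
        raise last_error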
diff --git a/tests/macro_benchmark/harness/clients/long_running_client.cpp b/tests/macro_benchmark/harness/clients/long_running_client.cpp
index 6084cce38..d8cc84800 100644
--- a/tests/macro_benchmark/harness/clients/long_running_client.cpp
+++ b/tests/macro_benchmark/harness/clients/long_running_client.cpp
@@ -12,6 +12,7 @@
 #include
 #include

+#include "bolt_client.hpp"
 #include "common.hpp"
 #include "communication/bolt/client.hpp"
 #include "communication/bolt/v1/decoder/decoded_value.hpp"
@@ -23,9 +24,6 @@
 #include "utils/assert.hpp"
 #include "utils/timer.hpp"

-using SocketT = io::network::Socket;
-using EndpointT = io::network::NetworkEndpoint;
-using Client = communication::bolt::Client;
 using communication::bolt::DecodedValue;
 using communication::bolt::DecodedVertex;
 using communication::bolt::DecodedEdge;
@@ -39,7 +37,6 @@ DEFINE_string(password, "", "Password for the database");
 DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");

 const int MAX_RETRIES = 30;
-const int NUM_BUCKETS = 100;

 struct VertexAndEdges {
   DecodedVertex vertex;
@@ -47,17 +44,23 @@ struct VertexAndEdges {
   std::vector<DecodedVertex> vertices;
 };

-std::pair<VertexAndEdges, int> DetachDeleteVertex(Client &client,
+std::pair<VertexAndEdges, int> DetachDeleteVertex(BoltClient &client,
                                                   const std::string &label,
                                                   int64_t id) {
+  auto vertex_record =
+      ExecuteNTimesTillSuccess(
+          client, "MATCH (n :" + label + " {id : $id}) RETURN n",
+          std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
+          .records;
+  CHECK(vertex_record.size() == 1U) << "id : " << id << " "
+                                    << vertex_record.size();
+
   auto records =
       ExecuteNTimesTillSuccess(
           client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m",
           std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
           .records;

-  if (records.size() == 0U) return {{}, 1};
-
   ExecuteNTimesTillSuccess(
       client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n",
       std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES);
@@ -74,10 +77,11 @@ std::pair<VertexAndEdges, int> DetachDeleteVertex(Client &client,
     vertices.push_back(record[2].ValueVertex());
   }

-  return {{records[0][0].ValueVertex(), edges, vertices}, 2};
+  return {{vertex_record[0][0].ValueVertex(), edges, vertices}, 3};
 }

-int ReturnVertexAndEdges(Client &client, const VertexAndEdges &vertex_and_edges,
+int ReturnVertexAndEdges(BoltClient &client,
+                         const VertexAndEdges &vertex_and_edges,
                          const std::string &independent_label) {
   int num_queries = 0;
   {
@@ -134,13 +138,35 @@ int ReturnVertexAndEdges(Client &client, const VertexAndEdges &vertex_and_edges,
     if (x.type() == DecodedValue::Type::Double) {
       LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning "
                               << y.ValueDouble();
-      CHECK(ret.records.size() == 1U) << "Graph in invalid state";
     }
+    CHECK(ret.records.size() == 1U)
+        << "Graph in invalid state "
+        << vertex_and_edges.vertex.properties.at("id");
     ++num_queries;
   }
   return num_queries;
 }

+int64_t NumNodes(BoltClient &client, const std::string &label) {
+  auto result = ExecuteNTimesTillSuccess(
+      client, "MATCH (n :" + label + ") RETURN COUNT(n) as cnt", {},
+      MAX_RETRIES);
+  return result.records[0][0].ValueInt();
+}
+
+std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
+                                int64_t id) {
+  auto result = ExecuteNTimesTillSuccess(
+      client, "MATCH (n :" + label + " {id: " + std::to_string(id) +
+                  "})-[e]-(m) RETURN m.id",
+      {}, MAX_RETRIES);
+  std::vector<int64_t> ret;
+  for (const auto &record : result.records) {
+    ret.push_back(record[0].ValueInt());
+  }
+  return ret;
+}
+
 int main(int argc, char **argv) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
@@ -149,40 +175,60 @@ int main(int argc, char **argv) {
   std::cin >> config;
   const auto &queries = config["queries"];
   const double read_probability = config["read_probability"];
-  const int64_t num_independent_nodes = config["num_independent_nodes"];
   const std::string independent_label = config["independent_label"];
-  const int64_t num_nodes = config["num_nodes"];
+  std::vector<int64_t> independent_nodes_ids;
+
+  BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password);
+  const int64_t num_nodes = NumNodes(client, independent_label);
+  {
+    std::vector<int64_t> ids;
+    std::unordered_set<int64_t> independent;
+    for (int64_t i = 1; i <= num_nodes; ++i) {
+      ids.push_back(i);
+      independent.insert(i);
+    }
+    {
+      std::mt19937 mt;
+      std::shuffle(ids.begin(), ids.end(), mt);
+    }
+
+    for (auto i : ids) {
+      if (independent.find(i) == independent.end()) continue;
+      independent.erase(i);
+      std::vector<int64_t> neighbour_ids =
+          Neighbours(client, independent_label, i);
+      independent_nodes_ids.push_back(i);
+      for (auto j : neighbour_ids) {
+        independent.erase(j);
+      }
+    }
+  }

   utils::Timer timer;
   std::vector<std::thread> threads;
   std::atomic<int64_t> executed_queries{0};
   std::atomic<bool> keep_running{true};

+  LOG(INFO) << "nodes " << num_nodes << " independent "
+            << independent_nodes_ids.size();
+
+  int64_t next_to_assign = 0;
   for (int i = 0; i < FLAGS_num_workers; ++i) {
+    int64_t size = independent_nodes_ids.size();
+    int64_t next_next_to_assign = next_to_assign + size / FLAGS_num_workers +
+                                  (i < size % FLAGS_num_workers);
+    std::vector<int64_t> to_remove(
+        independent_nodes_ids.begin() + next_to_assign,
+        independent_nodes_ids.begin() + next_next_to_assign);
+    LOG(INFO) << next_to_assign << " " << next_next_to_assign;
+    next_to_assign = next_next_to_assign;
+
     threads.emplace_back(
-        [&](int thread_id) {
-          // Initialise client.
-          SocketT socket;
-          EndpointT endpoint;
-          try {
-            endpoint = EndpointT(FLAGS_address, FLAGS_port);
-          } catch (const io::network::NetworkEndpointException &e) {
-            LOG(FATAL) << "Invalid address or port: " << FLAGS_address << ":"
-                       << FLAGS_port;
-          }
-          if (!socket.Connect(endpoint)) {
-            LOG(FATAL) << "Could not connect to: " << FLAGS_address << ":"
-                       << FLAGS_port;
-          }
-          Client client(std::move(socket), FLAGS_username, FLAGS_password);
+        [&](int thread_id, std::vector<int64_t> to_remove) {
+          BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username,
+                            FLAGS_password);
           std::mt19937 random_gen(thread_id);
-          int64_t to_remove =
-              num_independent_nodes / FLAGS_num_workers * thread_id + 1;
-          int64_t last_to_remove =
-              to_remove + num_independent_nodes / FLAGS_num_workers;
-          bool remove = true;
-          int64_t num_shifts = 0;

           std::vector<VertexAndEdges> removed;

           while (keep_running) {
@@ -203,40 +249,34 @@ int main(int argc, char **argv) {
                   MAX_RETRIES);
               ++executed_queries;
             } else {
-              if (!remove) {
+              if (real_dist(random_gen) <
+                  static_cast<double>(removed.size()) /
+                      (removed.size() + to_remove.size())) {
+                CHECK(removed.size());
+                std::uniform_int_distribution<> int_dist(0, removed.size() - 1);
+                std::swap(removed.back(), removed[int_dist(random_gen)]);
                 executed_queries += ReturnVertexAndEdges(client, removed.back(),
                                                          independent_label);
+                to_remove.push_back(
+                    removed.back().vertex.properties["id"].ValueInt());
                 removed.pop_back();
-                if (removed.empty()) {
-                  remove = true;
-                }
               } else {
-                auto ret =
-                    DetachDeleteVertex(client, independent_label, to_remove);
-                ++to_remove;
+                CHECK(to_remove.size());
+                std::uniform_int_distribution<> int_dist(0,
+                                                         to_remove.size() - 1);
+                std::swap(to_remove.back(), to_remove[int_dist(random_gen)]);
+                auto ret = DetachDeleteVertex(client, independent_label,
+                                              to_remove.back());
+                removed.push_back(ret.first);
+                to_remove.pop_back();
                 executed_queries += ret.second;
-                if (ret.second > 1) {
-                  removed.push_back(std::move(ret.first));
-                }
-                if (to_remove == last_to_remove) {
-                  for (auto &x : removed) {
-                    x.vertex.properties["id"].ValueInt() += num_nodes;
-                  }
-                  remove = false;
-                  ++num_shifts;
-                  to_remove =
-                      num_independent_nodes / FLAGS_num_workers * thread_id +
-                      1 + num_shifts * num_nodes;
-                  last_to_remove =
-                      to_remove + num_independent_nodes / FLAGS_num_workers;
-                }
              }
            }
          }
          client.Close();
        },
-        i);
+        i, std::move(to_remove));
  }

  // Open stream for writing stats.
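With num_independent_nodes and num_nodes gone from the config, the client derives the workload itself: it counts the labeled vertices, shuffles their ids, and greedily keeps an id only if none of its neighbours has already been kept, which yields a set of vertices that can be deleted and restored independently of each other. A compact sketch of that selection, assuming a neighbours(id) lookup equivalent to the Neighbours query above (Python, names illustrative):

    import random

    def pick_independent_ids(num_nodes, neighbours, seed=0):
        ids = list(range(1, num_nodes + 1))
        random.Random(seed).shuffle(ids)
        candidates = set(ids)        # ids that may still be picked
        independent = []
        for i in ids:
            if i not in candidates:
                continue             # some already-picked vertex is adjacent to i
            candidates.discard(i)
            independent.append(i)
            for j in neighbours(i):  # neighbours of a picked vertex are excluded
                candidates.discard(j)
        return independent

The selected ids are then handed out in contiguous chunks, size / num_workers per worker with the first size % num_workers workers taking one extra id, and each worker thread randomly alternates between deleting one of its remaining ids and restoring a vertex it deleted earlier.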
diff --git a/tests/macro_benchmark/harness/clients/postgres_client.hpp b/tests/macro_benchmark/harness/clients/postgres_client.hpp
index 8d78fecf2..951125e81 100644
--- a/tests/macro_benchmark/harness/clients/postgres_client.hpp
+++ b/tests/macro_benchmark/harness/clients/postgres_client.hpp
@@ -28,8 +28,9 @@ class ClientQueryException : public ClientException {

 class Client {
  public:
-  Client(std::string &host, std::string &port, std::string &username,
-         std::string &password, std::string database = "") {
+  Client(const std::string &host, const std::string &port,
+         const std::string &username, const std::string &password,
+         const std::string &database = "") {
    // https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
    std::string pass = "";
    if (password != "") {

diff --git a/tests/macro_benchmark/harness/clients/query_client.cpp b/tests/macro_benchmark/harness/clients/query_client.cpp
index 534fcdc66..74cf43f01 100644
--- a/tests/macro_benchmark/harness/clients/query_client.cpp
+++ b/tests/macro_benchmark/harness/clients/query_client.cpp
@@ -8,6 +8,7 @@
 #include "communication/bolt/v1/decoder/decoded_value.hpp"
 #include "threading/sync/spinlock.hpp"
 #include "utils/algorithm.hpp"
+#include "utils/string.hpp"
 #include "utils/timer.hpp"

 #include "bolt_client.hpp"
@@ -45,25 +46,20 @@ void PrintSummary(
   os << "{\"wall_time\": " << duration << ", "
      << "\"metadatas\": ";
   PrintJsonMetadata(os, metadata);
-  os << "}";
+  os << "}\n";
 }

-template
-void ExecuteQueries(std::istream &istream, int num_workers,
+template
+void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
                     std::ostream &ostream, std::string &address,
                     std::string &port, std::string &username,
                     std::string &password, std::string &database) {
-  std::string query;
   std::vector<std::thread> threads;

   SpinLock spinlock;
   uint64_t last = 0;
-  std::vector<std::string> queries;
   std::vector<std::map<std::string, DecodedValue>> metadata;
-  while (std::getline(istream, query)) {
-    queries.push_back(query);
-  }
   metadata.resize(queries.size());

   utils::Timer timer;
@@ -86,7 +82,7 @@ void ExecuteQueries(std::istream &istream, int num_workers,
        try {
          metadata[pos] =
              ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES).metadata;
-        } catch (const ExceptionT &e) {
+        } catch (const utils::BasicException &e) {
          LOG(FATAL) << "Could not execute query '" << str << "' "
                     << MAX_RETRIES << " times! Error message: " << e.what();
        }
@@ -128,27 +124,38 @@ int main(int argc, char **argv) {
   std::string port = FLAGS_port;
   if (FLAGS_protocol == "bolt") {
     if (port == "") port = "7687";
-
-    using BoltClientT = BoltClient;
-    using BoltExceptionT = communication::bolt::ClientQueryException;
-    ExecuteQueries(
-        *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
-        FLAGS_username, FLAGS_password, FLAGS_database);
   } else if (FLAGS_protocol == "postgres") {
-    LOG(FATAL) << "Postgres not yet supported";
-    // TODO: Currently libpq is linked dynamically so it is a pain to move
-    // harness_client executable to other machines without libpq.
-    // CHECK(FLAGS_username != "") << "Username can't be empty for
-    // postgres!";
-    // CHECK(FLAGS_database != "") << "Database can't be empty for
-    // postgres!";
-    // if (port == "") port = "5432";
-    //
-    // using PostgresClientT = postgres::Client;
-    // using PostgresExceptionT = postgres::ClientQueryException;
-    // ExecuteQueries(
-    //     *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
-    //     FLAGS_username, FLAGS_password, FLAGS_database);
+    if (port == "") port = "5432";
+  }
+
+  while (!istream->eof()) {
+    std::vector<std::string> queries;
+    std::string query;
+    while (std::getline(*istream, query) && utils::Trim(query) != "" &&
+           utils::Trim(query) != ";") {
+      queries.push_back(query);
+    }
+
+    if (FLAGS_protocol == "bolt") {
+      ExecuteQueries(queries, FLAGS_num_workers, *ostream,
+                     FLAGS_address, port, FLAGS_username,
+                     FLAGS_password, FLAGS_database);
+    } else if (FLAGS_protocol == "postgres") {
+      LOG(FATAL) << "Postgres not yet supported";
+      // TODO: Currently libpq is linked dynamically so it is a pain to move
+      // harness_client executable to other machines without libpq.
+      // CHECK(FLAGS_username != "") << "Username can't be empty for
+      // postgres!";
+      // CHECK(FLAGS_database != "") << "Database can't be empty for
+      // postgres!";
+      // if (port == "") port = "5432";
+      //
+      // using PostgresClientT = postgres::Client;
+      // using PostgresExceptionT = postgres::ClientQueryException;
+      // ExecuteQueries(
+      //     *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
+      //     FLAGS_username, FLAGS_password, FLAGS_database);
+    }
   }

   return 0;
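query_client's main loop now does the group splitting that the Python suite used to do: it reads queries until a blank line or a lone ";", runs that batch with ExecuteQueries, and prints one JSON summary line per batch before moving on to the next group. The same splitting, sketched in Python for illustration (the C++ client executes each batch as soon as it has been read rather than collecting them all first):

    def read_query_groups(stream):
        groups, group = [], []
        for raw in stream:
            query = raw.strip()
            if query in ("", ";"):    # a blank line or a lone ';' ends the current group
                groups.append(group)
                group = []
            else:
                group.append(query)
        if group:
            groups.append(group)
        return groups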
diff --git a/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json b/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json
index 8cd666f00..6bd9d99f0 100644
--- a/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json
+++ b/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json
@@ -1,6 +1,4 @@
 {
-    "num_independent_nodes" : 4111,
-    "num_nodes" : 10000,
     "independent_label": "User",
     "read_probability": 0.5,
     "queries" : [

diff --git a/tests/macro_benchmark/harness/long_running_suite.py b/tests/macro_benchmark/harness/long_running_suite.py
index 642ad7279..4f988e666 100644
--- a/tests/macro_benchmark/harness/long_running_suite.py
+++ b/tests/macro_benchmark/harness/long_running_suite.py
@@ -24,33 +24,14 @@ class LongRunningSuite:
     def run(self, scenario, group_name, scenario_name, runner):
         runner.start()

-        # This suite allows empty lines in setup. Those lines separate query
-        # groups. It is guaranteed that groups will be executed sequentially,
-        # but queries in each group are possibly executed concurrently.
-        query_groups = [[]]
-        for query in scenario.get("setup")():
-            if query == "":
-                query_groups.append([])
-            else:
-                query_groups[-1].append(query)
-        if query_groups[-1] == []:
-            query_groups.pop()
-
-        log.info("Executing {} query groups in setup"
-                .format(len(query_groups)))
-
-        for i, queries in enumerate(query_groups):
-            start_time = time.time()
-            # TODO: number of threads configurable
-            runner.setup(queries, self.args.num_client_workers)
-            log.info("\t{}. group imported in done in {:.2f} seconds".format(
-                i + 1, time.time() - start_time))
+        log.info("Executing setup")
+        runner.setup(scenario.get("setup")(), self.args.num_client_workers)

         config = next(scenario.get("config")())
         duration = config["duration"]
         log.info("Executing run for {} seconds with {} client workers".format(
             duration, self.args.num_client_workers))
-        # TODO: number of threads configurable
         results = runner.run(next(scenario.get("run")()), duration,
                              self.args.num_client_workers)
diff --git a/tests/macro_benchmark/harness/query_suite.py b/tests/macro_benchmark/harness/query_suite.py
index ce42891a2..7c05fc6a3 100644
--- a/tests/macro_benchmark/harness/query_suite.py
+++ b/tests/macro_benchmark/harness/query_suite.py
@@ -91,14 +91,17 @@ class _QuerySuite:
             execute("itersetup")
             run_result = execute("run",
                                  scenario_config.get("num_client_workers", 1))
-            add_measurement(run_result, iteration, WALL_TIME)
             add_measurement(run_result, iteration, CPU_TIME)
             add_measurement(run_result, iteration, MAX_MEMORY)
+            assert len(run_result["groups"]) == 1, \
+                    "Multiple groups in run step not yet supported"
+            group = run_result["groups"][0]
+            add_measurement(group, iteration, WALL_TIME)
             for measurement in ["parsing_time", "plan_execution_time",
                                 "planning_time"] :
-                for i in range(len(run_result.get("metadatas", []))):
-                    add_measurement(run_result["metadatas"][i], iteration,
+                for i in range(len(group.get("metadatas", []))):
+                    add_measurement(group["metadatas"][i], iteration,
                                     measurement)
             execute("iterteardown")
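After this change the run step's result, as assembled by clients.py, is a dictionary whose "groups" list holds one summary per query group; query_suite.py accepts exactly one group and reads WALL_TIME and the per-query metadata from it. An illustrative shape (the numbers are made up; the metadata keys shown are the ones query_suite.py extracts):

    run_result = {
        "groups": [
            {"wall_time": 1.73,
             "metadatas": [{"parsing_time": 0.001,
                            "planning_time": 0.002,
                            "plan_execution_time": 0.010}]},
        ],
    }

    group = run_result["groups"][0]   # the suite asserts there is exactly one
    wall_time = group["wall_time"]
    planning_times = [m.get("planning_time") for m in group.get("metadatas", [])]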