From 5a162213e5216d0087c83090123d1c5280b92ce3 Mon Sep 17 00:00:00 2001
From: Marin Tomic
Date: Fri, 26 Jan 2018 18:50:16 +0100
Subject: [PATCH] Refactor pokec, add card fraud

Reviewers: buda, mculinovic, mferencevic

Reviewed By: mculinovic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1143
---
 tests/macro_benchmark/clients.py              |   4 +-
 tests/macro_benchmark/clients/bolt_client.hpp |   2 +
 .../clients/card_fraud_client.cpp             | 118 ++++++++
 tests/macro_benchmark/clients/common.hpp      |   2 +
 .../clients/long_running_common.hpp           | 172 ++++++++++++
 ...ng_running_client.cpp => pokec_client.cpp} | 257 +++++-------------
 .../groups/card_fraud/.gitignore              |   1 +
 .../groups/card_fraud/config.json             |   4 +
 .../groups/card_fraud/read_only.run.json      |   3 +
 .../groups/card_fraud/read_write.run.json     |   3 +
 .../groups/card_fraud/setup.py                |  47 ++++
 .../macro_benchmark/groups/pokec/config.json  |   3 +-
 tests/macro_benchmark/groups/pokec/run.json   |  25 +-
 tests/macro_benchmark/long_running_suite.py   |   8 +-
 tests/macro_benchmark/run_card_fraud          |  18 ++
 15 files changed, 442 insertions(+), 225 deletions(-)
 create mode 100644 tests/macro_benchmark/clients/card_fraud_client.cpp
 create mode 100644 tests/macro_benchmark/clients/long_running_common.hpp
 rename tests/macro_benchmark/clients/{long_running_client.cpp => pokec_client.cpp} (51%)
 create mode 100644 tests/macro_benchmark/groups/card_fraud/.gitignore
 create mode 100644 tests/macro_benchmark/groups/card_fraud/config.json
 create mode 100644 tests/macro_benchmark/groups/card_fraud/read_only.run.json
 create mode 100644 tests/macro_benchmark/groups/card_fraud/read_write.run.json
 create mode 100644 tests/macro_benchmark/groups/card_fraud/setup.py
 create mode 100755 tests/macro_benchmark/run_card_fraud

diff --git a/tests/macro_benchmark/clients.py b/tests/macro_benchmark/clients.py
index 8b4aaf8b0..289525d9f 100644
--- a/tests/macro_benchmark/clients.py
+++ b/tests/macro_benchmark/clients.py
@@ -86,11 +86,11 @@ class LongRunningClient:
     # TODO: This is quite similar to __call__ method of QueryClient. Remove
     # duplication.
-    def __call__(self, config, database, duration, num_workers=None):
+    def __call__(self, config, database, duration, client, num_workers=None):
         if num_workers is None:
             num_workers = self.default_num_workers
         self.log.debug("execute('%s')", config)
 
-        client_path = "tests/macro_benchmark/long_running_client"
+        client_path = "tests/macro_benchmark/{}".format(client)
         client = get_absolute_path(client_path, "build")
         if not os.path.exists(client):
             # Apollo builds both debug and release binaries on diff
diff --git a/tests/macro_benchmark/clients/bolt_client.hpp b/tests/macro_benchmark/clients/bolt_client.hpp
index a9fb5e68f..9d9659934 100644
--- a/tests/macro_benchmark/clients/bolt_client.hpp
+++ b/tests/macro_benchmark/clients/bolt_client.hpp
@@ -1,3 +1,5 @@
+#pragma once
+
 #include
 #include
 
diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp
new file mode 100644
index 000000000..e67498175
--- /dev/null
+++ b/tests/macro_benchmark/clients/card_fraud_client.cpp
@@ -0,0 +1,118 @@
+#include
+#include
+#include
+
+#include "gflags/gflags.h"
+
+#include "long_running_common.hpp"
+
+class CardFraudClient : public TestClient {
+ public:
+  CardFraudClient(int id, int num_pos, nlohmann::json config)
+      : TestClient(), rg_(id), num_pos_(num_pos), config_(config) {}
+
+ private:
+  std::mt19937 rg_;
+  int num_pos_;
+  nlohmann::json config_;
+
+  auto GetFraudulentTransactions() {
+    return Execute(
+        "MATCH (t:Transaction {fraud_reported: true}) "
+        "RETURN t.id as id",
+        {});
+  }
+
+  auto GetCompromisedPos() {
+    return Execute(
+        "MATCH (t:Transaction {fraud_reported: true})-[:Using]->(:Card)"
+        "<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
+        "WITH p.id as pos, count(t) as connected_frauds "
+        "WHERE connected_frauds > 1 "
+        "RETURN pos, connected_frauds ORDER BY connected_frauds DESC",
+        {});
+  }
+
+  auto ResolvePos(int id) {
+    return Execute(
+        "MATCH (p:Pos {id: $id}) "
+        "SET p.compromised = false "
+        "WITH p MATCH (p)--(t:Transaction)--(c:Card) "
+        "SET t.fraud_reported = false, c.compromised = false",
+        {{"id", id}});
+  }
+
+  auto CompromisePos(int id) {
+    return Execute(
+        "MATCH (p:Pos {id: $id}) "
+        "SET p.compromised = true "
+        "WITH p MATCH (p)--(t:Transaction)--(c:Card) "
+        "SET t.fraud_reported = false, c.compromised = true",
+        {{"id", id}});
+  }
+
+ public:
+  virtual void Step() override {
+    if (config_["scenario"] == "read_only") {
+      std::uniform_int_distribution<> dist(0, 1);
+      if (dist(rg_)) {
+        GetFraudulentTransactions();
+      } else {
+        GetCompromisedPos();
+      }
+    } else if (config_["scenario"] == "read_write") {
+      std::uniform_int_distribution<> dist(0, num_pos_ - 1);
+      int pos_id = dist(rg_);
+      CompromisePos(pos_id);
+      GetFraudulentTransactions();
+      ResolvePos(pos_id);
+    } else {
+      LOG(FATAL) << "Should not get here!";
+    }
+  }
+};
+
+int64_t NumPos(BoltClient &client) {
+  auto result = ExecuteNTimesTillSuccess(
+      client, "MATCH (n :Pos) RETURN COUNT(n) as cnt;", {}, MAX_RETRIES);
+  return result.records[0][0].ValueInt();
+}
+
+void CreateIndex(BoltClient &client, const std::string &label,
+                 const std::string &property) {
+  LOG(INFO) << fmt::format("Creating indexes for :{}({})...", label, property);
+  ExecuteNTimesTillSuccess(
+      client, fmt::format("CREATE INDEX ON :{}({});", label, property), {},
+      MAX_RETRIES);
+  try {
+    LOG(INFO) << fmt::format("Trying to sync indexes...");
+    ExecuteNTimesTillSuccess(client, "CALL db.awaitIndexes(14400);", {},
+                             MAX_RETRIES);
+  } catch (utils::BasicException &e) {
+    LOG(WARNING) << "Index sync failed: " << e.what();
+  }
+}
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  nlohmann::json config;
+  std::cin >> config;
+
+  BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password);
+  int num_pos = NumPos(client);
+  CreateIndex(client, "Card", "id");
+  CreateIndex(client, "Pos", "id");
+  CreateIndex(client, "Transaction", "fraud_reported");
+
+  std::vector<std::unique_ptr<TestClient>> clients;
+  for (int i = 0; i < FLAGS_num_workers; ++i) {
+    clients.emplace_back(std::make_unique<CardFraudClient>(i, num_pos, config));
+  }
+
+  RunMultithreadedTest(clients);
+
+  return 0;
+
+}
diff --git a/tests/macro_benchmark/clients/common.hpp b/tests/macro_benchmark/clients/common.hpp
index a90051f6b..055f66bb8 100644
--- a/tests/macro_benchmark/clients/common.hpp
+++ b/tests/macro_benchmark/clients/common.hpp
@@ -1,3 +1,5 @@
+#pragma once
+
 #include
 #include
 #include
diff --git a/tests/macro_benchmark/clients/long_running_common.hpp b/tests/macro_benchmark/clients/long_running_common.hpp
new file mode 100644
index 000000000..35fb51feb
--- /dev/null
+++ b/tests/macro_benchmark/clients/long_running_common.hpp
@@ -0,0 +1,172 @@
+#include "json/json.hpp"
+
+#include "bolt_client.hpp"
+#include "common.hpp"
+#include "communication/bolt/v1/decoder/decoded_value.hpp"
+#include "utils/timer.hpp"
+
+const int MAX_RETRIES = 30;
+
+DEFINE_string(address, "127.0.0.1", "Server address");
+DEFINE_int32(port, 7687, "Server port");
+DEFINE_int32(num_workers, 1, "Number of workers");
+DEFINE_string(output, "", "Output file");
+DEFINE_string(username, "", "Username for the database");
+DEFINE_string(password, "", "Password for the database");
+DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
+
+class TestClient {
+ public:
+  TestClient()
+      : client_(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password) {}
+
+  virtual ~TestClient() {}
+
+  auto ConsumeStats() {
+    std::unique_lock<SpinLock> guard(lock_);
+    auto stats = stats_;
+    stats_.clear();
+    return stats;
+  }
+
+  void Run() {
+    runner_thread_ = std::thread([&] {
+      while (keep_running_) {
+        Step();
+      }
+    });
+  }
+
+  void Stop() {
+    keep_running_ = false;
+    runner_thread_.join();
+  }
+
+ protected:
+  virtual void Step() = 0;
+
+  communication::bolt::QueryData Execute(
+      const std::string &query,
+      const std::map<std::string, DecodedValue> &params,
+      const std::string &query_name = "") {
+    utils::Timer timer;
+    auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
+    auto wall_time = timer.Elapsed();
+    auto metadata = result.metadata;
+    metadata["wall_time"] = wall_time.count();
+    {
+      std::unique_lock<SpinLock> guard(lock_);
+      if (query_name != "") {
+        stats_[query_name].push_back(std::move(metadata));
+      } else {
+        stats_[query].push_back(std::move(metadata));
+      }
+    }
+    return result;
+  }
+
+  SpinLock lock_;
+  std::unordered_map<std::string,
+                     std::vector<std::map<std::string, DecodedValue>>>
+      stats_;
+
+  std::atomic<bool> keep_running_{true};
+  std::thread runner_thread_;
+
+ private:
+  BoltClient client_;
+};
+
+void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
+  CHECK((int)clients.size() == FLAGS_num_workers);
+
+  // Open stream for writing stats.
+  std::streambuf *buf;
+  std::ofstream f;
+  if (FLAGS_output != "") {
+    f.open(FLAGS_output);
+    buf = f.rdbuf();
+  } else {
+    buf = std::cout.rdbuf();
+  }
+  std::ostream out(buf);
+
+  utils::Timer timer;
+  for (auto &client : clients) {
+    client->Run();
+  }
+  LOG(INFO) << "Starting test with " << clients.size() << " workers";
+  uint64_t executed_queries = 0;
+  while (timer.Elapsed().count() < FLAGS_duration) {
+    std::unordered_map<std::string, std::map<std::string, DecodedValue>>
+        aggregated_stats;
+
+    using namespace std::chrono_literals;
+    std::unordered_map<std::string,
+                       std::vector<std::map<std::string, DecodedValue>>>
+        stats;
+    for (const auto &client : clients) {
+      auto client_stats = client->ConsumeStats();
+      for (const auto &client_query_stats : client_stats) {
+        auto &query_stats = stats[client_query_stats.first];
+        query_stats.insert(query_stats.end(),
+                           client_query_stats.second.begin(),
+                           client_query_stats.second.end());
+        executed_queries += client_query_stats.second.end() -
+                            client_query_stats.second.begin();
+      }
+    }
+
+    // TODO: Here we combine pure values, json and DecodedValue which is a
+    // little bit chaotic. Think about refactoring this part to only use json
+    // and write DecodedValue to json converter.
+    const std::vector<std::string> fields = {
+        "wall_time",
+        "parsing_time",
+        "planning_time",
+        "plan_execution_time",
+    };
+    for (const auto &query_stats : stats) {
+      std::map<std::string, double> new_aggregated_query_stats;
+      for (const auto &stats : query_stats.second) {
+        for (const auto &field : fields) {
+          auto it = stats.find(field);
+          if (it != stats.end()) {
+            new_aggregated_query_stats[field] += it->second.ValueDouble();
+          }
+        }
+      }
+      int64_t new_count = query_stats.second.size();
+
+      auto &aggregated_query_stats = aggregated_stats[query_stats.first];
+      aggregated_query_stats.insert({"count", DecodedValue(0)});
+      auto old_count = aggregated_query_stats["count"].ValueInt();
+      aggregated_query_stats["count"].ValueInt() += new_count;
+      for (const auto &stat : new_aggregated_query_stats) {
+        auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)})
+                      .first;
+        it->second =
+            (it->second.ValueDouble() * old_count + stat.second * new_count) /
+            (old_count + new_count);
+      }
+    }
+
+    out << "{\"num_executed_queries\": " << executed_queries << ", "
+        << "\"elapsed_time\": " << timer.Elapsed().count()
+        << ", \"queries\": [";
+    utils::PrintIterable(
+        out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
+          stream << "{\"query\": " << nlohmann::json(x.first)
+                 << ", \"stats\": ";
+          PrintJsonDecodedValue(stream, DecodedValue(x.second));
+          stream << "}";
+        });
+    out << "]}" << std::endl;
+    out.flush();
+    std::this_thread::sleep_for(1s);
+  }
+  LOG(INFO) << "Stopping workers...";
+  for (auto &client : clients) {
+    client->Stop();
+  }
+  LOG(INFO) << "Stopped workers...";
+}
diff --git a/tests/macro_benchmark/clients/long_running_client.cpp b/tests/macro_benchmark/clients/pokec_client.cpp
similarity index 51%
rename from tests/macro_benchmark/clients/long_running_client.cpp
rename to tests/macro_benchmark/clients/pokec_client.cpp
index 08eaa841d..c29f5b2ff 100644
--- a/tests/macro_benchmark/clients/long_running_client.cpp
+++ b/tests/macro_benchmark/clients/pokec_client.cpp
@@ -19,6 +19,7 @@
 #include "communication/bolt/v1/decoder/decoded_value.hpp"
 #include "io/network/endpoint.hpp"
 #include "io/network/socket.hpp"
+#include "long_running_common.hpp"
 #include "threading/sync/spinlock.hpp"
 #include "utils/algorithm.hpp"
 #include "utils/network.hpp"
@@ -28,58 +29,24 @@
 using communication::bolt::DecodedEdge;
 using communication::bolt::DecodedValue;
 using communication::bolt::DecodedVertex;
 
-DEFINE_string(address, "127.0.0.1", "Server address");
-DEFINE_int32(port, 7687, "Server port");
-DEFINE_int32(num_workers, 1, "Number of workers");
-DEFINE_string(output, "", "Output file");
-DEFINE_string(username, "", "Username for the database");
-DEFINE_string(password, "", "Password for the database");
-DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
-
-const int MAX_RETRIES = 30;
-
 struct VertexAndEdges {
   DecodedVertex vertex;
   std::vector<DecodedEdge> edges;
   std::vector<DecodedVertex> vertices;
 };
 
-std::atomic<int64_t> executed_queries;
+const std::string INDEPENDENT_LABEL = "User";
 
-class Session {
+class PokecClient : public TestClient {
  public:
-  Session(const nlohmann::json &config, const std::string &address,
-          uint16_t port, const std::string &username,
-          const std::string &password)
-      : config_(config), client_(address, port, username, password) {}
+  PokecClient(int id, std::vector<int64_t> to_remove, nlohmann::json config)
+      : TestClient(), rg_(id), config_(config), to_remove_(to_remove) {}
 
  private:
-  const nlohmann::json &config_;
-  BoltClient client_;
-  std::unordered_map<std::string,
-                     std::vector<std::map<std::string, DecodedValue>>>
-      stats_;
-  SpinLock lock_;
-
-  auto Execute(const std::string &query,
-               const std::map<std::string, DecodedValue> &params,
-               const std::string &query_name = "") {
-    utils::Timer timer;
-    auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
-    ++executed_queries;
-    auto wall_time = timer.Elapsed();
-    auto metadata = result.metadata;
-    metadata["wall_time"] = wall_time.count();
-    {
-      std::unique_lock<SpinLock> guard(lock_);
-      if (query_name != "") {
-        stats_[query_name].push_back(std::move(metadata));
-      } else {
-        stats_[query].push_back(std::move(metadata));
-      }
-    }
-    return result;
-  }
+  std::mt19937 rg_;
+  nlohmann::json config_;
+  std::vector<int64_t> to_remove_;
+  std::vector<VertexAndEdges> removed_;
 
   auto MatchVertex(const std::string &label, int64_t id) {
     return Execute(fmt::format("MATCH (n :{} {{id : $id}}) RETURN n", label),
@@ -115,6 +82,20 @@ class Session {
     return Execute(os.str(), {}, "CREATE (n :labels... {...})");
   }
 
+  auto GetAverageAge2(int64_t id) {
+    return Execute(
+        "MATCH (n :User {id: $id})-[]-(m) "
+        "RETURN AVG(n.age + m.age)",
+        {{"id", id}});
+  }
+
+  auto GetAverageAge3(int64_t id) {
+    return Execute(
+        "MATCH (n :User {id: $id})-[]-(m)-[]-(k) "
+        "RETURN AVG(n.age + m.age + k.age)",
+        {{"id", id}});
+  }
+
   auto CreateEdge(const DecodedVertex &from, const std::string &from_label,
                   int64_t from_id, const std::string &to_label, int64_t to_id,
                   const DecodedEdge &edge) {
@@ -203,61 +184,43 @@ class Session {
   }
 
  public:
-  void Run(int id, std::vector<int64_t> to_remove,
-           std::atomic<bool> &keep_running) {
-    std::mt19937 rg(id);
-    std::vector<VertexAndEdges> removed;
-
-    const auto &queries = config_["queries"];
-    const double read_probability = config_["read_probability"];
-    const std::string independent_label = config_["independent_label"];
-
-    while (keep_running) {
-      std::uniform_real_distribution<> real_dist(0.0, 1.0);
-
-      // Read query.
-      if (real_dist(rg) < read_probability) {
-        CHECK(queries.size())
-            << "Specify at least one read query or set read_probability to 0";
-        std::uniform_int_distribution<> read_query_dist(0, queries.size() - 1);
-        const auto &query = queries[read_query_dist(rg)];
-        std::map<std::string, DecodedValue> params;
-        for (const auto &param : query["params"]) {
-          std::uniform_int_distribution<int64_t> param_value_dist(
-              param["low"], param["high"]);
-          params[param["name"]] = param_value_dist(rg);
-        }
-        Execute(query["query"], params);
+  virtual void Step() override {
+    std::uniform_real_distribution<> real_dist(0.0, 1.0);
+    if (real_dist(rg_) < config_["read_probability"]) {
+      std::uniform_int_distribution<> read_query_dist(0, 1);
+      int id = real_dist(rg_) * to_remove_.size();
+      switch (read_query_dist(rg_)) {
+        case 0:
+          GetAverageAge2(id);
+          break;
+        case 1:
+          GetAverageAge3(id);
+          break;
+        default:
+          LOG(FATAL) << "Should not get here";
+      }
+    } else {
+      auto remove_random = [&](auto &v) {
+        CHECK(v.size());
+        std::uniform_int_distribution<> int_dist(0, v.size() - 1);
+        std::swap(v.back(), v[int_dist(rg_)]);
+        auto ret = v.back();
+        v.pop_back();
+        return ret;
+      };
+      if (real_dist(rg_) < static_cast<double>(removed_.size()) /
+                               (removed_.size() + to_remove_.size())) {
+        auto vertices_and_edges = remove_random(removed_);
+        ReturnVertexAndEdges(vertices_and_edges, INDEPENDENT_LABEL);
+        to_remove_.push_back(
+            vertices_and_edges.vertex.properties["id"].ValueInt());
       } else {
-        auto remove_random = [&](auto &v) {
-          CHECK(v.size());
-          std::uniform_int_distribution<> int_dist(0, v.size() - 1);
-          std::swap(v.back(), v[int_dist(rg)]);
-          auto ret = v.back();
-          v.pop_back();
-          return ret;
-        };
-        if (real_dist(rg) < static_cast<double>(removed.size()) /
-                                (removed.size() + to_remove.size())) {
-          auto vertices_and_edges = remove_random(removed);
-          ReturnVertexAndEdges(vertices_and_edges, independent_label);
-          to_remove.push_back(
-              vertices_and_edges.vertex.properties["id"].ValueInt());
-        } else {
-          auto node_id = remove_random(to_remove);
-          auto ret = RetrieveAndDeleteVertex(independent_label, node_id);
-          removed.push_back(ret);
-        }
+        auto node_id = remove_random(to_remove_);
+        auto ret = RetrieveAndDeleteVertex(INDEPENDENT_LABEL, node_id);
+        removed_.push_back(ret);
       }
     }
   }
-
-  auto ConsumeStats() {
-    std::unique_lock<SpinLock> guard(lock_);
-    auto stats = stats_;
-    stats_.clear();
-    return stats;
-  }
 };
 
 int64_t NumNodes(BoltClient &client, const std::string &label) {
@@ -305,7 +268,7 @@ std::vector<int64_t> IndependentSet(BoltClient &client,
       independent.erase(j);
     }
   }
-  LOG(INFO) << "Number of nodes nodes: " << num_nodes << "\n"
+  LOG(INFO) << "Number of nodes: " << num_nodes << "\n"
            << "Number of independent nodes: " << independent_nodes_ids.size();
 
   return independent_nodes_ids;
@@ -317,126 +280,32 @@ int main(int argc, char **argv) {
   nlohmann::json config;
   std::cin >> config;
 
-  const std::string independent_label = config["independent_label"];
   auto independent_nodes_ids = [&] {
     BoltClient client(utils::ResolveHostname(FLAGS_address), FLAGS_port,
                       FLAGS_username, FLAGS_password);
-    return IndependentSet(client, independent_label);
+    return IndependentSet(client, INDEPENDENT_LABEL);
   }();
 
-  utils::Timer timer;
-  std::vector<std::thread> threads;
-  std::atomic<bool> keep_running{true};
   int64_t next_to_assign = 0;
-  std::vector<std::unique_ptr<Session>> sessions;
-  sessions.reserve(FLAGS_num_workers);
+  std::vector<std::unique_ptr<TestClient>> clients;
+  clients.reserve(FLAGS_num_workers);
 
   for (int i = 0; i < FLAGS_num_workers; ++i) {
     int64_t size = independent_nodes_ids.size();
     int64_t next_next_to_assign =
         next_to_assign + size / FLAGS_num_workers + (i < size % FLAGS_num_workers);
 
-    sessions.push_back(std::make_unique<Session>(
-        config, FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password));
-
     std::vector<int64_t> to_remove(
         independent_nodes_ids.begin() + next_to_assign,
         independent_nodes_ids.begin() + next_next_to_assign);
 
     LOG(INFO) << next_to_assign << " " << next_next_to_assign;
     next_to_assign = next_next_to_assign;
 
-    threads.emplace_back(
-        [&](int thread_id, const std::vector<int64_t> to_remove) {
-          sessions[thread_id]->Run(thread_id, std::move(to_remove),
-                                   keep_running);
-        },
-        i, std::move(to_remove));
-  }
-
-  // Open stream for writing stats.
-  std::streambuf *buf;
-  std::ofstream f;
-  if (FLAGS_output != "") {
-    f.open(FLAGS_output);
-    buf = f.rdbuf();
-  } else {
-    buf = std::cout.rdbuf();
-  }
-  std::ostream out(buf);
-
-  while (timer.Elapsed().count() < FLAGS_duration) {
-    std::unordered_map<std::string, std::map<std::string, DecodedValue>>
-        aggregated_stats;
-
-    using namespace std::chrono_literals;
-    std::unordered_map<std::string,
-                       std::vector<std::map<std::string, DecodedValue>>>
-        stats;
-    for (const auto &session : sessions) {
-      auto session_stats = session->ConsumeStats();
-      for (const auto &session_query_stats : session_stats) {
-        auto &query_stats = stats[session_query_stats.first];
-        query_stats.insert(query_stats.end(),
-                           session_query_stats.second.begin(),
-                           session_query_stats.second.end());
-      }
-    }
-
-    // TODO: Here we combine pure values, json and DecodedValue which is a
-    // little bit chaotic. Think about refactoring this part to only use json
-    // and write DecodedValue to json converter.
-    const std::vector<std::string> fields = {
-        "wall_time",
-        "parsing_time",
-        "planning_time",
-        "plan_execution_time",
-    };
-    for (const auto &query_stats : stats) {
-      std::map<std::string, double> new_aggregated_query_stats;
-      for (const auto &stats : query_stats.second) {
-        for (const auto &field : fields) {
-          auto it = stats.find(field);
-          if (it != stats.end()) {
-            new_aggregated_query_stats[field] += it->second.ValueDouble();
-          }
-        }
-      }
-      int64_t new_count = query_stats.second.size();
-
-      auto &aggregated_query_stats = aggregated_stats[query_stats.first];
-      aggregated_query_stats.insert({"count", DecodedValue(0)});
-      auto old_count = aggregated_query_stats["count"].ValueInt();
-      aggregated_query_stats["count"].ValueInt() += new_count;
-      for (const auto &stat : new_aggregated_query_stats) {
-        auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)})
-                      .first;
-        it->second =
-            (it->second.ValueDouble() * old_count + stat.second * new_count) /
-            (old_count + new_count);
-      }
-    }
-
-    out << "{\"num_executed_queries\": " << executed_queries << ", "
-        << "\"elapsed_time\": " << timer.Elapsed().count()
-        << ", \"queries\": [";
-    utils::PrintIterable(
-        out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
-          stream << "{\"query\": " << nlohmann::json(x.first)
-                 << ", \"stats\": ";
-          PrintJsonDecodedValue(stream, DecodedValue(x.second));
-          stream << "}";
-        });
-    out << "]}" << std::endl;
-    out.flush();
-    std::this_thread::sleep_for(1s);
-  }
-  keep_running = false;
-
-  for (int i = 0; i < FLAGS_num_workers; ++i) {
-    threads[i].join();
+    clients.emplace_back(std::make_unique<PokecClient>(i, to_remove, config));
  }
 
+  RunMultithreadedTest(clients);
+
   return 0;
 }
diff --git a/tests/macro_benchmark/groups/card_fraud/.gitignore b/tests/macro_benchmark/groups/card_fraud/.gitignore
new file mode 100644
index 000000000..02b720e54
--- /dev/null
+++ b/tests/macro_benchmark/groups/card_fraud/.gitignore
@@ -0,0 +1 @@
+pokec_*.setup.cypher
diff --git a/tests/macro_benchmark/groups/card_fraud/config.json b/tests/macro_benchmark/groups/card_fraud/config.json
new file mode 100644
index 000000000..32e2c2b33
--- /dev/null
+++ b/tests/macro_benchmark/groups/card_fraud/config.json
@@ -0,0 +1,4 @@
+{
+    "duration": 30,
+    "client": "card_fraud_client"
+}
diff --git a/tests/macro_benchmark/groups/card_fraud/read_only.run.json b/tests/macro_benchmark/groups/card_fraud/read_only.run.json
new file mode 100644
index 000000000..291ec5de4
--- /dev/null
+++ b/tests/macro_benchmark/groups/card_fraud/read_only.run.json
@@ -0,0 +1,3 @@
+{
+    "scenario": "read_only"
+}
diff --git a/tests/macro_benchmark/groups/card_fraud/read_write.run.json b/tests/macro_benchmark/groups/card_fraud/read_write.run.json
new file mode 100644
index 000000000..48c4456e6
--- /dev/null
+++ b/tests/macro_benchmark/groups/card_fraud/read_write.run.json
@@ -0,0 +1,3 @@
+{
+    "scenario": "read_write"
+}
diff --git a/tests/macro_benchmark/groups/card_fraud/setup.py b/tests/macro_benchmark/groups/card_fraud/setup.py
new file mode 100644
index 000000000..7acf7d0fe
--- /dev/null
+++ b/tests/macro_benchmark/groups/card_fraud/setup.py
@@ -0,0 +1,47 @@
+import random
+
+def init_data(card_count, pos_count):
+    print("UNWIND range(0, {} - 1) AS id "
+          "CREATE (:Card {{id: id, compromised: false}});".format(
+              card_count))
+    print("UNWIND range(0, {} - 1) AS id "
+          "CREATE (:Pos {{id: id, compromised: false}});".format(
+              pos_count))
+
+
+def compromise_pos_device(pos_id):
+    print("MATCH (p:Pos {{id: {}}}) SET p.compromised = true;".format(pos_id))
+
+
+def compromise_pos_devices(pos_count, fraud_count):
+    for pos_id in random.sample(range(pos_count), fraud_count):
+        compromise_pos_device(pos_id)
+
+
+def pump_transactions(card_count, pos_count, tx_count, report_pct):
+    # Create transactions. If the POS is compromised, then the
+    # Card of the transaction gets compromised too. If the card
+    # is compromised, there is a 0.1 chance the transaction is
+    # fraudulent and detected (regardless of POS).
+ q = ("MATCH (c:Card {{id: {}}}), (p:Pos {{id: {}}}) " + "CREATE (t:Transaction " + "{{id: {}, fraud_reported: c.compromised AND (rand() < %f)}}) " + "CREATE (c)<-[:Using]-(t)-[:At]->(p) " + "SET c.compromised = p.compromised;" % report_pct) + + def rint(max): return random.randint(0, max - 1) # NOQA + for i in range(tx_count): + print(q.format(rint(card_count), rint(pos_count), i)) + + +POS_COUNT = 1000 +CARD_COUNT = 10000 +FRAUD_POS_COUNT = 20 +TX_COUNT = 50000 +REPORT_PCT = 0.1 + +random.seed(12345) + +init_data(CARD_COUNT, POS_COUNT) +compromise_pos_devices(POS_COUNT, FRAUD_POS_COUNT) +pump_transactions(CARD_COUNT, POS_COUNT, TX_COUNT, REPORT_PCT) diff --git a/tests/macro_benchmark/groups/pokec/config.json b/tests/macro_benchmark/groups/pokec/config.json index ec6cf94bd..249ab7bee 100644 --- a/tests/macro_benchmark/groups/pokec/config.json +++ b/tests/macro_benchmark/groups/pokec/config.json @@ -1,3 +1,4 @@ { - "duration": 60 + "duration": 60, + "client": "pokec_client" } diff --git a/tests/macro_benchmark/groups/pokec/run.json b/tests/macro_benchmark/groups/pokec/run.json index 6bd9d99f0..db8b794e9 100644 --- a/tests/macro_benchmark/groups/pokec/run.json +++ b/tests/macro_benchmark/groups/pokec/run.json @@ -1,26 +1,3 @@ { - "independent_label": "User", - "read_probability": 0.5, - "queries" : [ - { - "query": "MATCH (n :User {id : $id})-[]-(m) RETURN AVG(n.age + m.age)", - "params" : [ - { - "name" : "id", - "low" : 1, - "high" : 10000 - } - ] - }, - { - "query": "MATCH (n :User {id : $id})-[]-(m)-[]-(k) RETURN AVG(n.age + m.age + k.age)", - "params" : [ - { - "name" : "id", - "low" : 1, - "high" : 10000 - } - ] - } - ] + "read_probability": 0.5 } diff --git a/tests/macro_benchmark/long_running_suite.py b/tests/macro_benchmark/long_running_suite.py index dd57a5ce1..a49fb0ce9 100644 --- a/tests/macro_benchmark/long_running_suite.py +++ b/tests/macro_benchmark/long_running_suite.py @@ -29,7 +29,7 @@ class LongRunningSuite: duration = self.args.duration log.info("Executing run for {} seconds".format( duration)) - results = runner.run(next(scenario.get("run")()), duration) + results = runner.run(next(scenario.get("run")()), duration, config["client"]) runner.stop() @@ -53,7 +53,7 @@ class LongRunningSuite: return {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner} def groups(self): - return ["pokec"] + return ["pokec", "card_fraud"] class _LongRunningRunner: @@ -69,9 +69,9 @@ class _LongRunningRunner: def setup(self, queries, num_client_workers=None): return self.query_client(queries, self.database, num_client_workers) - def run(self, config, duration, num_client_workers=None): + def run(self, config, duration, client, num_client_workers=None): return self.long_running_client( - config, self.database, duration, num_client_workers) + config, self.database, duration, client, num_client_workers) def stop(self): self.log.info("stop") diff --git a/tests/macro_benchmark/run_card_fraud b/tests/macro_benchmark/run_card_fraud new file mode 100755 index 000000000..a0a16cb34 --- /dev/null +++ b/tests/macro_benchmark/run_card_fraud @@ -0,0 +1,18 @@ +#!/bin/bash -e + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +cd ${script_dir} +mkdir -p .results/card_fraud + +./harness LongRunningSuite MemgraphRunner --groups card_fraud +mv .harness_summary ${script_dir}/.results/card_fraud/memgraph.summary + +./harness LongRunningSuite NeoRunner --groups card_fraud +mv .harness_summary ${script_dir}/.results/card_fraud/neo4j.summary + +../../tools/plot/pokec_throughput \ + 
+    --vendor-references neo4j memgraph \
+    --vendor-titles Neo4j Memgraph \
+    --results ${script_dir}/.results/card_fraud/neo4j.summary ${script_dir}/.results/card_fraud/memgraph.summary \
+    --plot-title "Card Fraud Small" --window-size 1
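
Usage note: each long-running client added by this patch reads its scenario configuration as JSON from stdin (std::cin >> config in main) and takes its connection settings from the gflags declared in long_running_common.hpp, so card_fraud_client can also be driven by hand outside the harness. A minimal sketch, assuming the binary ends up under build/tests/macro_benchmark/ (the harness resolves it via get_absolute_path(..., "build"); the exact build layout is not part of this patch) and assuming a local database already loaded with the groups/card_fraud/setup.py data:

    # Hypothetical binary path; adjust to wherever your build places it.
    echo '{"scenario": "read_write"}' | \
        build/tests/macro_benchmark/card_fraud_client \
            --address 127.0.0.1 --port 7687 \
            --num_workers 4 --duration 30 --output card_fraud.stats

The flags map one-to-one to the DEFINE_* declarations in long_running_common.hpp, and the JSON on stdin only needs the "scenario" key, which Step() checks for "read_only" or "read_write"; periodic aggregated stats are written to the file given by --output (or stdout if it is empty).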