Refactor pokec, add card fraud
Reviewers: buda, mculinovic, mferencevic Reviewed By: mculinovic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1143
This commit is contained in:
parent
1d5d67aeac
commit
5a162213e5
@ -86,11 +86,11 @@ class LongRunningClient:
|
||||
|
||||
# TODO: This is quite similar to __call__ method of QueryClient. Remove
|
||||
# duplication.
|
||||
def __call__(self, config, database, duration, num_workers=None):
|
||||
def __call__(self, config, database, duration, client, num_workers=None):
|
||||
if num_workers is None: num_workers = self.default_num_workers
|
||||
self.log.debug("execute('%s')", config)
|
||||
|
||||
client_path = "tests/macro_benchmark/long_running_client"
|
||||
client_path = "tests/macro_benchmark/{}".format(client)
|
||||
client = get_absolute_path(client_path, "build")
|
||||
if not os.path.exists(client):
|
||||
# Apollo builds both debug and release binaries on diff
|
||||
|
@ -1,3 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <vector>
|
||||
|
||||
|
118
tests/macro_benchmark/clients/card_fraud_client.cpp
Normal file
118
tests/macro_benchmark/clients/card_fraud_client.cpp
Normal file
@ -0,0 +1,118 @@
|
||||
#include <memory>
|
||||
#include <random>
|
||||
#include <vector>
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
#include "long_running_common.hpp"
|
||||
|
||||
class CardFraudClient : public TestClient {
|
||||
public:
|
||||
CardFraudClient(int id, int num_pos, nlohmann::json config)
|
||||
: TestClient(), rg_(id), num_pos_(num_pos), config_(config) {}
|
||||
|
||||
private:
|
||||
std::mt19937 rg_;
|
||||
int num_pos_;
|
||||
nlohmann::json config_;
|
||||
|
||||
auto GetFraudulentTransactions() {
|
||||
return Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true}) "
|
||||
"RETURN t.id as id",
|
||||
{});
|
||||
}
|
||||
|
||||
auto GetCompromisedPos() {
|
||||
return Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true})-[:Using]->(:Card)"
|
||||
"<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
|
||||
"WITH p.id as pos, count(t) as connected_frauds "
|
||||
"WHERE connected_frauds > 1 "
|
||||
"RETURN pos, connected_frauds ORDER BY connected_frauds DESC",
|
||||
{});
|
||||
}
|
||||
|
||||
auto ResolvePos(int id) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos {id: $id}) "
|
||||
"SET p.compromised = false "
|
||||
"WITH p MATCH (p)--(t:Transaction)--(c:Card) "
|
||||
"SET t.fraud_reported = false, c.compromised = false",
|
||||
{{"id", id}});
|
||||
}
|
||||
|
||||
auto CompromisePos(int id) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos {id: $id}) "
|
||||
"SET p.compromised = true "
|
||||
"WITH p MATCH (p)--(t:Transaction)--(c:Card) "
|
||||
"SET t.fraud_reported = false, c.compromised = true",
|
||||
{{"id", id}});
|
||||
}
|
||||
|
||||
public:
|
||||
virtual void Step() override {
|
||||
if (config_["scenario"] == "read_only") {
|
||||
std::uniform_int_distribution<int> dist(0, 1);
|
||||
if (dist(rg_)) {
|
||||
GetFraudulentTransactions();
|
||||
} else {
|
||||
GetCompromisedPos();
|
||||
}
|
||||
} else if (config_["scenario"] == "read_write") {
|
||||
std::uniform_int_distribution<int> dist(0, num_pos_ - 1);
|
||||
int pos_id = dist(rg_);
|
||||
CompromisePos(pos_id);
|
||||
GetFraudulentTransactions();
|
||||
ResolvePos(pos_id);
|
||||
} else {
|
||||
LOG(FATAL) << "Should not get here!";
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Returns the number of :Pos nodes reported by the database.
///
/// The count query is issued through ExecuteNTimesTillSuccess with
/// MAX_RETRIES as its last argument; the value is read from the first column
/// of the first returned record.
int64_t NumPos(BoltClient &client) {
  auto response = ExecuteNTimesTillSuccess(
      client, "MATCH (n :Pos) RETURN COUNT(n) as cnt;", {}, MAX_RETRIES);
  auto &first_record = response.records[0];
  return first_record[0].ValueInt();
}
|
||||
|
||||
/// Creates an index on :label(property) and then tries to wait for it.
///
/// The CREATE INDEX statement is not guarded: whatever it throws propagates
/// to the caller. The subsequent "CALL db.awaitIndexes" is best effort: a
/// utils::BasicException raised by it is only logged as a warning.
void CreateIndex(BoltClient &client, const std::string &label,
                 const std::string &property) {
  LOG(INFO) << fmt::format("Creating indexes for :{}({})...", label, property);
  ExecuteNTimesTillSuccess(
      client, fmt::format("CREATE INDEX ON :{}({});", label, property), {},
      MAX_RETRIES);
  try {
    LOG(INFO) << "Trying to sync indexes...";
    ExecuteNTimesTillSuccess(client, "CALL db.awaitIndexes(14400);", {},
                             MAX_RETRIES);
  } catch (const utils::BasicException &e) {
    // Deliberately non-fatal: the benchmark continues without the sync.
    LOG(WARNING) << "Index sync failed: " << e.what();
  }
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
nlohmann::json config;
|
||||
std::cin >> config;
|
||||
|
||||
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password);
|
||||
int num_pos = NumPos(client);
|
||||
CreateIndex(client, "Card", "id");
|
||||
CreateIndex(client, "Pos", "id");
|
||||
CreateIndex(client, "Transaction", "fraud_reported");
|
||||
|
||||
std::vector<std::unique_ptr<TestClient>> clients;
|
||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||
clients.emplace_back(std::make_unique<CardFraudClient>(i, num_pos, config));
|
||||
}
|
||||
|
||||
RunMultithreadedTest(clients);
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
@ -1,3 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <experimental/optional>
|
||||
#include <map>
|
||||
|
172
tests/macro_benchmark/clients/long_running_common.hpp
Normal file
172
tests/macro_benchmark/clients/long_running_common.hpp
Normal file
@ -0,0 +1,172 @@
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "bolt_client.hpp"
|
||||
#include "common.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
const int MAX_RETRIES = 30;
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(num_workers, 1, "Number of workers");
|
||||
DEFINE_string(output, "", "Output file");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
|
||||
|
||||
class TestClient {
|
||||
public:
|
||||
TestClient()
|
||||
: client_(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password) {}
|
||||
|
||||
virtual ~TestClient() {}
|
||||
|
||||
auto ConsumeStats() {
|
||||
std::unique_lock<SpinLock> guard(lock_);
|
||||
auto stats = stats_;
|
||||
stats_.clear();
|
||||
return stats;
|
||||
}
|
||||
|
||||
void Run() {
|
||||
runner_thread_ = std::thread([&] {
|
||||
while (keep_running_) {
|
||||
Step();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void Stop() {
|
||||
keep_running_ = false;
|
||||
runner_thread_.join();
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void Step() = 0;
|
||||
|
||||
communication::bolt::QueryData Execute(
|
||||
const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms,
|
||||
const std::string &query_name = "") {
|
||||
utils::Timer timer;
|
||||
auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
|
||||
auto wall_time = timer.Elapsed();
|
||||
auto metadata = result.metadata;
|
||||
metadata["wall_time"] = wall_time.count();
|
||||
{
|
||||
std::unique_lock<SpinLock> guard(lock_);
|
||||
if (query_name != "") {
|
||||
stats_[query_name].push_back(std::move(metadata));
|
||||
} else {
|
||||
stats_[query].push_back(std::move(metadata));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
SpinLock lock_;
|
||||
std::unordered_map<std::string,
|
||||
std::vector<std::map<std::string, DecodedValue>>>
|
||||
stats_;
|
||||
|
||||
std::atomic<bool> keep_running_{true};
|
||||
std::thread runner_thread_;
|
||||
|
||||
private:
|
||||
BoltClient client_;
|
||||
};
|
||||
|
||||
void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
CHECK((int)clients.size() == FLAGS_num_workers);
|
||||
|
||||
// Open stream for writing stats.
|
||||
std::streambuf *buf;
|
||||
std::ofstream f;
|
||||
if (FLAGS_output != "") {
|
||||
f.open(FLAGS_output);
|
||||
buf = f.rdbuf();
|
||||
} else {
|
||||
buf = std::cout.rdbuf();
|
||||
}
|
||||
std::ostream out(buf);
|
||||
|
||||
utils::Timer timer;
|
||||
for (auto &client : clients) {
|
||||
client->Run();
|
||||
}
|
||||
LOG(INFO) << "Starting test with " << clients.size() << " workers";
|
||||
uint64_t executed_queries = 0;
|
||||
while (timer.Elapsed().count() < FLAGS_duration) {
|
||||
std::unordered_map<std::string, std::map<std::string, DecodedValue>>
|
||||
aggregated_stats;
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
std::unordered_map<std::string,
|
||||
std::vector<std::map<std::string, DecodedValue>>>
|
||||
stats;
|
||||
for (const auto &client : clients) {
|
||||
auto client_stats = client->ConsumeStats();
|
||||
for (const auto &client_query_stats : client_stats) {
|
||||
auto &query_stats = stats[client_query_stats.first];
|
||||
query_stats.insert(query_stats.end(), client_query_stats.second.begin(),
|
||||
client_query_stats.second.end());
|
||||
executed_queries +=
|
||||
client_query_stats.second.end() - client_query_stats.second.begin();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Here we combine pure values, json and DecodedValue which is a
|
||||
// little bit chaotic. Think about refactoring this part to only use json
|
||||
// and write DecodedValue to json converter.
|
||||
const std::vector<std::string> fields = {
|
||||
"wall_time",
|
||||
"parsing_time",
|
||||
"planning_time",
|
||||
"plan_execution_time",
|
||||
};
|
||||
for (const auto &query_stats : stats) {
|
||||
std::map<std::string, double> new_aggregated_query_stats;
|
||||
for (const auto &stats : query_stats.second) {
|
||||
for (const auto &field : fields) {
|
||||
auto it = stats.find(field);
|
||||
if (it != stats.end()) {
|
||||
new_aggregated_query_stats[field] += it->second.ValueDouble();
|
||||
}
|
||||
}
|
||||
}
|
||||
int64_t new_count = query_stats.second.size();
|
||||
|
||||
auto &aggregated_query_stats = aggregated_stats[query_stats.first];
|
||||
aggregated_query_stats.insert({"count", DecodedValue(0)});
|
||||
auto old_count = aggregated_query_stats["count"].ValueInt();
|
||||
aggregated_query_stats["count"].ValueInt() += new_count;
|
||||
for (const auto &stat : new_aggregated_query_stats) {
|
||||
auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)})
|
||||
.first;
|
||||
it->second =
|
||||
(it->second.ValueDouble() * old_count + stat.second * new_count) /
|
||||
(old_count + new_count);
|
||||
}
|
||||
}
|
||||
|
||||
out << "{\"num_executed_queries\": " << executed_queries << ", "
|
||||
<< "\"elapsed_time\": " << timer.Elapsed().count()
|
||||
<< ", \"queries\": [";
|
||||
utils::PrintIterable(
|
||||
out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
|
||||
stream << "{\"query\": " << nlohmann::json(x.first)
|
||||
<< ", \"stats\": ";
|
||||
PrintJsonDecodedValue(stream, DecodedValue(x.second));
|
||||
stream << "}";
|
||||
});
|
||||
out << "]}" << std::endl;
|
||||
out.flush();
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
LOG(INFO) << "Stopping workers...";
|
||||
for (auto &client : clients) {
|
||||
client->Stop();
|
||||
}
|
||||
LOG(INFO) << "Stopped workers...";
|
||||
}
|
@ -19,6 +19,7 @@
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "long_running_common.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/network.hpp"
|
||||
@ -28,58 +29,24 @@ using communication::bolt::DecodedEdge;
|
||||
using communication::bolt::DecodedValue;
|
||||
using communication::bolt::DecodedVertex;
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(num_workers, 1, "Number of workers");
|
||||
DEFINE_string(output, "", "Output file");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
|
||||
|
||||
const int MAX_RETRIES = 30;
|
||||
|
||||
struct VertexAndEdges {
|
||||
DecodedVertex vertex;
|
||||
std::vector<DecodedEdge> edges;
|
||||
std::vector<DecodedVertex> vertices;
|
||||
};
|
||||
|
||||
std::atomic<int64_t> executed_queries;
|
||||
const std::string INDEPENDENT_LABEL = "User";
|
||||
|
||||
class Session {
|
||||
class PokecClient : public TestClient {
|
||||
public:
|
||||
Session(const nlohmann::json &config, const std::string &address,
|
||||
uint16_t port, const std::string &username,
|
||||
const std::string &password)
|
||||
: config_(config), client_(address, port, username, password) {}
|
||||
PokecClient(int id, std::vector<int64_t> to_remove, nlohmann::json config)
|
||||
: TestClient(), rg_(id), config_(config), to_remove_(to_remove) {}
|
||||
|
||||
private:
|
||||
const nlohmann::json &config_;
|
||||
BoltClient client_;
|
||||
std::unordered_map<std::string,
|
||||
std::vector<std::map<std::string, DecodedValue>>>
|
||||
stats_;
|
||||
SpinLock lock_;
|
||||
|
||||
auto Execute(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶ms,
|
||||
const std::string &query_name = "") {
|
||||
utils::Timer timer;
|
||||
auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
|
||||
++executed_queries;
|
||||
auto wall_time = timer.Elapsed();
|
||||
auto metadata = result.metadata;
|
||||
metadata["wall_time"] = wall_time.count();
|
||||
{
|
||||
std::unique_lock<SpinLock> guard(lock_);
|
||||
if (query_name != "") {
|
||||
stats_[query_name].push_back(std::move(metadata));
|
||||
} else {
|
||||
stats_[query].push_back(std::move(metadata));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
std::mt19937 rg_;
|
||||
nlohmann::json config_;
|
||||
std::vector<int64_t> to_remove_;
|
||||
std::vector<VertexAndEdges> removed_;
|
||||
|
||||
auto MatchVertex(const std::string &label, int64_t id) {
|
||||
return Execute(fmt::format("MATCH (n :{} {{id : $id}}) RETURN n", label),
|
||||
@ -115,6 +82,20 @@ class Session {
|
||||
return Execute(os.str(), {}, "CREATE (n :labels... {...})");
|
||||
}
|
||||
|
||||
auto GetAverageAge2(int64_t id) {
|
||||
return Execute(
|
||||
"MATCH (n :User {id: $id})-[]-(m) "
|
||||
"RETURN AVG(n.age + m.age)",
|
||||
{{"id", id}});
|
||||
}
|
||||
|
||||
auto GetAverageAge3(int64_t id) {
|
||||
return Execute(
|
||||
"MATCH (n :User {id: $id})-[]-(m)-[]-(k) "
|
||||
"RETURN AVG(n.age + m.age + k.age)",
|
||||
{{"id", id}});
|
||||
}
|
||||
|
||||
auto CreateEdge(const DecodedVertex &from, const std::string &from_label,
|
||||
int64_t from_id, const std::string &to_label, int64_t to_id,
|
||||
const DecodedEdge &edge) {
|
||||
@ -203,61 +184,43 @@ class Session {
|
||||
}
|
||||
|
||||
public:
|
||||
void Run(int id, std::vector<int64_t> to_remove,
|
||||
std::atomic<bool> &keep_running) {
|
||||
std::mt19937 rg(id);
|
||||
std::vector<VertexAndEdges> removed;
|
||||
|
||||
const auto &queries = config_["queries"];
|
||||
const double read_probability = config_["read_probability"];
|
||||
const std::string independent_label = config_["independent_label"];
|
||||
|
||||
while (keep_running) {
|
||||
virtual void Step() override {
|
||||
std::uniform_real_distribution<> real_dist(0.0, 1.0);
|
||||
|
||||
// Read query.
|
||||
if (real_dist(rg) < read_probability) {
|
||||
CHECK(queries.size())
|
||||
<< "Specify at least one read query or set read_probability to 0";
|
||||
std::uniform_int_distribution<> read_query_dist(0, queries.size() - 1);
|
||||
const auto &query = queries[read_query_dist(rg)];
|
||||
std::map<std::string, DecodedValue> params;
|
||||
for (const auto ¶m : query["params"]) {
|
||||
std::uniform_int_distribution<int64_t> param_value_dist(
|
||||
param["low"], param["high"]);
|
||||
params[param["name"]] = param_value_dist(rg);
|
||||
if (real_dist(rg_) < config_["read_probability"]) {
|
||||
std::uniform_int_distribution<> read_query_dist(0, 1);
|
||||
int id = real_dist(rg_) * to_remove_.size();
|
||||
switch (read_query_dist(rg_)) {
|
||||
case 0:
|
||||
GetAverageAge2(id);
|
||||
break;
|
||||
case 1:
|
||||
GetAverageAge3(id);
|
||||
break;
|
||||
default:
|
||||
LOG(FATAL) << "Should not get here";
|
||||
}
|
||||
Execute(query["query"], params);
|
||||
} else {
|
||||
auto remove_random = [&](auto &v) {
|
||||
CHECK(v.size());
|
||||
std::uniform_int_distribution<> int_dist(0, v.size() - 1);
|
||||
std::swap(v.back(), v[int_dist(rg)]);
|
||||
std::swap(v.back(), v[int_dist(rg_)]);
|
||||
auto ret = v.back();
|
||||
v.pop_back();
|
||||
return ret;
|
||||
};
|
||||
if (real_dist(rg) < static_cast<double>(removed.size()) /
|
||||
(removed.size() + to_remove.size())) {
|
||||
auto vertices_and_edges = remove_random(removed);
|
||||
ReturnVertexAndEdges(vertices_and_edges, independent_label);
|
||||
to_remove.push_back(
|
||||
if (real_dist(rg_) < static_cast<double>(removed_.size()) /
|
||||
(removed_.size() + to_remove_.size())) {
|
||||
auto vertices_and_edges = remove_random(removed_);
|
||||
ReturnVertexAndEdges(vertices_and_edges, INDEPENDENT_LABEL);
|
||||
to_remove_.push_back(
|
||||
vertices_and_edges.vertex.properties["id"].ValueInt());
|
||||
} else {
|
||||
auto node_id = remove_random(to_remove);
|
||||
auto ret = RetrieveAndDeleteVertex(independent_label, node_id);
|
||||
removed.push_back(ret);
|
||||
auto node_id = remove_random(to_remove_);
|
||||
auto ret = RetrieveAndDeleteVertex(INDEPENDENT_LABEL, node_id);
|
||||
removed_.push_back(ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto ConsumeStats() {
|
||||
std::unique_lock<SpinLock> guard(lock_);
|
||||
auto stats = stats_;
|
||||
stats_.clear();
|
||||
return stats;
|
||||
}
|
||||
};
|
||||
|
||||
int64_t NumNodes(BoltClient &client, const std::string &label) {
|
||||
@ -305,7 +268,7 @@ std::vector<int64_t> IndependentSet(BoltClient &client,
|
||||
independent.erase(j);
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "Number of nodes nodes: " << num_nodes << "\n"
|
||||
LOG(INFO) << "Number of nodes: " << num_nodes << "\n"
|
||||
<< "Number of independent nodes: " << independent_nodes_ids.size();
|
||||
|
||||
return independent_nodes_ids;
|
||||
@ -317,126 +280,32 @@ int main(int argc, char **argv) {
|
||||
|
||||
nlohmann::json config;
|
||||
std::cin >> config;
|
||||
const std::string independent_label = config["independent_label"];
|
||||
|
||||
auto independent_nodes_ids = [&] {
|
||||
BoltClient client(utils::ResolveHostname(FLAGS_address), FLAGS_port,
|
||||
FLAGS_username, FLAGS_password);
|
||||
return IndependentSet(client, independent_label);
|
||||
return IndependentSet(client, INDEPENDENT_LABEL);
|
||||
}();
|
||||
|
||||
utils::Timer timer;
|
||||
std::vector<std::thread> threads;
|
||||
std::atomic<bool> keep_running{true};
|
||||
|
||||
int64_t next_to_assign = 0;
|
||||
std::vector<std::unique_ptr<Session>> sessions;
|
||||
sessions.reserve(FLAGS_num_workers);
|
||||
std::vector<std::unique_ptr<TestClient>> clients;
|
||||
clients.reserve(FLAGS_num_workers);
|
||||
|
||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||
int64_t size = independent_nodes_ids.size();
|
||||
int64_t next_next_to_assign = next_to_assign + size / FLAGS_num_workers +
|
||||
(i < size % FLAGS_num_workers);
|
||||
|
||||
sessions.push_back(std::make_unique<Session>(
|
||||
config, FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password));
|
||||
|
||||
std::vector<int64_t> to_remove(
|
||||
independent_nodes_ids.begin() + next_to_assign,
|
||||
independent_nodes_ids.begin() + next_next_to_assign);
|
||||
LOG(INFO) << next_to_assign << " " << next_next_to_assign;
|
||||
next_to_assign = next_next_to_assign;
|
||||
|
||||
threads.emplace_back(
|
||||
[&](int thread_id, const std::vector<int64_t> to_remove) {
|
||||
sessions[thread_id]->Run(thread_id, std::move(to_remove),
|
||||
keep_running);
|
||||
},
|
||||
i, std::move(to_remove));
|
||||
clients.emplace_back(std::make_unique<PokecClient>(i, to_remove, config));
|
||||
}
|
||||
|
||||
// Open stream for writing stats.
|
||||
std::streambuf *buf;
|
||||
std::ofstream f;
|
||||
if (FLAGS_output != "") {
|
||||
f.open(FLAGS_output);
|
||||
buf = f.rdbuf();
|
||||
} else {
|
||||
buf = std::cout.rdbuf();
|
||||
}
|
||||
std::ostream out(buf);
|
||||
|
||||
while (timer.Elapsed().count() < FLAGS_duration) {
|
||||
std::unordered_map<std::string, std::map<std::string, DecodedValue>>
|
||||
aggregated_stats;
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
std::unordered_map<std::string,
|
||||
std::vector<std::map<std::string, DecodedValue>>>
|
||||
stats;
|
||||
for (const auto &session : sessions) {
|
||||
auto session_stats = session->ConsumeStats();
|
||||
for (const auto &session_query_stats : session_stats) {
|
||||
auto &query_stats = stats[session_query_stats.first];
|
||||
query_stats.insert(query_stats.end(),
|
||||
session_query_stats.second.begin(),
|
||||
session_query_stats.second.end());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Here we combine pure values, json and DecodedValue which is a
|
||||
// little bit chaotic. Think about refactoring this part to only use json
|
||||
// and write DecodedValue to json converter.
|
||||
const std::vector<std::string> fields = {
|
||||
"wall_time",
|
||||
"parsing_time",
|
||||
"planning_time",
|
||||
"plan_execution_time",
|
||||
};
|
||||
for (const auto &query_stats : stats) {
|
||||
std::map<std::string, double> new_aggregated_query_stats;
|
||||
for (const auto &stats : query_stats.second) {
|
||||
for (const auto &field : fields) {
|
||||
auto it = stats.find(field);
|
||||
if (it != stats.end()) {
|
||||
new_aggregated_query_stats[field] += it->second.ValueDouble();
|
||||
}
|
||||
}
|
||||
}
|
||||
int64_t new_count = query_stats.second.size();
|
||||
|
||||
auto &aggregated_query_stats = aggregated_stats[query_stats.first];
|
||||
aggregated_query_stats.insert({"count", DecodedValue(0)});
|
||||
auto old_count = aggregated_query_stats["count"].ValueInt();
|
||||
aggregated_query_stats["count"].ValueInt() += new_count;
|
||||
for (const auto &stat : new_aggregated_query_stats) {
|
||||
auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)})
|
||||
.first;
|
||||
it->second =
|
||||
(it->second.ValueDouble() * old_count + stat.second * new_count) /
|
||||
(old_count + new_count);
|
||||
}
|
||||
}
|
||||
|
||||
out << "{\"num_executed_queries\": " << executed_queries << ", "
|
||||
<< "\"elapsed_time\": " << timer.Elapsed().count()
|
||||
<< ", \"queries\": [";
|
||||
utils::PrintIterable(
|
||||
out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
|
||||
stream << "{\"query\": " << nlohmann::json(x.first)
|
||||
<< ", \"stats\": ";
|
||||
PrintJsonDecodedValue(stream, DecodedValue(x.second));
|
||||
stream << "}";
|
||||
});
|
||||
out << "]}" << std::endl;
|
||||
out.flush();
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
keep_running = false;
|
||||
|
||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
RunMultithreadedTest(clients);
|
||||
|
||||
return 0;
|
||||
}
|
1
tests/macro_benchmark/groups/card_fraud/.gitignore
vendored
Normal file
1
tests/macro_benchmark/groups/card_fraud/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
pokec_*.setup.cypher
|
4
tests/macro_benchmark/groups/card_fraud/config.json
Normal file
4
tests/macro_benchmark/groups/card_fraud/config.json
Normal file
@ -0,0 +1,4 @@
|
||||
{
|
||||
"duration": 30,
|
||||
"client": "card_fraud_client"
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
{
|
||||
"scenario": "read_only"
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
{
|
||||
"scenario": "read_write"
|
||||
}
|
47
tests/macro_benchmark/groups/card_fraud/setup.py
Normal file
47
tests/macro_benchmark/groups/card_fraud/setup.py
Normal file
@ -0,0 +1,47 @@
|
||||
import random
|
||||
|
||||
def init_data(card_count, pos_count):
    """Print the Cypher statements that create the initial nodes.

    One statement creates `card_count` :Card nodes, the next one creates
    `pos_count` :Pos nodes. Ids run from 0 to count - 1 and every node
    starts with compromised set to false.
    """
    template = ("UNWIND range(0, {} - 1) AS id "
                "CREATE (:{} {{id: id, compromised: false}});")
    for label, count in (("Card", card_count), ("Pos", pos_count)):
        print(template.format(count, label))
|
||||
|
||||
|
||||
def compromise_pos_device(pos_id):
    """Print a statement that marks the :Pos node `pos_id` as compromised."""
    statement = "MATCH (p:Pos {{id: {}}}) SET p.compromised = true;".format(
        pos_id)
    print(statement)
|
||||
|
||||
|
||||
def compromise_pos_devices(pos_count, fraud_count):
    """Compromise `fraud_count` distinct POS ids sampled from range(pos_count).

    Emits one statement per sampled id via compromise_pos_device.
    """
    chosen_ids = random.sample(range(pos_count), fraud_count)
    for chosen_id in chosen_ids:
        compromise_pos_device(chosen_id)
|
||||
|
||||
|
||||
def pump_transactions(card_count, pos_count, tx_count, report_pct):
    """Print `tx_count` statements, each creating one :Transaction.

    Every transaction (ids 0 .. tx_count - 1) connects a random card with id
    in [0, card_count - 1] to a random POS with id in [0, pos_count - 1].
    The transaction is created with fraud_reported = true only when its card
    is compromised and rand() < report_pct. Afterwards the card's compromised
    flag is set to the POS's flag.
    """
    # NOTE(review): the final SET overwrites c.compromised with
    # p.compromised, so a compromised card used at a clean POS is reset to
    # false -- confirm this is intended.
    statement = ("MATCH (c:Card {{id: {}}}), (p:Pos {{id: {}}}) "
                 "CREATE (t:Transaction "
                 "{{id: {}, fraud_reported: c.compromised AND (rand() < %f)}}) "
                 "CREATE (c)<-[:Using]-(t)-[:At]->(p) "
                 "SET c.compromised = p.compromised;" % report_pct)

    def random_id(count):
        # Uniform integer in [0, count - 1].
        return random.randint(0, count - 1)

    for tx_id in range(tx_count):
        # Draw the card first, then the POS, to keep the random sequence.
        card_id = random_id(card_count)
        pos_id = random_id(pos_count)
        print(statement.format(card_id, pos_id, tx_id))
|
||||
|
||||
|
||||
# Size of the generated data set.
POS_COUNT = 1000
CARD_COUNT = 10000
FRAUD_POS_COUNT = 20
TX_COUNT = 50000
# Passed to pump_transactions as report_pct.
REPORT_PCT = 0.1

# Constant seed for the random module used by the generators below.
random.seed(12345)

# The setup script is written to stdout in this order: nodes, compromised
# POS devices, transactions.
init_data(CARD_COUNT, POS_COUNT)
compromise_pos_devices(POS_COUNT, FRAUD_POS_COUNT)
pump_transactions(CARD_COUNT, POS_COUNT, TX_COUNT, REPORT_PCT)
|
@ -1,3 +1,4 @@
|
||||
{
|
||||
"duration": 60
|
||||
"duration": 60,
|
||||
"client": "pokec_client"
|
||||
}
|
||||
|
@ -1,26 +1,3 @@
|
||||
{
|
||||
"independent_label": "User",
|
||||
"read_probability": 0.5,
|
||||
"queries" : [
|
||||
{
|
||||
"query": "MATCH (n :User {id : $id})-[]-(m) RETURN AVG(n.age + m.age)",
|
||||
"params" : [
|
||||
{
|
||||
"name" : "id",
|
||||
"low" : 1,
|
||||
"high" : 10000
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"query": "MATCH (n :User {id : $id})-[]-(m)-[]-(k) RETURN AVG(n.age + m.age + k.age)",
|
||||
"params" : [
|
||||
{
|
||||
"name" : "id",
|
||||
"low" : 1,
|
||||
"high" : 10000
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
"read_probability": 0.5
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ class LongRunningSuite:
|
||||
duration = self.args.duration
|
||||
log.info("Executing run for {} seconds".format(
|
||||
duration))
|
||||
results = runner.run(next(scenario.get("run")()), duration)
|
||||
results = runner.run(next(scenario.get("run")()), duration, config["client"])
|
||||
|
||||
runner.stop()
|
||||
|
||||
@ -53,7 +53,7 @@ class LongRunningSuite:
|
||||
return {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner}
|
||||
|
||||
def groups(self):
|
||||
return ["pokec"]
|
||||
return ["pokec", "card_fraud"]
|
||||
|
||||
|
||||
class _LongRunningRunner:
|
||||
@ -69,9 +69,9 @@ class _LongRunningRunner:
|
||||
def setup(self, queries, num_client_workers=None):
|
||||
return self.query_client(queries, self.database, num_client_workers)
|
||||
|
||||
def run(self, config, duration, num_client_workers=None):
|
||||
def run(self, config, duration, client, num_client_workers=None):
|
||||
return self.long_running_client(
|
||||
config, self.database, duration, num_client_workers)
|
||||
config, self.database, duration, client, num_client_workers)
|
||||
|
||||
def stop(self):
|
||||
self.log.info("stop")
|
||||
|
18
tests/macro_benchmark/run_card_fraud
Executable file
18
tests/macro_benchmark/run_card_fraud
Executable file
@ -0,0 +1,18 @@
|
||||
#!/bin/bash -e

# Runs the card_fraud group of LongRunningSuite against Memgraph and Neo4j,
# stores both summaries under .results/card_fraud and plots them.

script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

# Expansions are quoted so the script also works from a path with spaces.
cd "${script_dir}"
mkdir -p .results/card_fraud

./harness LongRunningSuite MemgraphRunner --groups card_fraud
mv .harness_summary "${script_dir}/.results/card_fraud/memgraph.summary"

./harness LongRunningSuite NeoRunner --groups card_fraud
mv .harness_summary "${script_dir}/.results/card_fraud/neo4j.summary"

../../tools/plot/pokec_throughput \
    --vendor-references neo4j memgraph \
    --vendor-titles Neo4j Memgraph \
    --results "${script_dir}/.results/card_fraud/neo4j.summary" "${script_dir}/.results/card_fraud/memgraph.summary" \
    --plot-title "Card Fraud Small" --window-size 1
|
Loading…
Reference in New Issue
Block a user