add strata v1 scenario

Reviewers: mculinovic, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1266
This commit is contained in:
Marin Tomic 2018-03-05 19:11:52 +01:00
parent 89cf8ca077
commit 5637703317
6 changed files with 211 additions and 75 deletions

View File

@ -6,7 +6,7 @@ import time
NUM_MACHINES = 3
# test setup
SCENARIOS = ["point_lookup", "create_tx_without_edge"]
SCENARIOS = ["point_lookup", "create_tx"]
DURATION = 300
WORKERS = 6

View File

@ -21,28 +21,53 @@ std::atomic<int> num_pos;
std::atomic<int> num_cards;
std::atomic<int> num_transactions;
DEFINE_string(config, "", "test config");
enum class Role { WORKER, ANALYTIC };
class CardFraudClient : public TestClient {
public:
CardFraudClient(int id) : TestClient(), rg_(id) {}
CardFraudClient(int id, nlohmann::json config, Role role = Role::WORKER)
: TestClient(), rg_(id), role_(role), config_(config) {}
private:
std::mt19937 rg_;
Role role_;
nlohmann::json config_;
auto HeavyRead() {
return Execute(
"MATCH (t:Transaction {fraud_reported: true}) "
"WITH COLLECT(t.id) as ids "
"RETURN head(ids)",
{}, "HeavyRead");
}
auto GetFraudulentTransactions() {
return Execute(
"MATCH (t:Transaction {fraud_reported: true}) "
"RETURN t.id as id",
{});
{}, "GetFraudulentTransactions");
}
auto GetCompromisedPos() {
auto GetCompromisedPos(int pos_limit) {
return Execute(
"MATCH (t:Transaction {fraud_reported: true})-[:Using]->(:Card)"
"<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
"WITH p.id as pos, count(t) as connected_frauds "
"WHERE connected_frauds > 1 "
"RETURN pos, connected_frauds ORDER BY connected_frauds DESC",
{});
"RETURN pos, connected_frauds "
"ORDER BY connected_frauds DESC LIMIT $pos_limit",
{{"pos_limit", pos_limit}}, "GetCompromisedPos");
}
auto GetCompromisedPosInc(int pos_limit) {
return Execute(
"MATCH (p:Pos) "
"RETURN p, p.connected_frauds "
"ORDER BY p.connected_frauds DESC "
"LIMIT $pos_limit",
{{"pos_limit", pos_limit}}, "GetCompromisedPosInc");
}
auto ResolvePos(int id) {
@ -51,16 +76,17 @@ class CardFraudClient : public TestClient {
"SET p.compromised = false "
"WITH p MATCH (p)--(t:Transaction)--(c:Card) "
"SET t.fraud_reported = false, c.compromised = false",
{{"id", id}});
{{"id", id}}, "ResolvePos");
}
auto GetTransaction(int id) {
return Execute("MATCH (t:Transaction {id: $id}) RETURN (t)", {{"id", id}});
return Execute("MATCH (t:Transaction {id: $id}) RETURN (t)", {{"id", id}},
"GetTransaction");
}
auto TepsQuery() {
auto result = Execute("MATCH (u)--(v) RETURN count(1)", {});
DCHECK(result.records[0][0].ValueInt() == num_transactions * 2);
auto result = Execute("MATCH (u)--(v) RETURN count(1)", {}, "TepsQuery");
DCHECK(result && result->records[0][0].ValueInt() == num_transactions * 2);
}
auto CompromisePos(int id) {
@ -69,67 +95,135 @@ class CardFraudClient : public TestClient {
"SET p.compromised = true "
"WITH p MATCH (p)--(t:Transaction)--(c:Card) "
"SET t.fraud_reported = false, c.compromised = true",
{{"id", id}});
{{"id", id}}, "CompromisePos");
}
auto CreateTransaction(int pos_id, int card_id) {
auto CreateTransaction(int pos_id, int card_id, int tx_id, bool is_fraud) {
return Execute(
"MATCH (p:Pos {id: $pos_id}), (c:Card {id: $card_id}) "
"CREATE (t:Transaction {id: $tx_id, fraud_reported: false}) "
"CREATE (c)<-[:Using]-(t)-[:At]->(p)",
"CREATE (c)<-[:Using]-(t:Transaction {id: $tx_id, fraud_reported: "
"$is_fraud})-[:At]->(p)",
{{"pos_id", pos_id},
{"card_id", card_id},
{"tx_id", num_transactions++}});
{"tx_id", tx_id},
{"is_fraud", is_fraud}},
"CreateTransaction");
}
auto CreateTransactionWithoutEdge(int pos_id, int card_id) {
auto CreateTransactionWithoutEdge(int pos_id, int card_id, int tx_id,
bool is_fraud) {
return Execute(
"MATCH (p:Pos {id: $pos_id}), (c:Card {id: $card_id}) "
"CREATE (t:Transaction {id: $tx_id, fraud_reported: false})",
{{"pos_id", pos_id},
{"card_id", card_id},
{"tx_id", num_transactions++}});
{"tx_id", tx_id},
{"is_fraud", is_fraud}},
"CreateTransactionWithoutEdge");
}
auto UpdateFraudScores(int tx_id) {
return Execute(
"MATCH (t:Transaction {id: "
"$tx_id})-[:Using]->(:Card)<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
"SET p.connected_frauds = p.connected_frauds + 1",
{{"tx_id", tx_id}}, "UpdateFraudScores");
}
int UniformInt(int a, int b) {
std::uniform_int_distribution<int> dist(a, b);
return dist(rg_);
}
double UniformDouble(double a, double b) {
std::uniform_real_distribution<double> dist(a, b);
return dist(rg_);
}
public:
virtual void Step() override {
if (FLAGS_scenario == "read_only") {
std::uniform_int_distribution<int> dist(0, 1);
if (dist(rg_)) {
GetFraudulentTransactions();
} else {
GetCompromisedPos();
}
} else if (FLAGS_scenario == "read_write") {
std::uniform_int_distribution<int> dist(0, num_pos - 1);
int pos_id = dist(rg_);
CompromisePos(pos_id);
GetFraudulentTransactions();
ResolvePos(pos_id);
} else if (FLAGS_scenario == "teps") {
TepsQuery();
} else if (FLAGS_scenario == "point_lookup") {
std::uniform_int_distribution<int> dist(0, num_transactions - 1);
int tx_id = dist(rg_);
GetTransaction(tx_id);
} else if (FLAGS_scenario == "create_tx") {
std::uniform_int_distribution<int> dist_pos(0, num_pos - 1);
std::uniform_int_distribution<int> dist_card(0, num_cards - 1);
CreateTransaction(dist_pos(rg_), dist_card(rg_));
} else if (FLAGS_scenario == "create_tx_without_edge") {
std::uniform_int_distribution<int> dist_pos(0, num_pos - 1);
std::uniform_int_distribution<int> dist_card(0, num_cards - 1);
CreateTransactionWithoutEdge(dist_pos(rg_), dist_card(rg_));
} else {
LOG(FATAL) << "Should not get here: unknown scenario!";
if (FLAGS_scenario == "heavy_read") {
HeavyRead();
return;
}
if (FLAGS_scenario == "teps") {
TepsQuery();
return;
}
if (FLAGS_scenario == "point_lookup") {
GetTransaction(UniformInt(0, num_transactions - 1));
return;
}
if (FLAGS_scenario == "create_tx") {
CreateTransaction(UniformInt(0, num_pos - 1),
UniformInt(0, num_cards - 1), num_transactions++,
false);
return;
}
/*
* - fraud scores are calculated as transactions are added
* - no deletions
*/
if (FLAGS_scenario == "strata_v1") {
if (role_ == Role::ANALYTIC) {
std::this_thread::sleep_for(std::chrono::milliseconds(
config_["analytic_query_interval"].get<int>()));
GetCompromisedPosInc(config_["analytic_query_pos_limit"].get<int>());
return;
}
if (role_ == Role::WORKER) {
bool is_fraud =
UniformDouble(0, 1) < config_["fraud_probability"].get<double>();
int pos_id = UniformInt(0, num_pos - 1);
int pos_worker = pos_id / config_["pos_per_worker"].get<int>();
int card_worker = pos_worker;
bool hop =
UniformDouble(0, 1) < config_["hop_probability"].get<double>();
if (hop) {
card_worker = UniformInt(0, config_["num_workers"].get<int>() - 2);
if (card_worker >= pos_worker) {
++card_worker;
}
}
int card_id = card_worker * config_["cards_per_worker"].get<int>() +
UniformInt(0, config_["cards_per_worker"].get<int>() - 1);
int tx_id = num_transactions++;
CreateTransaction(pos_id, card_id, tx_id, is_fraud);
if (is_fraud) {
UpdateFraudScores(tx_id);
}
}
return;
}
if (FLAGS_scenario == "strata_v2") {
LOG(FATAL) << "Not yet implemented!";
return;
}
if (FLAGS_scenario == "strata_v3") {
LOG(FATAL) << "Not yet implemented!";
return;
}
LOG(FATAL) << "Should not get here: unknown scenario!";
}
};
int64_t NumNodesWithLabel(BoltClient &client, std::string label) {
std::string query = fmt::format("MATCH (u :{}) RETURN COUNT(u)", label);
auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES);
return result.records[0][0].ValueInt();
return result.first.records[0][0].ValueInt();
}
void CreateIndex(BoltClient &client, const std::string &label,
@ -147,6 +241,18 @@ void CreateIndex(BoltClient &client, const std::string &label,
}
}
nlohmann::json LoadConfig() {
nlohmann::json config;
if (FLAGS_config != "") {
LOG(INFO) << "Loading config from: " << FLAGS_config;
std::ifstream is(FLAGS_config);
is >> config;
} else {
LOG(INFO) << "No test config provided";
}
return config;
}
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
@ -168,9 +274,25 @@ int main(int argc, char **argv) {
client.Close();
auto config = LoadConfig();
std::vector<std::unique_ptr<TestClient>> clients;
for (int i = 0; i < FLAGS_num_workers; ++i) {
clients.emplace_back(std::make_unique<CardFraudClient>(i));
if (FLAGS_scenario == "strata_v1") {
CHECK(num_pos == config["num_workers"].get<int>() *
config["pos_per_worker"].get<int>())
<< "Wrong number of POS per worker";
CHECK(num_cards == config["num_workers"].get<int>() *
config["cards_per_worker"].get<int>())
<< "Wrong number of cards per worker";
for (int i = 0; i < FLAGS_num_workers - 1; ++i) {
clients.emplace_back(std::make_unique<CardFraudClient>(i, config));
}
clients.emplace_back(std::make_unique<CardFraudClient>(
FLAGS_num_workers - 1, config, Role::ANALYTIC));
} else {
for (int i = 0; i < FLAGS_num_workers; ++i) {
clients.emplace_back(std::make_unique<CardFraudClient>(i, config));
}
}
RunMultithreadedTest(clients);

View File

@ -56,7 +56,7 @@ void PrintJsonDecodedValue(std::ostream &os,
}
template <typename TClient>
communication::bolt::QueryData ExecuteNTimesTillSuccess(
std::pair<communication::bolt::QueryData, int> ExecuteNTimesTillSuccess(
TClient &client, const std::string &query,
const std::map<std::string, communication::bolt::DecodedValue> &params,
int times) {
@ -66,7 +66,7 @@ communication::bolt::QueryData ExecuteNTimesTillSuccess(
for (int i = 0; i < times; ++i) {
try {
auto ret = client.Execute(query, params);
return ret;
return {ret, i};
} catch (const utils::BasicException &e) {
last_exception = e;
utils::Timer t;
@ -79,4 +79,4 @@ communication::bolt::QueryData ExecuteNTimesTillSuccess(
LOG(WARNING) << query << " failed " << times << "times";
throw last_exception.value();
}
}
} // namespace

View File

@ -24,6 +24,7 @@ DEFINE_string(group, "unknown", "Test group name");
DEFINE_string(scenario, "unknown", "Test scenario name");
auto &executed_queries = stats::GetCounter("executed_queries");
auto &serialization_errors = stats::GetCounter("serialization_errors");
class TestClient {
public:
@ -55,12 +56,21 @@ class TestClient {
protected:
virtual void Step() = 0;
communication::bolt::QueryData Execute(
std::experimental::optional<communication::bolt::QueryData> Execute(
const std::string &query,
const std::map<std::string, DecodedValue> &params,
const std::string &query_name = "") {
communication::bolt::QueryData result;
int retries;
utils::Timer timer;
auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
try {
std::tie(result, retries) =
ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
} catch (const utils::BasicException &e) {
LOG(WARNING) << e.what();
serialization_errors.Bump(MAX_RETRIES);
return std::experimental::nullopt;
}
auto wall_time = timer.Elapsed();
auto metadata = result.metadata;
metadata["wall_time"] = wall_time.count();
@ -73,6 +83,7 @@ class TestClient {
}
}
executed_queries.Bump();
serialization_errors.Bump(retries);
return result;
}
@ -128,7 +139,10 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
// little bit chaotic. Think about refactoring this part to only use json
// and write DecodedValue to json converter.
const std::vector<std::string> fields = {
"wall_time", "parsing_time", "planning_time", "plan_execution_time",
"wall_time",
"parsing_time",
"planning_time",
"plan_execution_time",
};
for (const auto &query_stats : stats) {
std::map<std::string, double> new_aggregated_query_stats;
@ -149,21 +163,21 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
for (const auto &stat : new_aggregated_query_stats) {
auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)})
.first;
it->second =
(it->second.ValueDouble() * old_count + stat.second) /
(old_count + new_count);
it->second = (it->second.ValueDouble() * old_count + stat.second) /
(old_count + new_count);
}
}
out << "{\"num_executed_queries\": " << executed_queries.Value() << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count()
<< ", \"queries\": [";
utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream,
const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
utils::PrintIterable(
out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first)
<< ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
out << "]}" << std::endl;
out.flush();
std::this_thread::sleep_for(1s);

View File

@ -128,21 +128,21 @@ class PokecClient : public TestClient {
auto ret = Execute(os.str(), {},
"MATCH (n :label {id: ...}) MATCH (m :label {id: ...}) "
"CREATE (n)-[:type ...]-(m)");
CHECK(ret.records.size() == 1U)
CHECK(ret->records.size() == 1U)
<< "from_id: " << from_id << " "
<< "to_id: " << to_id << " "
<< "ret.records.size(): " << ret.records.size();
<< "ret.records.size(): " << ret->records.size();
return ret;
}
VertexAndEdges RetrieveAndDeleteVertex(const std::string &label, int64_t id) {
auto vertex_record = MatchVertex(label, id).records;
auto vertex_record = MatchVertex(label, id)->records;
CHECK(vertex_record.size() == 1U)
<< "id: " << id << " "
<< "vertex_record.size(): " << vertex_record.size();
auto records = MatchNeighbours(label, id).records;
auto records = MatchNeighbours(label, id)->records;
DetachDeleteVertex(label, id);
@ -175,7 +175,7 @@ class PokecClient : public TestClient {
vertex_and_edges.vertex.properties.at("id").ValueInt(), label,
vertex_and_edges.vertices[i].properties.at("id").ValueInt(),
vertex_and_edges.edges[i])
.records;
->records;
CHECK(records.size() == 1U)
<< "Graph in invalid state "
<< vertex_and_edges.vertex.properties.at("id");
@ -227,7 +227,7 @@ int64_t NumNodes(BoltClient &client, const std::string &label) {
auto result = ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + ") RETURN COUNT(n) as cnt", {},
MAX_RETRIES);
return result.records[0][0].ValueInt();
return result.first.records[0][0].ValueInt();
}
std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
@ -238,7 +238,7 @@ std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
"})-[e]-(m) RETURN m.id",
{}, MAX_RETRIES);
std::vector<int64_t> ret;
for (const auto &record : result.records) {
for (const auto &record : result.first.records) {
ret.push_back(record[0].ValueInt());
}
return ret;
@ -306,6 +306,6 @@ int main(int argc, char **argv) {
}
RunMultithreadedTest(clients);
return 0;
}

View File

@ -80,8 +80,8 @@ void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
str = queries[pos];
}
try {
metadata[pos] =
ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES).metadata;
metadata[pos] = ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES)
.first.metadata;
} catch (const utils::BasicException &e) {
LOG(FATAL) << "Could not execute query '" << str << "' "
<< MAX_RETRIES << " times! Error message: " << e.what();