Add deletion, more stats to card fraud and RWLock
Summary: ^^ this is a sample config to be used: ``` { "num_workers": 1, "cards_per_worker": 10001, "pos_per_worker": 10001, "fraud_probability": 0.01, "hop_probability": 0.05, "cleanup": { "check_interval_sec": 10, "tx_hi": 150000, "tx_lo": 100000 }, "analytic": { "query_interval_ms": 500, "pos_limit": 10 } } ``` I also added `RWLock` --- a wrapper around `pthread_rwlock` Reviewers: mferencevic, mculinovic, florijan, teon.banek Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1280
This commit is contained in:
parent
7dbee00f2f
commit
2263a423b8
src
tests
@ -48,6 +48,7 @@ set(memgraph_src_files
|
||||
storage/property_value.cpp
|
||||
storage/record_accessor.cpp
|
||||
storage/vertex_accessor.cpp
|
||||
threading/sync/rwlock.cpp
|
||||
threading/thread.cpp
|
||||
transactions/engine_master.cpp
|
||||
transactions/engine_single_node.cpp
|
||||
|
90
src/threading/sync/rwlock.cpp
Normal file
90
src/threading/sync/rwlock.cpp
Normal file
@ -0,0 +1,90 @@
|
||||
#include "threading/sync/rwlock.hpp"
|
||||
|
||||
namespace threading {
|
||||
|
||||
RWLock::RWLock(RWLockPriority priority) {
|
||||
int err;
|
||||
pthread_rwlockattr_t attr;
|
||||
|
||||
err = pthread_rwlockattr_init(&attr);
|
||||
if (err != 0) {
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
|
||||
switch (priority) {
|
||||
case RWLockPriority::READ:
|
||||
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
|
||||
break;
|
||||
case RWLockPriority::WRITE:
|
||||
/* There is also `PTHREAD_RWLOCK_PREFER_WRITER_NP` but it is not
|
||||
* providing the desired behavior.
|
||||
*
|
||||
* From `man 7 pthread_rwlockattr_setkind_np`:
|
||||
* "Setting the value read-write lock kind to
|
||||
* PTHREAD_RWLOCK_PREFER_WRITER_NP results in the same behavior as
|
||||
* setting the value to PTHREAD_RWLOCK_PREFER_READER_NP. As long as a
|
||||
* reader thread holds the lock, the thread holding a write lock will be
|
||||
* starved. Setting the lock kind to
|
||||
* PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP allows writers to run,
|
||||
* but, as the name implies a writer may not lock recursively."
|
||||
*
|
||||
* For this reason, `RWLock` should not be used recursively.
|
||||
* */
|
||||
pthread_rwlockattr_setkind_np(
|
||||
&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
|
||||
break;
|
||||
}
|
||||
|
||||
err = pthread_rwlock_init(&lock_, &attr);
|
||||
pthread_rwlockattr_destroy(&attr);
|
||||
|
||||
if (err != 0) {
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
RWLock::~RWLock() { pthread_rwlock_destroy(&lock_); }
|
||||
|
||||
void RWLock::lock() {
|
||||
int err = pthread_rwlock_wrlock(&lock_);
|
||||
if (err != 0) {
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
bool RWLock::try_lock() {
|
||||
int err = pthread_rwlock_trywrlock(&lock_);
|
||||
if (err == 0) return true;
|
||||
if (err == EBUSY) return false;
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
|
||||
void RWLock::unlock() {
|
||||
int err = pthread_rwlock_unlock(&lock_);
|
||||
if (err != 0) {
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
void RWLock::lock_shared() {
|
||||
int err = pthread_rwlock_rdlock(&lock_);
|
||||
if (err != 0) {
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
bool RWLock::try_lock_shared() {
|
||||
int err = pthread_rwlock_tryrdlock(&lock_);
|
||||
if (err == 0) return true;
|
||||
if (err == EBUSY) return false;
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
|
||||
void RWLock::unlock_shared() {
|
||||
int err = pthread_rwlock_unlock(&lock_);
|
||||
if (err != 0) {
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace threading
|
47
src/threading/sync/rwlock.hpp
Normal file
47
src/threading/sync/rwlock.hpp
Normal file
@ -0,0 +1,47 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
namespace threading {
|
||||
|
||||
/// By passing the appropriate parameter to the `RWLock` constructor, it is
|
||||
/// possible to control the behavior of `RWLock` while shared lock is held. If
|
||||
/// the priority is set to `READ`, new shared (read) locks can be obtained even
|
||||
/// though there is a thread waiting for an exclusive (write) lock, which can
|
||||
/// lead to writer starvation. If the priority is set to `WRITE`, readers will
|
||||
/// be blocked from obtaining new shared locks while there are writers waiting,
|
||||
/// which can lead to reader starvation. However, if there is an exclusive lock
|
||||
/// held, writers will always be preferred upon it's release.
|
||||
enum RWLockPriority { READ, WRITE };
|
||||
|
||||
/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to
|
||||
/// choose read or write priority for `std::shared_mutex`.
|
||||
class RWLock {
|
||||
public:
|
||||
RWLock(const RWLock &) = delete;
|
||||
RWLock &operator=(const RWLock &) = delete;
|
||||
RWLock(RWLock &&) = delete;
|
||||
RWLock &operator=(RWLock &&) = delete;
|
||||
|
||||
/// Construct a RWLock object with chosen priority. See comment above
|
||||
/// `RWLockPriority` for details.
|
||||
explicit RWLock(RWLockPriority priority);
|
||||
|
||||
~RWLock();
|
||||
|
||||
bool try_lock();
|
||||
void lock();
|
||||
void unlock();
|
||||
|
||||
bool try_lock_shared();
|
||||
void lock_shared();
|
||||
void unlock_shared();
|
||||
|
||||
private:
|
||||
pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER;
|
||||
};
|
||||
|
||||
} // namespace threading
|
@ -1,5 +1,6 @@
|
||||
#include <memory>
|
||||
#include <random>
|
||||
#include <shared_mutex>
|
||||
#include <vector>
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
@ -7,6 +8,7 @@
|
||||
#include "long_running_common.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
#include "threading/sync/rwlock.hpp"
|
||||
|
||||
// TODO(mtomic): this sucks but I don't know a different way to make it work
|
||||
#include "boost/archive/binary_iarchive.hpp"
|
||||
@ -17,211 +19,33 @@ BOOST_CLASS_EXPORT(stats::StatsRes);
|
||||
BOOST_CLASS_EXPORT(stats::BatchStatsReq);
|
||||
BOOST_CLASS_EXPORT(stats::BatchStatsRes);
|
||||
|
||||
std::atomic<int> num_pos;
|
||||
std::atomic<int> num_cards;
|
||||
std::atomic<int> num_transactions;
|
||||
std::atomic<int64_t> num_pos;
|
||||
std::atomic<int64_t> num_cards;
|
||||
std::atomic<int64_t> num_transactions;
|
||||
std::atomic<int64_t> max_tx_id;
|
||||
|
||||
threading::RWLock world_lock(threading::RWLockPriority::WRITE);
|
||||
|
||||
DEFINE_string(config, "", "test config");
|
||||
|
||||
enum class Role { WORKER, ANALYTIC };
|
||||
enum class Role { WORKER, ANALYTIC, CLEANUP };
|
||||
|
||||
class CardFraudClient : public TestClient {
|
||||
public:
|
||||
CardFraudClient(int id, nlohmann::json config, Role role = Role::WORKER)
|
||||
: TestClient(), rg_(id), role_(role), config_(config) {}
|
||||
stats::Gauge &num_vertices = stats::GetGauge("vertices");
|
||||
stats::Gauge &num_edges = stats::GetGauge("edges");
|
||||
|
||||
private:
|
||||
std::mt19937 rg_;
|
||||
Role role_;
|
||||
nlohmann::json config_;
|
||||
|
||||
auto HeavyRead() {
|
||||
return Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true}) "
|
||||
"WITH COLLECT(t.id) as ids "
|
||||
"RETURN head(ids)",
|
||||
{}, "HeavyRead");
|
||||
}
|
||||
|
||||
auto GetFraudulentTransactions() {
|
||||
return Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true}) "
|
||||
"RETURN t.id as id",
|
||||
{}, "GetFraudulentTransactions");
|
||||
}
|
||||
|
||||
auto GetCompromisedPos(int pos_limit) {
|
||||
return Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true})-[:Using]->(:Card)"
|
||||
"<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
|
||||
"WITH p.id as pos, count(t) as connected_frauds "
|
||||
"WHERE connected_frauds > 1 "
|
||||
"RETURN pos, connected_frauds "
|
||||
"ORDER BY connected_frauds DESC LIMIT $pos_limit",
|
||||
{{"pos_limit", pos_limit}}, "GetCompromisedPos");
|
||||
}
|
||||
|
||||
auto GetCompromisedPosInc(int pos_limit) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos) "
|
||||
"RETURN p, p.connected_frauds "
|
||||
"ORDER BY p.connected_frauds DESC "
|
||||
"LIMIT $pos_limit",
|
||||
{{"pos_limit", pos_limit}}, "GetCompromisedPosInc");
|
||||
}
|
||||
|
||||
auto ResolvePos(int id) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos {id: $id}) "
|
||||
"SET p.compromised = false "
|
||||
"WITH p MATCH (p)--(t:Transaction)--(c:Card) "
|
||||
"SET t.fraud_reported = false, c.compromised = false",
|
||||
{{"id", id}}, "ResolvePos");
|
||||
}
|
||||
|
||||
auto GetTransaction(int id) {
|
||||
return Execute("MATCH (t:Transaction {id: $id}) RETURN (t)", {{"id", id}},
|
||||
"GetTransaction");
|
||||
}
|
||||
|
||||
auto TepsQuery() {
|
||||
auto result = Execute("MATCH (u)--(v) RETURN count(1)", {}, "TepsQuery");
|
||||
DCHECK(result && result->records[0][0].ValueInt() == num_transactions * 2);
|
||||
}
|
||||
|
||||
auto CompromisePos(int id) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos {id: $id}) "
|
||||
"SET p.compromised = true "
|
||||
"WITH p MATCH (p)--(t:Transaction)--(c:Card) "
|
||||
"SET t.fraud_reported = false, c.compromised = true",
|
||||
{{"id", id}}, "CompromisePos");
|
||||
}
|
||||
auto CreateTransaction(int pos_id, int card_id, int tx_id, bool is_fraud) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos {id: $pos_id}), (c:Card {id: $card_id}) "
|
||||
"CREATE (c)<-[:Using]-(t:Transaction {id: $tx_id, fraud_reported: "
|
||||
"$is_fraud})-[:At]->(p)",
|
||||
{{"pos_id", pos_id},
|
||||
{"card_id", card_id},
|
||||
{"tx_id", tx_id},
|
||||
{"is_fraud", is_fraud}},
|
||||
"CreateTransaction");
|
||||
}
|
||||
|
||||
auto CreateTransactionWithoutEdge(int pos_id, int card_id, int tx_id,
|
||||
bool is_fraud) {
|
||||
return Execute(
|
||||
"MATCH (p:Pos {id: $pos_id}), (c:Card {id: $card_id}) "
|
||||
"CREATE (t:Transaction {id: $tx_id, fraud_reported: false})",
|
||||
{{"pos_id", pos_id},
|
||||
{"card_id", card_id},
|
||||
{"tx_id", tx_id},
|
||||
{"is_fraud", is_fraud}},
|
||||
"CreateTransactionWithoutEdge");
|
||||
}
|
||||
|
||||
auto UpdateFraudScores(int tx_id) {
|
||||
return Execute(
|
||||
"MATCH (t:Transaction {id: "
|
||||
"$tx_id})-[:Using]->(:Card)<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
|
||||
"SET p.connected_frauds = p.connected_frauds + 1",
|
||||
{{"tx_id", tx_id}}, "UpdateFraudScores");
|
||||
}
|
||||
|
||||
int UniformInt(int a, int b) {
|
||||
std::uniform_int_distribution<int> dist(a, b);
|
||||
return dist(rg_);
|
||||
}
|
||||
|
||||
double UniformDouble(double a, double b) {
|
||||
std::uniform_real_distribution<double> dist(a, b);
|
||||
return dist(rg_);
|
||||
}
|
||||
|
||||
public:
|
||||
virtual void Step() override {
|
||||
if (FLAGS_scenario == "heavy_read") {
|
||||
HeavyRead();
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "teps") {
|
||||
TepsQuery();
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "point_lookup") {
|
||||
GetTransaction(UniformInt(0, num_transactions - 1));
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "create_tx") {
|
||||
CreateTransaction(UniformInt(0, num_pos - 1),
|
||||
UniformInt(0, num_cards - 1), num_transactions++,
|
||||
false);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* - fraud scores are calculated as transactions are added
|
||||
* - no deletions
|
||||
*/
|
||||
if (FLAGS_scenario == "strata_v1") {
|
||||
if (role_ == Role::ANALYTIC) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(
|
||||
config_["analytic_query_interval"].get<int>()));
|
||||
GetCompromisedPosInc(config_["analytic_query_pos_limit"].get<int>());
|
||||
return;
|
||||
}
|
||||
|
||||
if (role_ == Role::WORKER) {
|
||||
bool is_fraud =
|
||||
UniformDouble(0, 1) < config_["fraud_probability"].get<double>();
|
||||
|
||||
int pos_id = UniformInt(0, num_pos - 1);
|
||||
int pos_worker = pos_id / config_["pos_per_worker"].get<int>();
|
||||
|
||||
int card_worker = pos_worker;
|
||||
bool hop =
|
||||
UniformDouble(0, 1) < config_["hop_probability"].get<double>();
|
||||
|
||||
if (hop) {
|
||||
card_worker = UniformInt(0, config_["num_workers"].get<int>() - 2);
|
||||
if (card_worker >= pos_worker) {
|
||||
++card_worker;
|
||||
}
|
||||
}
|
||||
|
||||
int card_id = card_worker * config_["cards_per_worker"].get<int>() +
|
||||
|
||||
UniformInt(0, config_["cards_per_worker"].get<int>() - 1);
|
||||
|
||||
int tx_id = num_transactions++;
|
||||
CreateTransaction(pos_id, card_id, tx_id, is_fraud);
|
||||
if (is_fraud) {
|
||||
UpdateFraudScores(tx_id);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "strata_v2") {
|
||||
LOG(FATAL) << "Not yet implemented!";
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "strata_v3") {
|
||||
LOG(FATAL) << "Not yet implemented!";
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(FATAL) << "Should not get here: unknown scenario!";
|
||||
}
|
||||
};
|
||||
void UpdateStats() {
|
||||
num_vertices.Set(num_pos + num_cards + num_transactions);
|
||||
num_edges.Set(2 * num_transactions);
|
||||
}
|
||||
|
||||
int64_t NumNodesWithLabel(BoltClient &client, std::string label) {
|
||||
std::string query = fmt::format("MATCH (u :{}) RETURN COUNT(u)", label);
|
||||
std::string query = fmt::format("MATCH (u :{}) RETURN count(u)", label);
|
||||
auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES);
|
||||
return result.first.records[0][0].ValueInt();
|
||||
}
|
||||
|
||||
int64_t MaxIdForLabel(BoltClient &client, std::string label) {
|
||||
std::string query = fmt::format("MATCH (u :{}) RETURN max(u.id)", label);
|
||||
auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES);
|
||||
return result.first.records[0][0].ValueInt();
|
||||
}
|
||||
@ -241,6 +65,264 @@ void CreateIndex(BoltClient &client, const std::string &label,
|
||||
}
|
||||
}
|
||||
|
||||
class CardFraudClient : public TestClient {
|
||||
public:
|
||||
CardFraudClient(int id, nlohmann::json config, Role role = Role::WORKER)
|
||||
: TestClient(), rg_(id), role_(role), config_(config) {}
|
||||
|
||||
private:
|
||||
std::mt19937 rg_;
|
||||
Role role_;
|
||||
nlohmann::json config_;
|
||||
|
||||
void GetFraudulentTransactions() {
|
||||
auto result = Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true}) "
|
||||
"RETURN t.id as id",
|
||||
{}, "GetFraudulentTransactions");
|
||||
CHECK(result) << "Read-only query should not fail!";
|
||||
}
|
||||
|
||||
/* This query could be rewritten into an equivalent one:
|
||||
* 'MATCH (t:Transaction {fraud_reported: true}) RETURN t LIMIT 1'.
|
||||
* When written like this it causes a lot of network traffic between
|
||||
* distributed workers because they have to return all their data to master
|
||||
* instead of just one node, but doesn't overload the client because it has to
|
||||
* process just one return value.
|
||||
*/
|
||||
void HeavyRead() {
|
||||
auto result = Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true}) "
|
||||
"WITH COLLECT(t.id) as ids "
|
||||
"RETURN head(ids)",
|
||||
{}, "HeavyRead");
|
||||
CHECK(result) << "Read-only query should not fail";
|
||||
}
|
||||
|
||||
/* If a card was used in a fraudulent transaction, we mark all POS it was used
|
||||
* on as possibly fraudulent (having a connected fraud). This query counts
|
||||
* connected frauds for each POS and orders them by that number.
|
||||
*/
|
||||
/* TODO(mtomic): this would make more sense if the data was timestamped. */
|
||||
auto GetCompromisedPos(int pos_limit) {
|
||||
auto result = Execute(
|
||||
"MATCH (t:Transaction {fraud_reported: true})-[:Using]->(:Card)"
|
||||
"<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
|
||||
"WITH p.id as pos, count(t) as connected_frauds "
|
||||
"WHERE connected_frauds > 1 "
|
||||
"RETURN pos, connected_frauds "
|
||||
"ORDER BY connected_frauds DESC LIMIT $pos_limit",
|
||||
{{"pos_limit", pos_limit}}, "GetCompromisedPos");
|
||||
CHECK(result) << "Read-only query should not fail";
|
||||
}
|
||||
|
||||
/* The following two queries approximate the above one. `UpdateFraudScores`
|
||||
* computes the number of connected frauds for each POS incrementally, as
|
||||
* transactions are added, and `GetCompromisedPosInc` just orders them by that
|
||||
* value. Both queries should execute reasonably fast, while the above one can
|
||||
* take a lot of time if the dataset is large. The `fraud_score` value
|
||||
* computed is not equal to `connected_frauds` computed in the above query
|
||||
* because there could be multiple paths from a transaction to a POS. */
|
||||
void UpdateFraudScores(int64_t tx_id) {
|
||||
auto result = Execute(
|
||||
"MATCH (t:Transaction {id: "
|
||||
"$tx_id})-[:Using]->(:Card)<-[:Using]-(:Transaction)-[:At]->(p:Pos) "
|
||||
"SET p.connected_frauds = p.connected_frauds + 1",
|
||||
{{"tx_id", tx_id}}, "UpdateFraudScores");
|
||||
LOG_IF(WARNING, !result) << "`UpdateFraudScores` failed too many times!";
|
||||
}
|
||||
|
||||
void GetCompromisedPosInc(int64_t pos_limit) {
|
||||
auto result = Execute(
|
||||
"MATCH (p:Pos) "
|
||||
"RETURN p, p.connected_frauds "
|
||||
"ORDER BY p.connected_frauds DESC "
|
||||
"LIMIT $pos_limit",
|
||||
{{"pos_limit", pos_limit}}, "GetCompromisedPosInc");
|
||||
CHECK(result) << "Read-only query should not fail";
|
||||
}
|
||||
|
||||
/* This is used to approximate Memgraph's TEPS (traversed edges per seconds)
|
||||
* metric by multiplying the throughput with the size of the dataset. */
|
||||
void TepsQuery() {
|
||||
auto result = Execute("MATCH (u)--(v) RETURN count(1)", {}, "TepsQuery");
|
||||
CHECK(result) << "Read-only query should not fail";
|
||||
CHECK(result->records[0][0].ValueInt() == num_transactions * 4)
|
||||
<< "Wrong count returned from TEPS query";
|
||||
}
|
||||
|
||||
/* Simple point lookup. */
|
||||
void GetTransaction(int64_t id) {
|
||||
auto result = Execute("MATCH (t:Transaction {id: $id}) RETURN (t)",
|
||||
{{"id", id}}, "GetTransaction");
|
||||
CHECK(result) << "Read-only query should not fail";
|
||||
CHECK(result->records[0][0].ValueVertex().properties["id"].ValueInt() == id)
|
||||
<< "Transaction with wrong ID returned from point lookup";
|
||||
}
|
||||
|
||||
void CreateTransaction(int64_t pos_id, int64_t card_id, int64_t tx_id,
|
||||
bool is_fraud) {
|
||||
auto result = Execute(
|
||||
"MATCH (p:Pos {id: $pos_id}), (c:Card {id: $card_id}) "
|
||||
"CREATE (c)<-[:Using]-(t:Transaction {id: $tx_id, fraud_reported: "
|
||||
"$is_fraud})-[:At]->(p) RETURN t",
|
||||
{{"pos_id", pos_id},
|
||||
{"card_id", card_id},
|
||||
{"tx_id", tx_id},
|
||||
{"is_fraud", is_fraud}},
|
||||
"CreateTransaction");
|
||||
|
||||
if (!result) {
|
||||
LOG(WARNING) << "`CreateTransaction` failed too many times!";
|
||||
return;
|
||||
}
|
||||
|
||||
CHECK(result->records.size() == 1) << fmt::format(
|
||||
"Failed to create transaction: (:Card {{id: {}}})<-(:Transaction "
|
||||
"{{id: {}}})->(:Pos {{id: {}}})",
|
||||
card_id, tx_id, pos_id);
|
||||
|
||||
num_transactions++;
|
||||
UpdateStats();
|
||||
}
|
||||
|
||||
int64_t UniformInt(int64_t a, int64_t b) {
|
||||
std::uniform_int_distribution<int64_t> dist(a, b);
|
||||
return dist(rg_);
|
||||
}
|
||||
|
||||
double UniformDouble(double a, double b) {
|
||||
std::uniform_real_distribution<double> dist(a, b);
|
||||
return dist(rg_);
|
||||
}
|
||||
|
||||
public:
|
||||
void AnalyticStep() {
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(config_["analytic"]["query_interval_ms"]));
|
||||
std::shared_lock<threading::RWLock> lock(world_lock);
|
||||
GetCompromisedPosInc(config_["analytic"]["pos_limit"]);
|
||||
}
|
||||
|
||||
void WorkerStep() {
|
||||
std::shared_lock<threading::RWLock> lock(world_lock);
|
||||
bool is_fraud = UniformDouble(0, 1) < config_["fraud_probability"];
|
||||
|
||||
int64_t pos_id = UniformInt(0, num_pos - 1);
|
||||
int64_t pos_worker = pos_id / config_["pos_per_worker"].get<int64_t>();
|
||||
|
||||
int64_t card_worker = pos_worker;
|
||||
|
||||
if (config_["num_workers"] > 1 &&
|
||||
UniformDouble(0, 1) < config_["hop_probability"]) {
|
||||
card_worker = UniformInt(0, config_["num_workers"].get<int64_t>() - 2);
|
||||
if (card_worker >= pos_worker) {
|
||||
++card_worker;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t card_id =
|
||||
card_worker * config_["cards_per_worker"].get<int64_t>() +
|
||||
UniformInt(0, config_["cards_per_worker"].get<int64_t>() - 1);
|
||||
|
||||
int64_t tx_id = ++max_tx_id;
|
||||
CreateTransaction(pos_id, card_id, tx_id, is_fraud);
|
||||
if (is_fraud) {
|
||||
UpdateFraudScores(tx_id);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t NumTransactions() {
|
||||
auto result =
|
||||
Execute("MATCH (t:Transaction) RETURN count(1)", {}, "NumTransactions");
|
||||
CHECK(result) << "Read-only query should not fail!";
|
||||
return result->records[0][0].ValueInt();
|
||||
}
|
||||
|
||||
void CleanupStep() {
|
||||
if (num_transactions >= config_["cleanup"]["tx_hi"].get<int64_t>()) {
|
||||
LOG(INFO) << "Trying to obtain world lock...";
|
||||
std::unique_lock<threading::RWLock> lock(world_lock);
|
||||
int64_t id_limit = max_tx_id - config_["cleanup"]["tx_lo"].get<int>() + 1;
|
||||
LOG(INFO) << "Transaction cleanup started, deleting transactions "
|
||||
"with ids less than "
|
||||
<< id_limit;
|
||||
utils::Timer timer;
|
||||
auto result = Execute(
|
||||
"MATCH (t:Transaction) WHERE t.id < $id_limit "
|
||||
"DETACH DELETE t RETURN count(1)",
|
||||
{{"id_limit", id_limit}}, "TransactionCleanup");
|
||||
int64_t deleted = 0;
|
||||
if (result) {
|
||||
deleted = result->records[0][0].ValueInt();
|
||||
} else {
|
||||
LOG(ERROR) << "Transaction cleanup failed";
|
||||
}
|
||||
LOG(INFO) << "Deleted " << deleted << " transactions in "
|
||||
<< timer.Elapsed().count() << " seconds";
|
||||
int64_t num_transactions_db = NumTransactions();
|
||||
CHECK(num_transactions_db == num_transactions - deleted) << fmt::format(
|
||||
"Number of transactions after deletion doesn't match: "
|
||||
"before = {}, after = {}, reported deleted = {}, actual = "
|
||||
"{}",
|
||||
num_transactions, num_transactions_db, deleted,
|
||||
num_transactions - num_transactions_db);
|
||||
num_transactions = num_transactions_db;
|
||||
UpdateStats();
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::seconds(config_["cleanup"]["check_interval_sec"]));
|
||||
}
|
||||
|
||||
virtual void Step() override {
|
||||
if (FLAGS_scenario == "heavy_read") {
|
||||
HeavyRead();
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "teps") {
|
||||
TepsQuery();
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "point_lookup") {
|
||||
GetTransaction(UniformInt(0, num_transactions - 1));
|
||||
return;
|
||||
}
|
||||
|
||||
if (FLAGS_scenario == "create_tx") {
|
||||
CreateTransaction(UniformInt(0, num_pos - 1),
|
||||
UniformInt(0, num_cards - 1), ++max_tx_id, false);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Card fraud demo using incremental approach to computing fraud scores for
|
||||
* POS. New transactions are created at the maximum possible rate. Analytic
|
||||
* query is ran periodically. Transaction cleanup is run when the number of
|
||||
* transactions exceeds provided limit to prevent memory overflow. All other
|
||||
* queries are halted during transaction cleanup (synchronization by shared
|
||||
* mutex).
|
||||
*/
|
||||
if (FLAGS_scenario == "card_fraud_inc") {
|
||||
switch (role_) {
|
||||
case Role::ANALYTIC:
|
||||
AnalyticStep();
|
||||
break;
|
||||
case Role::WORKER:
|
||||
WorkerStep();
|
||||
break;
|
||||
case Role::CLEANUP:
|
||||
CleanupStep();
|
||||
break;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(FATAL) << "Should not get here: unknown scenario!";
|
||||
}
|
||||
};
|
||||
|
||||
nlohmann::json LoadConfig() {
|
||||
nlohmann::json config;
|
||||
if (FLAGS_config != "") {
|
||||
@ -265,6 +347,7 @@ int main(int argc, char **argv) {
|
||||
num_pos.store(NumNodesWithLabel(client, "Pos"));
|
||||
num_cards.store(NumNodesWithLabel(client, "Card"));
|
||||
num_transactions.store(NumNodesWithLabel(client, "Transaction"));
|
||||
max_tx_id.store(MaxIdForLabel(client, "Transaction"));
|
||||
|
||||
CreateIndex(client, "Pos", "id");
|
||||
CreateIndex(client, "Card", "id");
|
||||
@ -277,18 +360,22 @@ int main(int argc, char **argv) {
|
||||
auto config = LoadConfig();
|
||||
|
||||
std::vector<std::unique_ptr<TestClient>> clients;
|
||||
if (FLAGS_scenario == "strata_v1") {
|
||||
if (FLAGS_scenario == "card_fraud_inc") {
|
||||
CHECK(FLAGS_num_workers >= 2)
|
||||
<< "There should be at least 2 client workers (analytic and cleanup)";
|
||||
CHECK(num_pos == config["num_workers"].get<int>() *
|
||||
config["pos_per_worker"].get<int>())
|
||||
<< "Wrong number of POS per worker";
|
||||
CHECK(num_cards == config["num_workers"].get<int>() *
|
||||
config["cards_per_worker"].get<int>())
|
||||
<< "Wrong number of cards per worker";
|
||||
for (int i = 0; i < FLAGS_num_workers - 1; ++i) {
|
||||
for (int i = 0; i < FLAGS_num_workers - 2; ++i) {
|
||||
clients.emplace_back(std::make_unique<CardFraudClient>(i, config));
|
||||
}
|
||||
clients.emplace_back(std::make_unique<CardFraudClient>(
|
||||
FLAGS_num_workers - 1, config, Role::ANALYTIC));
|
||||
FLAGS_num_workers - 2, config, Role::ANALYTIC));
|
||||
clients.emplace_back(std::make_unique<CardFraudClient>(
|
||||
FLAGS_num_workers - 1, config, Role::CLEANUP));
|
||||
} else {
|
||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||
clients.emplace_back(std::make_unique<CardFraudClient>(i, config));
|
||||
|
@ -68,6 +68,7 @@ std::pair<communication::bolt::QueryData, int> ExecuteNTimesTillSuccess(
|
||||
auto ret = client.Execute(query, params);
|
||||
return {ret, i};
|
||||
} catch (const utils::BasicException &e) {
|
||||
VLOG(0) << "Error: " << e.what();
|
||||
last_exception = e;
|
||||
utils::Timer t;
|
||||
std::chrono::microseconds to_sleep(rand_dist_(pseudo_rand_gen_));
|
||||
|
@ -67,7 +67,6 @@ class TestClient {
|
||||
std::tie(result, retries) =
|
||||
ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << e.what();
|
||||
serialization_errors.Bump(MAX_RETRIES);
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
@ -165,7 +164,12 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
.first;
|
||||
it->second = (it->second.ValueDouble() * old_count + stat.second) /
|
||||
(old_count + new_count);
|
||||
stats::LogStat(
|
||||
fmt::format("queries.{}.{}", query_stats.first, stat.first),
|
||||
(stat.second / new_count));
|
||||
}
|
||||
stats::LogStat(fmt::format("queries.{}.count", query_stats.first),
|
||||
new_count);
|
||||
}
|
||||
|
||||
out << "{\"num_executed_queries\": " << executed_queries.Value() << ", "
|
||||
|
194
tests/unit/rwlock.cpp
Normal file
194
tests/unit/rwlock.cpp
Normal file
@ -0,0 +1,194 @@
|
||||
#include <shared_mutex>
|
||||
#include <thread>
|
||||
|
||||
#include "glog/logging.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "threading/sync/rwlock.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using threading::RWLock;
|
||||
using threading::RWLockPriority;
|
||||
|
||||
TEST(RWLock, MultipleReaders) {
|
||||
RWLock rwlock(RWLockPriority::READ);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
utils::Timer timer;
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
threads.push_back(std::thread([&rwlock] {
|
||||
std::shared_lock<RWLock> lock(rwlock);
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
EXPECT_LE(timer.Elapsed(), 150ms);
|
||||
EXPECT_GE(timer.Elapsed(), 90ms);
|
||||
}
|
||||
|
||||
TEST(RWLock, SingleWriter) {
|
||||
RWLock rwlock(RWLockPriority::READ);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
utils::Timer timer;
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
threads.push_back(std::thread([&rwlock] {
|
||||
std::unique_lock<RWLock> lock(rwlock);
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
EXPECT_LE(timer.Elapsed(), 350ms);
|
||||
EXPECT_GE(timer.Elapsed(), 290ms);
|
||||
}
|
||||
|
||||
/* Next two tests demonstrate that writers are always preferred when an unique
|
||||
* lock is released. */
|
||||
|
||||
TEST(RWLock, WritersPreferred_1) {
|
||||
RWLock rwlock(RWLockPriority::READ);
|
||||
rwlock.lock();
|
||||
bool first = true;
|
||||
|
||||
std::thread t1([&rwlock, &first] {
|
||||
std::shared_lock<RWLock> lock(rwlock);
|
||||
EXPECT_FALSE(first);
|
||||
});
|
||||
|
||||
std::thread t2([&rwlock, &first] {
|
||||
std::unique_lock<RWLock> lock(rwlock);
|
||||
EXPECT_TRUE(first);
|
||||
first = false;
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(100ms);
|
||||
rwlock.unlock();
|
||||
t1.join();
|
||||
t2.join();
|
||||
}
|
||||
|
||||
TEST(RWLock, WritersPreferred_2) {
|
||||
RWLock rwlock(RWLockPriority::WRITE);
|
||||
rwlock.lock();
|
||||
bool first = true;
|
||||
|
||||
std::thread t1([&rwlock, &first] {
|
||||
std::shared_lock<RWLock> lock(rwlock);
|
||||
EXPECT_FALSE(first);
|
||||
});
|
||||
|
||||
std::thread t2([&rwlock, &first] {
|
||||
std::unique_lock<RWLock> lock(rwlock);
|
||||
EXPECT_TRUE(first);
|
||||
first = false;
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(100ms);
|
||||
rwlock.unlock();
|
||||
t1.join();
|
||||
t2.join();
|
||||
}
|
||||
|
||||
TEST(RWLock, ReadPriority) {
|
||||
/*
|
||||
* - Main thread is holding a shared lock until T = 100ms.
|
||||
* - Thread 1 tries to acquire an unique lock at T = 30ms.
|
||||
* - Thread 2 successfuly acquires a shared lock at T = 60ms, even though
|
||||
* there's a writer waiting.
|
||||
*/
|
||||
RWLock rwlock(RWLockPriority::READ);
|
||||
rwlock.lock_shared();
|
||||
bool first = true;
|
||||
|
||||
std::thread t1([&rwlock, &first] {
|
||||
std::this_thread::sleep_for(30ms);
|
||||
std::unique_lock<RWLock> lock(rwlock);
|
||||
EXPECT_FALSE(first);
|
||||
});
|
||||
|
||||
std::thread t2([&rwlock, &first] {
|
||||
std::this_thread::sleep_for(60ms);
|
||||
std::shared_lock<RWLock> lock(rwlock);
|
||||
EXPECT_TRUE(first);
|
||||
first = false;
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(100ms);
|
||||
rwlock.unlock_shared();
|
||||
t1.join();
|
||||
t2.join();
|
||||
}
|
||||
|
||||
TEST(RWLock, WritePriority) {
|
||||
/*
|
||||
* - Main thread is holding a shared lock until T = 100ms.
|
||||
* - Thread 1 tries to acquire an unique lock at T = 30ms.
|
||||
* - Thread 2 tries to acquire a shared lock at T = 60ms, but it is not able
|
||||
* to because of write priority.
|
||||
*/
|
||||
RWLock rwlock(RWLockPriority::WRITE);
|
||||
rwlock.lock_shared();
|
||||
bool first = true;
|
||||
|
||||
std::thread t1([&rwlock, &first] {
|
||||
std::this_thread::sleep_for(30ms);
|
||||
std::unique_lock<RWLock> lock(rwlock);
|
||||
EXPECT_TRUE(first);
|
||||
first = false;
|
||||
});
|
||||
|
||||
std::thread t2([&rwlock, &first] {
|
||||
std::this_thread::sleep_for(60ms);
|
||||
std::shared_lock<RWLock> lock(rwlock);
|
||||
EXPECT_FALSE(first);
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(100ms);
|
||||
rwlock.unlock_shared();
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
}
|
||||
|
||||
TEST(RWLock, TryLock) {
|
||||
RWLock rwlock(RWLockPriority::WRITE);
|
||||
rwlock.lock();
|
||||
|
||||
std::thread t1([&rwlock] { EXPECT_FALSE(rwlock.try_lock()); });
|
||||
t1.join();
|
||||
|
||||
std::thread t2([&rwlock] { EXPECT_FALSE(rwlock.try_lock_shared()); });
|
||||
t2.join();
|
||||
|
||||
rwlock.unlock();
|
||||
|
||||
std::thread t3([&rwlock] {
|
||||
EXPECT_TRUE(rwlock.try_lock());
|
||||
rwlock.unlock();
|
||||
});
|
||||
t3.join();
|
||||
|
||||
std::thread t4([&rwlock] {
|
||||
EXPECT_TRUE(rwlock.try_lock_shared());
|
||||
rwlock.unlock_shared();
|
||||
});
|
||||
t4.join();
|
||||
|
||||
rwlock.lock_shared();
|
||||
|
||||
std::thread t5([&rwlock] {
|
||||
EXPECT_TRUE(rwlock.try_lock_shared());
|
||||
rwlock.unlock_shared();
|
||||
});
|
||||
t5.join();
|
||||
}
|
Loading…
Reference in New Issue
Block a user