Optimize card_fraud_generate_snapshot

Summary:
Reduce memory usage and execution time in card fraud snapshot
generation.

Reviewers: mculinovic

Reviewed By: mculinovic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1270
This commit is contained in:
florijan 2018-03-05 11:54:35 +01:00
parent 9c95d4c381
commit 23efdf27d0

View File

@ -1,4 +1,5 @@
#include <experimental/filesystem>
#include <functional>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@ -52,110 +53,60 @@ namespace fs = std::experimental::filesystem;
/// Helper class for tracking info about the generated graph.
class GraphState {
typedef std::unordered_map<std::string, std::vector<int64_t>> LabelNodes;
/**
* WorkerNode.first = worker_id
* WorkerNode.second = node_gid
*/
typedef std::pair<int, int64_t> WorkerNode;
typedef std::unordered_map<std::string, std::vector<gid::Gid>> LabelNodes;
public:
GraphState(int num_workers)
: worker_nodes_(num_workers),
compromised_pos_(num_workers),
compromised_card_(num_workers),
out_edges_(num_workers),
in_edges_(num_workers) {}
void AddNode(int worker_id, const std::string &label, int64_t node_bolt_id) {
LabelNodes &label_nodes = worker_nodes_[worker_id];
label_nodes[label].emplace_back(node_bolt_id);
}
: worker_nodes(num_workers),
compromised_pos(num_workers),
compromised_card(num_workers),
out_edges(num_workers),
in_edges(num_workers) {}
// Gets the ID of a random node on worker that has the given label.
int64_t RandomNode(int worker_id, const std::string &label) {
auto &label_nodes = worker_nodes_[worker_id];
gid::Gid RandomNode(int worker_id, const std::string &label) {
auto &label_nodes = worker_nodes[worker_id];
auto found = label_nodes.find(label);
CHECK(found != label_nodes.end()) << "Label not found";
return found->second[rand_(gen_) * found->second.size()];
}
const std::vector<int64_t> &NodesWithLabel(int worker_id,
const std::string &label) {
return worker_nodes_[worker_id][label];
}
void AddCompromisedPos(int worker_id, int64_t pos_id) {
std::unordered_set<int64_t> &compromised = compromised_pos_[worker_id];
compromised.insert(pos_id);
}
bool IsCompromisedPos(int worker_id, int64_t pos_id) {
std::unordered_set<int64_t> &compromised = compromised_pos_[worker_id];
bool IsCompromisedPos(int worker_id, gid::Gid pos_id) {
std::unordered_set<gid::Gid> &compromised = compromised_pos[worker_id];
return compromised.find(pos_id) != compromised.end();
}
void AddCompromisedCard(int worker_id, int64_t card_id) {
std::unordered_set<int64_t> &compromised = compromised_card_[worker_id];
compromised.insert(card_id);
}
bool IsCompromisedCard(int worker_id, int64_t card_id) {
std::unordered_set<int64_t> &compromised = compromised_card_[worker_id];
bool IsCompromisedCard(int worker_id, gid::Gid card_id) {
std::unordered_set<gid::Gid> &compromised = compromised_card[worker_id];
return compromised.find(card_id) != compromised.end();
}
struct Edge {
enum class Type { USING, AT };
int64_t gid;
WorkerNode from;
WorkerNode to;
gid::Gid gid;
storage::VertexAddress from;
storage::VertexAddress to;
Type type;
};
void AddEdge(int64_t edge_gid, WorkerNode from, WorkerNode to,
Edge::Type type) {
Edge edge{edge_gid, from, to, type};
out_edges_[from.first][from.second].emplace_back(edge);
in_edges_[to.first][to.second].emplace_back(edge);
void AddEdge(gid::Gid edge_gid, storage::VertexAddress from,
storage::VertexAddress to, Edge::Type type) {
out_edges[from.worker_id()][from.gid()].emplace_back(edges.size());
in_edges[to.worker_id()][to.gid()].emplace_back(edges.size());
edges.emplace_back(Edge{edge_gid, from, to, type});
}
const std::vector<Edge> Edges(int worker_id) {
std::vector<Edge> edges;
const auto &worker_edges = out_edges_[worker_id];
auto size = std::accumulate(
std::begin(worker_edges), std::end(worker_edges), 0,
[](const int size,
const std::unordered_map<int64_t, std::vector<Edge>>::value_type
&edges) { return size + edges.second.size(); });
edges.reserve(size);
for (auto it = worker_edges.begin(); it != worker_edges.end(); ++it) {
edges.insert(edges.end(), it->second.begin(), it->second.end());
}
return edges;
}
const std::vector<Edge> &OutEdges(int worker_id, int64_t node_id) {
return out_edges_[worker_id][node_id];
}
const std::vector<Edge> &InEdges(int worker_id, int64_t node_id) {
return in_edges_[worker_id][node_id];
}
private:
// Maps worker node labels to node bolt_ids.
std::vector<LabelNodes> worker_nodes_;
std::vector<LabelNodes> worker_nodes;
// Compromised cards and pos.
std::vector<std::unordered_set<int64_t>> compromised_pos_;
std::vector<std::unordered_set<int64_t>> compromised_card_;
std::vector<std::unordered_set<gid::Gid>> compromised_pos;
std::vector<std::unordered_set<gid::Gid>> compromised_card;
// In/Out Vertex Edges.
std::vector<std::unordered_map<int64_t, std::vector<Edge>>> out_edges_;
std::vector<std::unordered_map<int64_t, std::vector<Edge>>> in_edges_;
std::vector<Edge> edges;
std::vector<std::unordered_map<gid::Gid, std::vector<int>>> out_edges;
std::vector<std::unordered_map<gid::Gid, std::vector<int>>> in_edges;
// Random generator
std::mt19937 gen_{std::random_device{}()};
@ -213,54 +164,53 @@ class Writer {
std::vector<query::TypedValue>(list.begin(), list.end()));
}
void WriteNode(int64_t id, const std::vector<std::string> &labels,
void WriteNode(gid::Gid id, const std::string &label,
std::unordered_map<std::string, DecodedValue> &properties,
const std::vector<GraphState::Edge> &out_edges,
const std::vector<GraphState::Edge> &in_edges) {
const std::vector<GraphState::Edge> &edges,
const std::vector<int> &out_edges,
const std::vector<int> &in_edges) {
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
3);
encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
encoder_.WriteInt(id);
encoder_.WriteList(
std::vector<query::TypedValue>{labels.begin(), labels.end()});
encoder_.WriteList(std::vector<query::TypedValue>{label});
encoder_.WriteMap(properties);
encoder_.WriteInt(in_edges.size());
for (auto &edge : in_edges) {
auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.first);
auto vertex_addr =
storage::VertexAddress(edge.from.second, edge.from.first);
for (auto edge_num : in_edges) {
auto &edge = edges[edge_num];
auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.worker_id());
encoder_.WriteInt(edge_addr.raw());
encoder_.WriteInt(vertex_addr.raw());
encoder_.WriteInt(edge.from.raw());
encoder_.WriteString(
edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt);
}
encoder_.WriteInt(out_edges.size());
for (auto &edge : out_edges) {
auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.first);
auto vertex_addr = storage::VertexAddress(edge.to.second, edge.to.first);
for (auto edge_num : out_edges) {
auto &edge = edges[edge_num];
auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.worker_id());
encoder_.WriteInt(edge_addr.raw());
encoder_.WriteInt(vertex_addr.raw());
encoder_.WriteInt(edge.to.raw());
encoder_.WriteString(
edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt);
}
}
void WriteEdge(
int64_t gid, GraphState::Edge::Type type,
const std::unordered_map<std::string, DecodedValue> &properties,
int64_t gid_from, int64_t gid_to) {
void WriteEdge(const GraphState::Edge &edge) {
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
5);
encoder_.WriteRAW(
underlying_cast(communication::bolt::Signature::Relationship));
encoder_.WriteInt(gid);
encoder_.WriteInt(gid_from);
encoder_.WriteInt(gid_to);
encoder_.WriteString(type == GraphState::Edge::Type::USING ? kEdgeUsing
: kEdgeAt);
encoder_.WriteMap(properties);
encoder_.WriteInt(edge.gid);
encoder_.WriteInt(edge.from.gid());
encoder_.WriteInt(edge.to.gid());
encoder_.WriteString(edge.type == GraphState::Edge::Type::USING ? kEdgeUsing
: kEdgeAt);
// Write edges to snapshot.
std::unordered_map<std::string, communication::bolt::DecodedValue>
empty_props{};
encoder_.WriteMap(empty_props);
}
void Close(uint64_t node_count, uint64_t edge_count, uint64_t hops_count) {
@ -349,9 +299,8 @@ int main(int argc, char **argv) {
std::iota(worker_ids.begin(), worker_ids.end(), 0);
// Generated node and edge counters.
std::vector<uint64_t> nodes_count(num_workers, 0);
std::vector<uint64_t> edges_count(num_workers, 0);
std::vector<uint64_t> hops_count(num_workers, 0);
std::vector<int> nodes_count(num_workers, 0);
std::vector<int> hops_count(num_workers, 0);
GraphState state(num_workers);
ValueGenerator value_generator;
@ -388,10 +337,10 @@ int main(int argc, char **argv) {
if (label == kLabelPos &&
value_generator.Bernoulli(compromised_pos_probability)) {
// Write compromised to state.
state.AddCompromisedPos(worker_id, node_gid);
state.compromised_pos[worker_id].insert(node_gid);
}
// Write node to state.
state.AddNode(worker_id, label, node_gid);
state.worker_nodes[worker_id][label].emplace_back(node_gid);
}
}
nodes_count[worker_id] = node_generator.LocalCount();
@ -417,8 +366,7 @@ int main(int argc, char **argv) {
// Create and write edges to state.
// Write compromised cards to state.
const auto &transactions =
state.NodesWithLabel(worker_id, kLabelTransaction);
const auto &transactions = state.worker_nodes[worker_id][kLabelTransaction];
for (auto &transaction_id : transactions) {
int card_worker_id = worker_id;
if (value_generator.Bernoulli(hop_probability)) {
@ -435,19 +383,20 @@ int main(int argc, char **argv) {
auto pos_id = state.RandomNode(pos_worker_id, kLabelPos);
auto edge_using_id = edge_generator.Next(std::experimental::nullopt);
state.AddEdge(edge_using_id, std::make_pair(worker_id, transaction_id),
std::make_pair(card_worker_id, card_id),
state.AddEdge(edge_using_id,
storage::VertexAddress(transaction_id, worker_id),
storage::VertexAddress(card_id, card_worker_id),
GraphState::Edge::Type::USING);
auto edge_at_id = edge_generator.Next(std::experimental::nullopt);
state.AddEdge(edge_at_id, std::make_pair(worker_id, transaction_id),
std::make_pair(pos_worker_id, pos_id),
state.AddEdge(edge_at_id,
storage::VertexAddress(transaction_id, worker_id),
storage::VertexAddress(pos_id, pos_worker_id),
GraphState::Edge::Type::AT);
if (state.IsCompromisedPos(pos_worker_id, pos_id)) {
state.AddCompromisedCard(card_worker_id, card_id);
state.compromised_card[card_worker_id].insert(card_id);
}
}
edges_count[worker_id] = edge_generator.LocalCount();
}
LOG(INFO) << "Creating edges...DONE";
@ -473,57 +422,52 @@ int main(int argc, char **argv) {
writer.WriteList(indexes);
// Write Cards to snapshot.
const auto &cards = state.NodesWithLabel(worker_id, kLabelCard);
for (auto &card_id : cards) {
for (auto &card_id : state.worker_nodes[worker_id][kLabelCard]) {
bool is_compromised = state.IsCompromisedCard(worker_id, card_id);
auto props =
value_generator.MakeCardProperties(is_compromised, worker_id);
DCHECK(state.OutEdges(worker_id, card_id).size() == 0);
writer.WriteNode(card_id, std::vector<std::string>{kLabelCard}, props,
state.OutEdges(worker_id, card_id),
state.InEdges(worker_id, card_id));
DCHECK(state.out_edges[worker_id][card_id].size() == 0);
writer.WriteNode(card_id, kLabelCard, props, state.edges,
state.out_edges[worker_id][card_id],
state.in_edges[worker_id][card_id]);
}
// Write Pos to snapshot.
const auto &pos_ids = state.NodesWithLabel(worker_id, kLabelPos);
for (auto &pos_id : pos_ids) {
for (auto &pos_id : state.worker_nodes[worker_id][kLabelPos]) {
bool is_compromised = state.IsCompromisedPos(worker_id, pos_id);
auto props = value_generator.MakePosProperties(is_compromised, worker_id);
DCHECK(state.OutEdges(worker_id, pos_id).size() == 0);
writer.WriteNode(pos_id, std::vector<std::string>{kLabelPos}, props,
state.OutEdges(worker_id, pos_id),
state.InEdges(worker_id, pos_id));
DCHECK(state.out_edges[worker_id][pos_id].size() == 0);
writer.WriteNode(pos_id, kLabelPos, props, state.edges,
state.out_edges[worker_id][pos_id],
state.in_edges[worker_id][pos_id]);
}
// Write Transactions to snapshot.
const auto &transactions =
state.NodesWithLabel(worker_id, kLabelTransaction);
for (auto &tx_id : transactions) {
const auto &out_edges = state.OutEdges(worker_id, tx_id);
const auto &in_edges = state.InEdges(worker_id, tx_id);
for (auto &tx_id : state.worker_nodes[worker_id][kLabelTransaction]) {
const auto &edges = state.edges;
const auto &out_edges = state.out_edges[worker_id][tx_id];
DCHECK(out_edges.size() == 2);
DCHECK(out_edges[0].type == GraphState::Edge::Type::USING);
DCHECK(out_edges[1].type == GraphState::Edge::Type::AT);
DCHECK(in_edges.size() == 0);
DCHECK(edges[out_edges[0]].type == GraphState::Edge::Type::USING);
DCHECK(edges[out_edges[1]].type == GraphState::Edge::Type::AT);
DCHECK(state.in_edges[worker_id][tx_id].size() == 0);
bool fraud_reported = false;
if (state.IsCompromisedCard(out_edges[0].to.first,
out_edges[0].to.second)) {
if (state.IsCompromisedCard(edges[out_edges[0]].to.worker_id(),
edges[out_edges[0]].to.gid())) {
fraud_reported = value_generator.Bernoulli(fraud_reported_probability);
}
auto props = value_generator.MakeTxProperties(fraud_reported, worker_id);
writer.WriteNode(tx_id, std::vector<std::string>{kLabelTransaction},
props, state.OutEdges(worker_id, tx_id),
state.InEdges(worker_id, tx_id));
writer.WriteNode(tx_id, kLabelTransaction, props, state.edges,
state.out_edges[worker_id][tx_id],
state.in_edges[worker_id][tx_id]);
}
// Write edges to snapshot.
std::unordered_map<std::string, communication::bolt::DecodedValue>
empty_props;
const auto &edges = state.Edges(worker_id);
for (auto &edge : edges) {
writer.WriteEdge(edge.gid, edge.type, empty_props, edge.from.second,
edge.to.second);
int edge_count{0};
for (auto &edge : state.edges) {
if (edge.from.worker_id() == worker_id) {
writer.WriteEdge(edge);
++edge_count;
}
}
writer.Close(nodes_count[worker_id], edges_count[worker_id],
hops_count[worker_id]);
writer.Close(nodes_count[worker_id], edge_count, hops_count[worker_id]);
}
LOG(INFO) << "Writing snapshots...DONE";
return 0;