From 25dae811c9be9b4a82e1f79b94ddc3447eafe713 Mon Sep 17 00:00:00 2001
From: Marko Culinovic <marko.culinovic@memgraph.io>
Date: Thu, 1 Feb 2018 09:56:14 +0100
Subject: [PATCH] Generate distributed snapshot for card fraud demo

Reviewers: mtomic, buda, dgleich

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1146
---
 customers/strata2018/card_fraud_config.json   |  19 +
 tests/manual/card_fraud_generate_snapshot.cpp | 505 ++++++++++++++++++
 2 files changed, 524 insertions(+)
 create mode 100644 customers/strata2018/card_fraud_config.json
 create mode 100644 tests/manual/card_fraud_generate_snapshot.cpp

diff --git a/customers/strata2018/card_fraud_config.json b/customers/strata2018/card_fraud_config.json
new file mode 100644
index 000000000..d27b84b98
--- /dev/null
+++ b/customers/strata2018/card_fraud_config.json
@@ -0,0 +1,19 @@
+{
+  "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
+  "nodes" : [
+    {
+      "count_per_worker" : 10,
+      "label" : "Card"
+    },
+    {
+      "count_per_worker" : 10,
+      "label" : "Pos"
+    },
+    {
+      "count_per_worker" : 20,
+      "label" : "Transaction"
+    }
+  ],
+  "compromised_pos_probability" : 0.2,
+  "hop_probability" : 0.1
+}
diff --git a/tests/manual/card_fraud_generate_snapshot.cpp b/tests/manual/card_fraud_generate_snapshot.cpp
new file mode 100644
index 000000000..8e0c5e365
--- /dev/null
+++ b/tests/manual/card_fraud_generate_snapshot.cpp
@@ -0,0 +1,505 @@
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <fmt/format.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <json/json.hpp>
+
+#include "communication/bolt/v1/encoder/base_encoder.hpp"
+#include "durability/snapshot_encoder.hpp"
+#include "durability/version.hpp"
+#include "utils/string.hpp"
+#include "utils/timer.hpp"
+
+DEFINE_string(num_workers, "1",
+              "Number of distributed workers (including master)");
+DEFINE_string(prefix, "snapshot",
+              "Filename prefix for all generated snapshots");
+DEFINE_string(config, "", "Path to config JSON file");
+
+/**
+ * Config file should defined as follows.
+ *
+ *{
+ *  "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
+ *  "nodes" : [
+ *    {
+ *      "count_per_worker" : 10,
+ *      "label" : "Card"
+ *    },
+ *    {
+ *      "count_per_worker" : 10,
+ *      "label" : "Pos"
+ *    },
+ *    {
+ *      "count_per_worker" : 20,
+ *      "label" : "Transaction"
+ *    }
+ *  ],
+ *  "compromised_pos_probability" : 0.2,
+ *  "hop_percentage" : 0.1
+ *}
+ */
+
+/// Helper class for tracking info about the generated graph.
+class GraphState {
+  typedef std::unordered_map<std::string, std::vector<int64_t>> LabelNodes;
+  /**
+   * WorkerNode.first = worker_id
+   * WorkerNode.second = node_gid
+   */
+  typedef std::pair<int, int64_t> WorkerNode;
+
+ public:
+  GraphState(int num_workers)
+      : worker_nodes_(num_workers),
+        compromised_pos_(num_workers),
+        compromised_card_(num_workers),
+        out_edges_(num_workers),
+        in_edges_(num_workers) {}
+
+  void AddNode(int worker_id, const std::string &label, int64_t node_bolt_id) {
+    LabelNodes &label_nodes = worker_nodes_[worker_id];
+    label_nodes[label].emplace_back(node_bolt_id);
+  }
+
+  // Gets the ID of a random node on worker that has the given label.
+  int64_t RandomNode(int worker_id, const std::string &label) {
+    auto label_nodes = worker_nodes_[worker_id];
+    auto found = label_nodes.find(label);
+    CHECK(found != label_nodes.end()) << "Label not found";
+    return found->second[rand_(gen_) * found->second.size()];
+  }
+
+  const std::vector<int64_t> &NodesWithLabel(int worker_id,
+                                             const std::string &label) {
+    return worker_nodes_[worker_id][label];
+  }
+
+  void AddCompromisedPos(int worker_id, int pos_id) {
+    std::unordered_set<int64_t> &compromised = compromised_pos_[worker_id];
+    compromised.insert(pos_id);
+  }
+
+  bool IsCompromisedPos(int worker_id, int pos_id) {
+    std::unordered_set<int64_t> &compromised = compromised_pos_[worker_id];
+    return compromised.find(pos_id) != compromised.end();
+  }
+
+  void AddCompromisedCard(int worker_id, int card_id) {
+    std::unordered_set<int64_t> &compromised = compromised_card_[worker_id];
+    compromised.insert(card_id);
+  }
+
+  bool IsCompromisedCard(int worker_id, int card_id) {
+    std::unordered_set<int64_t> &compromised = compromised_card_[worker_id];
+    return compromised.find(card_id) != compromised.end();
+  }
+
+  struct Edge {
+    enum class Type { USING, AT };
+    int64_t gid;
+    WorkerNode from;
+    WorkerNode to;
+    Type type;
+  };
+
+  void AddEdge(int64_t edge_gid, WorkerNode from, WorkerNode to,
+               Edge::Type type) {
+    Edge edge{edge_gid, from, to, type};
+    out_edges_[from.first][from.second].emplace_back(edge);
+    in_edges_[to.first][to.second].emplace_back(edge);
+  }
+
+  const std::vector<Edge> Edges(int worker_id) {
+    std::vector<Edge> edges;
+    const auto &worker_edges = out_edges_[worker_id];
+
+    auto size = std::accumulate(
+        std::begin(worker_edges), std::end(worker_edges), 0,
+        [](const int size,
+           const std::unordered_map<int64_t, std::vector<Edge>>::value_type
+               &edges) { return size + edges.second.size(); });
+
+    edges.reserve(size);
+    for (auto it = worker_edges.begin(); it != worker_edges.end(); ++it) {
+      edges.insert(edges.end(), it->second.begin(), it->second.end());
+    }
+    return edges;
+  }
+
+  const std::vector<Edge> &OutEdges(int worker_id, int64_t node_id) {
+    return out_edges_[worker_id][node_id];
+  }
+
+  const std::vector<Edge> &InEdges(int worker_id, int64_t node_id) {
+    return in_edges_[worker_id][node_id];
+  }
+
+ private:
+  // Maps worker node labels to node bolt_ids.
+  std::vector<LabelNodes> worker_nodes_;
+
+  // Compromised cards and pos.
+  std::vector<std::unordered_set<int64_t>> compromised_pos_;
+  std::vector<std::unordered_set<int64_t>> compromised_card_;
+
+  // In/Out Vertex Edges.
+  std::vector<std::unordered_map<int64_t, std::vector<Edge>>> out_edges_;
+  std::vector<std::unordered_map<int64_t, std::vector<Edge>>> in_edges_;
+
+  // Random generator
+  std::mt19937 gen_{std::random_device{}()};
+  std::uniform_real_distribution<> rand_{0.0, 1.0};
+};
+
+// Utilities for writing to the snapshot file.
+// Snapshot file has the following contents in order:
+//   1) Magic number.
+//   2) Worker Id
+//   3) Internal Id of vertex generator
+//   4) Internal Id of edge generator
+//   5) Transaction ID of the snapshooter. When generated set to 0.
+//   6) Transactional snapshot of the snapshoter. When the snapshot is
+//   generated it's an empty list.
+//   7) List of label+property index.
+//   8) All nodes, sequentially, but not encoded as a list.
+//   9) All relationships, sequentially, but not encoded as a list.
+//   10) Summary with node count, relationship count and hash digest.
+class Writer {
+  using DecodedValue = communication::bolt::DecodedValue;
+  const std::string kEdgeUsing = "Using";
+  const std::string kEdgeAt = "At";
+
+ public:
+  Writer(const std::string &path, int worker_id) : buffer_(path) {
+    // 1) Magic number
+    encoder_.WriteRAW(durability::kMagicNumber.data(),
+                      durability::kMagicNumber.size());
+    encoder_.WriteTypedValue(durability::kVersion);
+
+    // 2) WorkerId - important for distributed storage
+    worker_id_ = worker_id;
+    encoder_.WriteInt(worker_id_);
+
+    // The following two entries indicate the starting points for generating new
+    // Vertex/Edge IDs in the DB. They are important when there are
+    // vertices/edges that were moved to another worker (in distributed
+    // Memgraph), so be careful! In this case we don't have to worry
+    // because we'll never move vertices/edges between workers.
+    // 3) ID of the vertex generator.
+    encoder_.WriteInt(0);
+    // 4) ID of the edge generator.
+    encoder_.WriteInt(0);
+
+    // 5) Transactional ID of the snapshooter.
+    encoder_.WriteInt(0);
+    // 6) Transactional Snapshot is an empty list of transaction IDs.
+    encoder_.WriteList(std::vector<query::TypedValue>{});
+  }
+
+  template <typename TValue>
+  void WriteList(const std::vector<TValue> &list) {
+    encoder_.WriteList(
+        std::vector<query::TypedValue>(list.begin(), list.end()));
+  }
+
+  void WriteNode(int64_t id, const std::vector<std::string> &labels,
+                 std::unordered_map<std::string, DecodedValue> properties,
+                 const std::vector<GraphState::Edge> &out_edges,
+                 const std::vector<GraphState::Edge> &in_edges) {
+    encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
+                      3);
+    encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
+    encoder_.WriteInt(id);
+    encoder_.WriteList(
+        std::vector<query::TypedValue>{labels.begin(), labels.end()});
+    encoder_.WriteMap(properties);
+
+    encoder_.WriteInt(in_edges.size());
+    for (auto edge : in_edges) {
+      auto edge_addr = Edges::EdgeAddress(edge.gid, edge.from.first);
+      auto vertex_addr =
+          Edges::VertexAddress(edge.from.second, edge.from.first);
+      encoder_.WriteInt(edge_addr.raw());
+      encoder_.WriteInt(vertex_addr.raw());
+      encoder_.WriteString(
+          edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt);
+    }
+
+    encoder_.WriteInt(out_edges.size());
+    for (auto edge : out_edges) {
+      auto edge_addr = Edges::EdgeAddress(edge.gid, edge.from.first);
+      auto vertex_addr = Edges::VertexAddress(edge.to.second, edge.to.first);
+      encoder_.WriteInt(edge_addr.raw());
+      encoder_.WriteInt(vertex_addr.raw());
+      encoder_.WriteString(
+          edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt);
+    }
+  }
+
+  void WriteEdge(int64_t gid, GraphState::Edge::Type type,
+                 const std::unordered_map<std::string, DecodedValue> properties,
+                 int64_t gid_from, int64_t gid_to) {
+    encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
+                      5);
+    encoder_.WriteRAW(
+        underlying_cast(communication::bolt::Signature::Relationship));
+    encoder_.WriteInt(gid);
+    encoder_.WriteInt(gid_from);
+    encoder_.WriteInt(gid_to);
+    encoder_.WriteString(type == GraphState::Edge::Type::USING ? kEdgeUsing
+                                                               : kEdgeAt);
+    encoder_.WriteMap(properties);
+  }
+
+  void Close(uint64_t node_count, uint64_t edge_count) {
+    // 10) Summary with node count, relationship count and hash digest.
+    buffer_.WriteValue(node_count);
+    buffer_.WriteValue(edge_count);
+    buffer_.WriteValue(buffer_.hash());
+    buffer_.Close();
+    // Log summary
+    LOG(INFO) << fmt::format("-- Summary for worker: {}", worker_id_);
+    LOG(INFO) << fmt::format("---- Total nodes: {}", node_count);
+    LOG(INFO) << fmt::format("---- Total edges: {}", edge_count);
+  }
+
+ private:
+  HashedFileWriter buffer_;
+  durability::SnapshotEncoder<HashedFileWriter> encoder_{buffer_};
+  int worker_id_;
+};
+
+// Helper class for property value generation.
+class ValueGenerator {
+  using json = nlohmann::json;
+  using DecodedValue = communication::bolt::DecodedValue;
+
+ public:
+  std::unordered_map<std::string, DecodedValue> MakePosProperties(
+      bool compromised) {
+    std::unordered_map<std::string, DecodedValue> props;
+    props.emplace("id", DecodedValue(Counter("Pos.id")));
+    props.emplace("compromised", DecodedValue(compromised));
+    return props;
+  }
+
+  std::unordered_map<std::string, DecodedValue> MakeTxProperties(
+      bool fraud_reported) {
+    std::unordered_map<std::string, DecodedValue> props;
+    props.emplace("id", DecodedValue(Counter("Transaction.id")));
+    props.emplace("fraud_reported", DecodedValue(fraud_reported));
+    return props;
+  }
+
+  std::unordered_map<std::string, DecodedValue> MakeCardProperties(
+      bool compromised) {
+    std::unordered_map<std::string, DecodedValue> props;
+    props.emplace("id", DecodedValue(Counter("Card.id")));
+    props.emplace("compromised", DecodedValue(compromised));
+    return props;
+  }
+
+  int64_t Counter(const std::string &name) { return counters_[name]++; }
+
+  bool Bernoulli(double p) { return rand_(gen_) < p; }
+
+ private:
+  std::mt19937 gen_{std::random_device{}()};
+  std::uniform_real_distribution<> rand_{0.0, 1.0};
+  std::unordered_map<std::string, int64_t> counters_;
+};
+
+nlohmann::json GetWithDefault(const nlohmann::json &object,
+                              const std::string &key,
+                              const nlohmann::json &default_value) {
+  const auto &found = object.find(key);
+  if (found == object.end()) return default_value;
+  return *found;
+}
+
+int main(int argc, char **argv) {
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  nlohmann::json config;
+  {
+    std::ifstream config_file(FLAGS_config);
+    config_file >> config;
+  }
+
+  // Vector IDs.
+  int num_workers = std::atoi(FLAGS_num_workers.c_str());
+  std::vector<int> worker_ids(num_workers);
+  std::iota(worker_ids.begin(), worker_ids.end(), 0);
+
+  // Generated node and edge counters.
+  std::vector<uint64_t> nodes_count(num_workers, 0);
+  std::vector<uint64_t> edges_count(num_workers, 0);
+
+  GraphState state(num_workers);
+  ValueGenerator value_generator;
+
+  const std::string kLabelTransaction = "Transaction";
+  const std::string kLabelCard = "Card";
+  const std::string kLabelPos = "Pos";
+
+  double compromised_pos_probability = config["compromised_pos_probability"];
+  double hop_probability = config["hop_probability"];
+
+  LOG(INFO) << "Creating snapshots with config: ";
+  LOG(INFO) << fmt::format("-- Compromised Pos probability: {}",
+                           compromised_pos_probability);
+  LOG(INFO) << fmt::format("-- Hop probability : {}", hop_probability);
+
+  // Allocate ids for nodes and write them to state.
+  LOG(INFO) << "Creating nodes...";
+  for (auto worker_id : worker_ids) {
+    gid::Generator node_generator{worker_id};
+
+    const auto &nodes_config = config["nodes"];
+    CHECK(nodes_config.is_array() && nodes_config.size() > 0)
+        << "Generator config must have 'nodes' array with at least one element";
+    for (const auto &node_config : config["nodes"]) {
+      CHECK(node_config.is_object()) << "Node config must be a dict";
+      const auto &label = node_config["label"];
+      CHECK(label.is_string() && !label.empty())
+          << "Node must have label specified";
+      for (int i = 0; i < node_config["count_per_worker"]; i++) {
+        auto node_gid = node_generator.Next(std::experimental::nullopt);
+        if (label == kLabelPos &&
+            value_generator.Bernoulli(compromised_pos_probability)) {
+          // Write compromised to state.
+          state.AddCompromisedPos(worker_id, node_gid);
+        }
+        // Write node to state.
+        state.AddNode(worker_id, label, node_gid);
+      }
+    }
+    nodes_count[worker_id] = node_generator.LocalCount();
+  }
+  LOG(INFO) << "Creating nodes...DONE";
+
+  std::random_device random_device;
+  std::mt19937 engine{random_device()};
+
+  // Create edges for each transaction.
+  LOG(INFO) << "Creating edges...";
+  for (auto worker_id : worker_ids) {
+    gid::Generator edge_generator{worker_id};
+
+    auto filter = [worker_id, num_workers](const int other_worker) {
+      if (num_workers == 1) return true;
+      return other_worker != worker_id;
+    };
+    std::vector<int> hop_workers;
+    std::copy_if(worker_ids.begin(), worker_ids.end(),
+                 std::back_inserter(hop_workers), filter);
+    std::uniform_int_distribution<int> dist(0, hop_workers.size() - 1);
+
+    // Create and write edges to state.
+    // Write compromised cards to state.
+    const auto &transactions =
+        state.NodesWithLabel(worker_id, kLabelTransaction);
+    for (auto transaction_id : transactions) {
+      int card_worker_id = worker_id;
+      if (value_generator.Bernoulli(hop_probability)) {
+        card_worker_id = hop_workers[dist(engine)];
+      }
+      auto card_id = state.RandomNode(card_worker_id, kLabelCard);
+
+      int pos_worker_id = worker_id;
+      if (value_generator.Bernoulli(hop_probability)) {
+        pos_worker_id = hop_workers[dist(engine)];
+      }
+      auto pos_id = state.RandomNode(pos_worker_id, kLabelPos);
+
+      auto edge_using_id = edge_generator.Next(std::experimental::nullopt);
+      state.AddEdge(edge_using_id, std::make_pair(worker_id, transaction_id),
+                    std::make_pair(card_worker_id, card_id),
+                    GraphState::Edge::Type::USING);
+      auto edge_at_id = edge_generator.Next(std::experimental::nullopt);
+      state.AddEdge(edge_at_id, std::make_pair(worker_id, transaction_id),
+                    std::make_pair(pos_worker_id, pos_id),
+                    GraphState::Edge::Type::AT);
+
+      if (state.IsCompromisedPos(pos_worker_id, pos_id)) {
+        state.AddCompromisedCard(card_worker_id, card_id);
+      }
+    }
+    edges_count[worker_id] = edge_generator.LocalCount();
+  }
+  LOG(INFO) << "Creating edges...DONE";
+
+  // Write snapshot files.
+  LOG(INFO) << "Writing snapshots...";
+  for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
+    std::string filename = FLAGS_prefix + std::to_string(worker_id);
+    Writer writer(filename, worker_id);
+
+    // Write indexes to snapshot.
+    std::vector<std::string> indexes;
+    for (const auto &index : GetWithDefault(config, "indexes", {}))
+      for (const auto &index_part : utils::Split(index, "."))
+        indexes.push_back(index_part);
+    writer.WriteList(indexes);
+
+    // Write Cards to snapshot.
+    const auto &cards = state.NodesWithLabel(worker_id, kLabelCard);
+    for (auto card_id : cards) {
+      bool is_compromised = state.IsCompromisedCard(worker_id, card_id);
+      auto props = value_generator.MakeCardProperties(is_compromised);
+      DCHECK(state.OutEdges(worker_id, card_id).size() == 0);
+      writer.WriteNode(card_id, std::vector<std::string>{kLabelCard}, props,
+                       state.OutEdges(worker_id, card_id),
+                       state.InEdges(worker_id, card_id));
+    }
+
+    // Write Pos to snapshot.
+    const auto &pos_ids = state.NodesWithLabel(worker_id, kLabelPos);
+    for (auto pos_id : pos_ids) {
+      bool is_compromised = state.IsCompromisedPos(worker_id, pos_id);
+      auto props = value_generator.MakePosProperties(is_compromised);
+      DCHECK(state.OutEdges(worker_id, pos_id).size() == 0);
+      writer.WriteNode(pos_id, std::vector<std::string>{kLabelPos}, props,
+                       state.OutEdges(worker_id, pos_id),
+                       state.InEdges(worker_id, pos_id));
+    }
+    // Write Transactions to snapshot.
+    const auto &transactions =
+        state.NodesWithLabel(worker_id, kLabelTransaction);
+    for (auto tx_id : transactions) {
+      const auto &out_edges = state.OutEdges(worker_id, tx_id);
+      const auto &in_edges = state.InEdges(worker_id, tx_id);
+      DCHECK(out_edges.size() == 2);
+      DCHECK(out_edges[0].type == GraphState::Edge::Type::USING);
+      DCHECK(out_edges[1].type == GraphState::Edge::Type::AT);
+      DCHECK(in_edges.size() == 0);
+      bool fraud_reported = false;
+      if (state.IsCompromisedCard(out_edges[1].to.first,
+                                  out_edges[1].to.second) &&
+          state.IsCompromisedPos(out_edges[0].to.first,
+                                 out_edges[0].to.second)) {
+        fraud_reported = value_generator.Bernoulli(0.1);
+      }
+      auto props = value_generator.MakeTxProperties(fraud_reported);
+      writer.WriteNode(tx_id, std::vector<std::string>{kLabelTransaction},
+                       props, state.OutEdges(worker_id, tx_id),
+                       state.InEdges(worker_id, tx_id));
+    }
+    // Write edges to snapshot.
+    std::unordered_map<std::string, communication::bolt::DecodedValue>
+        empty_props;
+    const auto &edges = state.Edges(worker_id);
+    for (auto edge : edges) {
+      writer.WriteEdge(edge.gid, edge.type, empty_props, edge.from.second,
+                       edge.to.second);
+    }
+    writer.Close(nodes_count[worker_id], edges_count[worker_id]);
+  }
+  LOG(INFO) << "Writing snapshots...DONE";
+  return 0;
+}