Refactor snapshot generators to reduce code duplication
Reviewers: florijan, dgleich, mculinovic; Reviewed By: dgleich; Subscribers: pullbot; Differential Revision: https://phabricator.memgraph.io/D1311
This commit is contained in:
parent
a39602093d
commit
b9c5af2568
@ -15,6 +15,10 @@ namespace fs = std::experimental::filesystem;
|
||||
|
||||
namespace durability {
|
||||
|
||||
// Snapshot layout is described in durability/version.hpp
|
||||
static_assert(durability::kVersion == 5,
|
||||
"Wrong snapshot version, please update!");
|
||||
|
||||
namespace {
|
||||
bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
database::GraphDbAccessor &dba) {
|
||||
@ -80,7 +84,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
return true;
|
||||
}
|
||||
|
||||
// Removes snaposhot files so that only `max_retained` latest ones are kept. If
|
||||
// Removes snapshot files so that only `max_retained` latest ones are kept. If
|
||||
// `max_retained == -1`, all the snapshots are retained.
|
||||
void RemoveOldSnapshots(const fs::path &snapshot_dir, int max_retained) {
|
||||
if (max_retained == -1) return;
|
||||
|
@ -7,6 +7,32 @@ namespace durability {
|
||||
|
||||
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
|
||||
|
||||
// The current default version of snapshot and WAL enconding / decoding.
|
||||
// The current default version of snapshot and WAL encoding / decoding.
|
||||
constexpr int64_t kVersion{5};
|
||||
|
||||
// Snapshot format (version 5):
|
||||
// 1) Magic number + snapshot version
|
||||
// 2) Distributed worker ID
|
||||
//
|
||||
// The following two entries indicate the starting points for generating new
|
||||
// vertex/edge IDs in the DB. They are important when there are vertices/edges
|
||||
// that were moved to another worker (in distributed Memgraph).
|
||||
// 3) Vertex generator ID
|
||||
// 4) Edge generator ID
|
||||
//
|
||||
// 5) A list of label+property indices.
|
||||
//
|
||||
// The following two entries are required when recovering from snapshot combined
|
||||
// with WAL to determine record visibility.
|
||||
// 6) Transactional ID of the snapshooter
|
||||
// 7) Transactional snapshot of the snapshooter
|
||||
//
|
||||
// We must inline edges with nodes because some edges might be stored on other
|
||||
// workers (edges are always stored only on the worker of the edge source).
|
||||
// 8) Bolt encoded nodes + inlined edges (edge address, other endpoint address
|
||||
// and edge type)
|
||||
// 9) Bolt encoded edges
|
||||
//
|
||||
// 10) Snapshot summary (number of nodes, number of edges, hash)
|
||||
|
||||
} // namespace durability
|
||||
|
@ -1,20 +1,8 @@
|
||||
{
|
||||
"indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
|
||||
"nodes" : [
|
||||
{
|
||||
"count_per_worker" : 10000,
|
||||
"label" : "Card"
|
||||
},
|
||||
{
|
||||
"count_per_worker" : 1000,
|
||||
"label" : "Pos"
|
||||
},
|
||||
{
|
||||
"count_per_worker" : 50000,
|
||||
"label" : "Transaction"
|
||||
}
|
||||
],
|
||||
"compromised_pos_probability" : 0.2,
|
||||
"fraud_reported_probability" : 0.1,
|
||||
"hop_probability" : 0.1
|
||||
"cards_per_worker" : 10000,
|
||||
"pos_per_worker" : 1000,
|
||||
"transactions_per_worker" : 50000,
|
||||
"compromised_pos_probability" : 0.2,
|
||||
"fraud_reported_probability" : 0.1,
|
||||
"hop_probability" : 0.1
|
||||
}
|
||||
|
@ -11,4 +11,4 @@ fi
|
||||
|
||||
NUM_MACHINES="$( cat card_fraud.py | grep -m1 "NUM_MACHINES" | tail -c 2 )"
|
||||
|
||||
../../../build_release/tests/manual/card_fraud_generate_snapshot --config config.json --num-workers $NUM_MACHINES --dir $output_dir
|
||||
../../../build_release/tests/manual/card_fraud_generate_snapshot --config config.json --num-workers $NUM_MACHINES --dir $output_dir
|
||||
|
@ -18,270 +18,83 @@
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(num_workers, "1",
|
||||
"Number of distributed workers (including master)");
|
||||
#include "snapshot_generation/graph_state.hpp"
|
||||
#include "snapshot_generation/snapshot_writer.hpp"
|
||||
|
||||
DEFINE_int32(num_workers, 1,
|
||||
"Number of distributed workers (including master)");
|
||||
DEFINE_string(dir, "tmp",
|
||||
"Directory for storing workers durability directories.");
|
||||
DEFINE_string(config, "", "Path to config JSON file");
|
||||
|
||||
/**
|
||||
* Config file should be defined as follows.
|
||||
*
|
||||
*{
|
||||
* "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
|
||||
* "nodes" : [
|
||||
* {
|
||||
* "count_per_worker" : 10,
|
||||
* "label" : "Card"
|
||||
* },
|
||||
* {
|
||||
* "count_per_worker" : 10,
|
||||
* "label" : "Pos"
|
||||
* },
|
||||
* {
|
||||
* "count_per_worker" : 20,
|
||||
* "label" : "Transaction"
|
||||
* }
|
||||
* ],
|
||||
* "compromised_pos_probability" : 0.2,
|
||||
* "fraud_reported_probability" : 0.1,
|
||||
* "hop_percentage" : 0.1
|
||||
*}
|
||||
* {
|
||||
* "cards_per_worker" : 10000,
|
||||
* "pos_per_worker" : 1000,
|
||||
* "transactions_per_worker" : 50000,
|
||||
* "compromised_pos_probability" : 0.2,
|
||||
* "fraud_reported_probability" : 0.1,
|
||||
* "hop_probability" : 0.1
|
||||
* }
|
||||
*/
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
using namespace snapshot_generation;
|
||||
using nlohmann::json;
|
||||
|
||||
/// Helper class for tracking info about the generated graph.
|
||||
class GraphState {
|
||||
typedef std::unordered_map<std::string, std::vector<gid::Gid>> LabelNodes;
|
||||
json BuildGenericConfig(const json &config) {
|
||||
json indices = json::array(
|
||||
{"Card.id", "Pos.id", "Transaction.id", "Transaction.fraud_reported"});
|
||||
|
||||
public:
|
||||
GraphState(int num_workers)
|
||||
: worker_nodes(num_workers),
|
||||
compromised_pos(num_workers),
|
||||
compromised_card(num_workers),
|
||||
out_edges(num_workers),
|
||||
in_edges(num_workers) {}
|
||||
json cards;
|
||||
cards["labels"] = {"Card"};
|
||||
cards["count_per_worker"] = config["cards_per_worker"];
|
||||
|
||||
// Gets the ID of a random node on worker that has the given label.
|
||||
gid::Gid RandomNode(int worker_id, const std::string &label) {
|
||||
auto &label_nodes = worker_nodes[worker_id];
|
||||
auto found = label_nodes.find(label);
|
||||
CHECK(found != label_nodes.end()) << "Label not found";
|
||||
return found->second[rand_(gen_) * found->second.size()];
|
||||
}
|
||||
cards["properties"] = json::object();
|
||||
cards["properties"].push_back(
|
||||
{"id", {{"type", "counter"}, {"param", "Card.id"}}});
|
||||
cards["properties"].push_back(
|
||||
{"compromised", {{"type", "primitive"}, {"param", false}}});
|
||||
|
||||
bool IsCompromisedPos(int worker_id, gid::Gid pos_id) {
|
||||
std::unordered_set<gid::Gid> &compromised = compromised_pos[worker_id];
|
||||
return compromised.find(pos_id) != compromised.end();
|
||||
}
|
||||
json pos;
|
||||
pos["labels"] = {"Pos"};
|
||||
pos["count_per_worker"] = config["pos_per_worker"];
|
||||
|
||||
bool IsCompromisedCard(int worker_id, gid::Gid card_id) {
|
||||
std::unordered_set<gid::Gid> &compromised = compromised_card[worker_id];
|
||||
return compromised.find(card_id) != compromised.end();
|
||||
}
|
||||
pos["properties"] = json::object();
|
||||
pos["properties"].push_back(
|
||||
{"id", {{"type", "counter"}, {"param", "Pos.id"}}});
|
||||
pos["properties"].push_back(
|
||||
{"compromised",
|
||||
{{"type", "bernoulli"},
|
||||
{"param", config["compromised_pos_probability"]}}});
|
||||
pos["properties"].push_back(
|
||||
{"connected_frauds", {{"type", "primitive"}, {"param", 0}}});
|
||||
|
||||
struct Edge {
|
||||
enum class Type { USING, AT };
|
||||
gid::Gid gid;
|
||||
storage::VertexAddress from;
|
||||
storage::VertexAddress to;
|
||||
Type type;
|
||||
};
|
||||
json txs;
|
||||
txs["labels"] = {"Transaction"};
|
||||
txs["count_per_worker"] = config["transactions_per_worker"];
|
||||
|
||||
void AddEdge(gid::Gid edge_gid, storage::VertexAddress from,
|
||||
storage::VertexAddress to, Edge::Type type) {
|
||||
out_edges[from.worker_id()][from.gid()].emplace_back(edges.size());
|
||||
in_edges[to.worker_id()][to.gid()].emplace_back(edges.size());
|
||||
edges.emplace_back(Edge{edge_gid, from, to, type});
|
||||
}
|
||||
txs["properties"] = json::object();
|
||||
txs["properties"].push_back(
|
||||
{"id", {{"type", "counter"}, {"param", "Transaction.id"}}});
|
||||
txs["properties"].push_back(
|
||||
{"fraud_reported", {{"type", "primitive"}, {"param", false}}});
|
||||
|
||||
// Maps worker node labels to node bolt_ids.
|
||||
std::vector<LabelNodes> worker_nodes;
|
||||
json edges;
|
||||
edges = json::array();
|
||||
edges.push_back({{"kind", "unique"},
|
||||
{"from", "Transaction"},
|
||||
{"to", "Card"},
|
||||
{"type", "Using"},
|
||||
{"hop_probability", config["hop_probability"]}});
|
||||
edges.push_back({{"kind", "unique"},
|
||||
{"from", "Transaction"},
|
||||
{"to", "Pos"},
|
||||
{"type", "At"},
|
||||
{"hop_probability", config["hop_probability"]}});
|
||||
|
||||
// Compromised cards and pos.
|
||||
std::vector<std::unordered_set<gid::Gid>> compromised_pos;
|
||||
std::vector<std::unordered_set<gid::Gid>> compromised_card;
|
||||
|
||||
// In/Out Vertex Edges.
|
||||
std::vector<Edge> edges;
|
||||
std::vector<std::unordered_map<gid::Gid, std::vector<int>>> out_edges;
|
||||
std::vector<std::unordered_map<gid::Gid, std::vector<int>>> in_edges;
|
||||
|
||||
// Random generator
|
||||
std::mt19937 gen_{std::random_device{}()};
|
||||
std::uniform_real_distribution<> rand_{0.0, 1.0};
|
||||
};
|
||||
|
||||
// Utilities for writing to the snapshot file.
|
||||
// Snapshot file has the following contents in order:
|
||||
// 1) Magic number.
|
||||
// 2) Worker Id
|
||||
// 3) Internal Id of vertex generator
|
||||
// 4) Internal Id of edge generator
|
||||
// 5) Transaction ID of the snapshooter. When generated set to 0.
|
||||
// 6) Transactional snapshot of the snapshoter. When the snapshot is
|
||||
// generated it's an empty list.
|
||||
// 7) List of label+property index.
|
||||
// 8) All nodes, sequentially, but not encoded as a list.
|
||||
// 9) All relationships, sequentially, but not encoded as a list.
|
||||
// 10) Summary with node count, relationship count and hash digest.
|
||||
class Writer {
|
||||
using DecodedValue = communication::bolt::DecodedValue;
|
||||
const std::string kEdgeUsing = "Using";
|
||||
const std::string kEdgeAt = "At";
|
||||
|
||||
public:
|
||||
Writer(const std::string &path, int worker_id) : buffer_(path) {
|
||||
// 1) Magic number
|
||||
encoder_.WriteRAW(durability::kMagicNumber.data(),
|
||||
durability::kMagicNumber.size());
|
||||
encoder_.WriteTypedValue(durability::kVersion);
|
||||
|
||||
// 2) WorkerId - important for distributed storage
|
||||
worker_id_ = worker_id;
|
||||
encoder_.WriteInt(worker_id_);
|
||||
|
||||
// The following two entries indicate the starting points for generating new
|
||||
// Vertex/Edge IDs in the DB. They are important when there are
|
||||
// vertices/edges that were moved to another worker (in distributed
|
||||
// Memgraph), so be careful! In this case we don't have to worry
|
||||
// because we'll never move vertices/edges between workers.
|
||||
// 3) ID of the vertex generator.
|
||||
encoder_.WriteInt(0);
|
||||
// 4) ID of the edge generator.
|
||||
encoder_.WriteInt(0);
|
||||
|
||||
// 5) Transactional ID of the snapshooter.
|
||||
encoder_.WriteInt(0);
|
||||
// 6) Transactional Snapshot is an empty list of transaction IDs.
|
||||
encoder_.WriteList(std::vector<query::TypedValue>{});
|
||||
}
|
||||
|
||||
template <typename TValue>
|
||||
void WriteList(const std::vector<TValue> &list) {
|
||||
encoder_.WriteList(
|
||||
std::vector<query::TypedValue>(list.begin(), list.end()));
|
||||
}
|
||||
|
||||
void WriteNode(gid::Gid id, const std::string &label,
|
||||
std::unordered_map<std::string, DecodedValue> &properties,
|
||||
const std::vector<GraphState::Edge> &edges,
|
||||
const std::vector<int> &out_edges,
|
||||
const std::vector<int> &in_edges) {
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
|
||||
3);
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
|
||||
encoder_.WriteInt(id);
|
||||
encoder_.WriteList(std::vector<query::TypedValue>{label});
|
||||
encoder_.WriteMap(properties);
|
||||
|
||||
encoder_.WriteInt(in_edges.size());
|
||||
for (auto edge_num : in_edges) {
|
||||
auto &edge = edges[edge_num];
|
||||
auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.worker_id());
|
||||
encoder_.WriteInt(edge_addr.raw());
|
||||
encoder_.WriteInt(edge.from.raw());
|
||||
encoder_.WriteString(
|
||||
edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt);
|
||||
}
|
||||
|
||||
encoder_.WriteInt(out_edges.size());
|
||||
for (auto edge_num : out_edges) {
|
||||
auto &edge = edges[edge_num];
|
||||
auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.worker_id());
|
||||
encoder_.WriteInt(edge_addr.raw());
|
||||
encoder_.WriteInt(edge.to.raw());
|
||||
encoder_.WriteString(
|
||||
edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteEdge(const GraphState::Edge &edge) {
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
|
||||
5);
|
||||
encoder_.WriteRAW(
|
||||
underlying_cast(communication::bolt::Signature::Relationship));
|
||||
encoder_.WriteInt(edge.gid);
|
||||
encoder_.WriteInt(edge.from.gid());
|
||||
encoder_.WriteInt(edge.to.gid());
|
||||
encoder_.WriteString(edge.type == GraphState::Edge::Type::USING ? kEdgeUsing
|
||||
: kEdgeAt);
|
||||
// Write edges to snapshot.
|
||||
std::unordered_map<std::string, communication::bolt::DecodedValue>
|
||||
empty_props{};
|
||||
encoder_.WriteMap(empty_props);
|
||||
}
|
||||
|
||||
void Close(uint64_t node_count, uint64_t edge_count, uint64_t hops_count) {
|
||||
// 10) Summary with node count, relationship count and hash digest.
|
||||
buffer_.WriteValue(node_count);
|
||||
buffer_.WriteValue(edge_count);
|
||||
buffer_.WriteValue(buffer_.hash());
|
||||
buffer_.Close();
|
||||
// Log summary
|
||||
LOG(INFO) << fmt::format("-- Summary for worker: {}", worker_id_);
|
||||
LOG(INFO) << fmt::format("---- Total nodes: {}", node_count);
|
||||
LOG(INFO) << fmt::format("---- Total edges: {}", edge_count);
|
||||
LOG(INFO) << fmt::format("---- Hop edges: {}", hops_count);
|
||||
}
|
||||
|
||||
private:
|
||||
HashedFileWriter buffer_;
|
||||
durability::SnapshotEncoder<HashedFileWriter> encoder_{buffer_};
|
||||
int worker_id_;
|
||||
};
|
||||
|
||||
// Helper class for property value generation.
|
||||
class ValueGenerator {
|
||||
using json = nlohmann::json;
|
||||
using DecodedValue = communication::bolt::DecodedValue;
|
||||
|
||||
public:
|
||||
std::unordered_map<std::string, DecodedValue> MakePosProperties(
|
||||
bool compromised, int worker_id) {
|
||||
std::unordered_map<std::string, DecodedValue> props;
|
||||
props.emplace("id", DecodedValue(Counter("Pos.id")));
|
||||
props.emplace("worker_id", DecodedValue(worker_id));
|
||||
props.emplace("compromised", DecodedValue(compromised));
|
||||
props.emplace("connected_frauds", DecodedValue(0));
|
||||
return props;
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, DecodedValue> MakeTxProperties(
|
||||
bool fraud_reported, int worker_id, int id) {
|
||||
std::unordered_map<std::string, DecodedValue> props;
|
||||
props.emplace("id", DecodedValue(id));
|
||||
props.emplace("worker_id", DecodedValue(worker_id));
|
||||
props.emplace("fraud_reported", DecodedValue(fraud_reported));
|
||||
return props;
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, DecodedValue> MakeCardProperties(
|
||||
bool compromised, int worker_id) {
|
||||
std::unordered_map<std::string, DecodedValue> props;
|
||||
props.emplace("id", DecodedValue(Counter("Card.id")));
|
||||
props.emplace("worker_id", DecodedValue(worker_id));
|
||||
props.emplace("compromised", DecodedValue(compromised));
|
||||
return props;
|
||||
}
|
||||
|
||||
int64_t Counter(const std::string &name) { return counters_[name]++; }
|
||||
|
||||
bool Bernoulli(double p) { return rand_(gen_) < p; }
|
||||
|
||||
private:
|
||||
std::mt19937 gen_{std::random_device{}()};
|
||||
std::uniform_real_distribution<> rand_{0.0, 1.0};
|
||||
std::unordered_map<std::string, int64_t> counters_;
|
||||
};
|
||||
|
||||
nlohmann::json GetWithDefault(const nlohmann::json &object,
|
||||
const std::string &key,
|
||||
const nlohmann::json &default_value) {
|
||||
const auto &found = object.find(key);
|
||||
if (found == object.end()) return default_value;
|
||||
return *found;
|
||||
return json::object(
|
||||
{{"indexes", indices}, {"nodes", {cards, pos, txs}}, {"edges", edges}});
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
@ -294,185 +107,44 @@ int main(int argc, char **argv) {
|
||||
config_file >> config;
|
||||
}
|
||||
|
||||
// Vector IDs.
|
||||
int num_workers = std::atoi(FLAGS_num_workers.c_str());
|
||||
std::vector<int> worker_ids(num_workers);
|
||||
std::iota(worker_ids.begin(), worker_ids.end(), 0);
|
||||
GraphState state =
|
||||
BuildFromConfig(FLAGS_num_workers, BuildGenericConfig(config));
|
||||
|
||||
// Generated node and edge counters.
|
||||
std::vector<int> nodes_count(num_workers, 0);
|
||||
std::vector<int> hops_count(num_workers, 0);
|
||||
// TODO(mtomic): this is not currently used in the demo, maybe remove?
|
||||
|
||||
GraphState state(num_workers);
|
||||
ValueGenerator value_generator;
|
||||
|
||||
const std::string kLabelTransaction = "Transaction";
|
||||
const std::string kLabelCard = "Card";
|
||||
const std::string kLabelPos = "Pos";
|
||||
const fs::path kSnapshotDir = "snapshots";
|
||||
|
||||
double compromised_pos_probability = config["compromised_pos_probability"];
|
||||
double fraud_reported_probability = config["fraud_reported_probability"];
|
||||
double hop_probability = config["hop_probability"];
|
||||
|
||||
LOG(INFO) << "Creating snapshots with config: ";
|
||||
LOG(INFO) << fmt::format("-- Compromised Pos probability: {}",
|
||||
compromised_pos_probability);
|
||||
LOG(INFO) << fmt::format("-- Hop probability : {}", hop_probability);
|
||||
|
||||
// Allocate ids for nodes and write them to state.
|
||||
LOG(INFO) << "Creating nodes...";
|
||||
for (auto worker_id : worker_ids) {
|
||||
gid::Generator node_generator{worker_id};
|
||||
|
||||
const auto &nodes_config = config["nodes"];
|
||||
CHECK(nodes_config.is_array() && nodes_config.size() > 0)
|
||||
<< "Generator config must have 'nodes' array with at least one element";
|
||||
for (const auto &node_config : config["nodes"]) {
|
||||
CHECK(node_config.is_object()) << "Node config must be a dict";
|
||||
const auto &label = node_config["label"];
|
||||
CHECK(label.is_string() && !label.empty())
|
||||
<< "Node must have label specified";
|
||||
for (int i = 0; i < node_config["count_per_worker"]; i++) {
|
||||
auto node_gid = node_generator.Next(std::experimental::nullopt);
|
||||
if (label == kLabelPos &&
|
||||
value_generator.Bernoulli(compromised_pos_probability)) {
|
||||
// Write compromised to state.
|
||||
state.compromised_pos[worker_id].insert(node_gid);
|
||||
}
|
||||
// Write node to state.
|
||||
state.worker_nodes[worker_id][label].emplace_back(node_gid);
|
||||
}
|
||||
// A card is compromised if it was used on a compromised POS.
|
||||
for (auto &tx_gid : state.NodesWithLabel("Transaction")) {
|
||||
auto &node = state.GetNode(tx_gid);
|
||||
auto &edge1 = state.GetEdge(node.out_edges[0]);
|
||||
auto &edge2 = state.GetEdge(node.out_edges[1]);
|
||||
if (edge1.type != "Using") {
|
||||
std::swap(edge1, edge2);
|
||||
}
|
||||
nodes_count[worker_id] = node_generator.LocalCount();
|
||||
}
|
||||
LOG(INFO) << "Creating nodes...DONE";
|
||||
DCHECK(edge1.type == "Using" && edge2.type == "At");
|
||||
|
||||
std::random_device random_device;
|
||||
std::mt19937 engine{random_device()};
|
||||
|
||||
// Create edges for each transaction.
|
||||
LOG(INFO) << "Creating edges...";
|
||||
for (auto &worker_id : worker_ids) {
|
||||
gid::Generator edge_generator{worker_id};
|
||||
|
||||
auto filter = [worker_id, num_workers](const int other_worker) {
|
||||
if (num_workers == 1) return true;
|
||||
return other_worker != worker_id;
|
||||
};
|
||||
std::vector<int> hop_workers;
|
||||
std::copy_if(worker_ids.begin(), worker_ids.end(),
|
||||
std::back_inserter(hop_workers), filter);
|
||||
std::uniform_int_distribution<int> dist(0, hop_workers.size() - 1);
|
||||
|
||||
// Create and write edges to state.
|
||||
// Write compromised cards to state.
|
||||
const auto &transactions = state.worker_nodes[worker_id][kLabelTransaction];
|
||||
for (auto &transaction_id : transactions) {
|
||||
int card_worker_id = worker_id;
|
||||
if (value_generator.Bernoulli(hop_probability)) {
|
||||
card_worker_id = hop_workers[dist(engine)];
|
||||
++hops_count[worker_id];
|
||||
}
|
||||
auto card_id = state.RandomNode(card_worker_id, kLabelCard);
|
||||
|
||||
int pos_worker_id = worker_id;
|
||||
if (value_generator.Bernoulli(hop_probability)) {
|
||||
pos_worker_id = hop_workers[dist(engine)];
|
||||
++hops_count[worker_id];
|
||||
}
|
||||
auto pos_id = state.RandomNode(pos_worker_id, kLabelPos);
|
||||
|
||||
auto edge_using_id = edge_generator.Next(std::experimental::nullopt);
|
||||
state.AddEdge(edge_using_id,
|
||||
storage::VertexAddress(transaction_id, worker_id),
|
||||
storage::VertexAddress(card_id, card_worker_id),
|
||||
GraphState::Edge::Type::USING);
|
||||
auto edge_at_id = edge_generator.Next(std::experimental::nullopt);
|
||||
state.AddEdge(edge_at_id,
|
||||
storage::VertexAddress(transaction_id, worker_id),
|
||||
storage::VertexAddress(pos_id, pos_worker_id),
|
||||
GraphState::Edge::Type::AT);
|
||||
|
||||
if (state.IsCompromisedPos(pos_worker_id, pos_id)) {
|
||||
state.compromised_card[card_worker_id].insert(card_id);
|
||||
}
|
||||
if (state.GetNode(edge2.to).props.at("compromised").Value<bool>()) {
|
||||
state.GetNode(edge1.to).props.at("compromised") = true;
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "Creating edges...DONE";
|
||||
|
||||
// Write snapshot files.
|
||||
LOG(INFO) << "Writing snapshots...";
|
||||
for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
|
||||
const fs::path durability_dir =
|
||||
FLAGS_dir / fs::path("worker_" + std::to_string(worker_id));
|
||||
if (!durability::EnsureDir(durability_dir / kSnapshotDir)) {
|
||||
LOG(ERROR) << "Unable to create durability directory!";
|
||||
exit(0);
|
||||
}
|
||||
const auto snapshot_file =
|
||||
durability::MakeSnapshotPath(durability_dir, worker_id);
|
||||
// Transaction is reported as fraudulent with some probability if a
|
||||
// compromised card was used.
|
||||
std::mt19937 gen{std::random_device{}()};
|
||||
std::uniform_real_distribution<double> dist(0, 1);
|
||||
|
||||
Writer writer(snapshot_file, worker_id);
|
||||
for (auto &tx_gid : state.NodesWithLabel("Transaction")) {
|
||||
auto &node = state.GetNode(tx_gid);
|
||||
auto &edge = state.GetEdge(node.out_edges[0]);
|
||||
DCHECK(edge.type == "Using");
|
||||
|
||||
// Write indexes to snapshot.
|
||||
std::vector<std::string> indexes;
|
||||
for (const auto &index : GetWithDefault(config, "indexes", {}))
|
||||
for (const auto &index_part : utils::Split(index, "."))
|
||||
indexes.push_back(index_part);
|
||||
writer.WriteList(indexes);
|
||||
|
||||
// Write Cards to snapshot.
|
||||
for (auto &card_id : state.worker_nodes[worker_id][kLabelCard]) {
|
||||
bool is_compromised = state.IsCompromisedCard(worker_id, card_id);
|
||||
auto props =
|
||||
value_generator.MakeCardProperties(is_compromised, worker_id);
|
||||
DCHECK(state.out_edges[worker_id][card_id].size() == 0);
|
||||
writer.WriteNode(card_id, kLabelCard, props, state.edges,
|
||||
state.out_edges[worker_id][card_id],
|
||||
state.in_edges[worker_id][card_id]);
|
||||
if (state.GetNode(edge.to).props.at("compromised").Value<bool>()) {
|
||||
node.props.at("fraud_reported") =
|
||||
node.props.at("fraud_reported").Value<bool>() ||
|
||||
(dist(gen) < config["fraud_reported_probability"]);
|
||||
}
|
||||
|
||||
// Write Pos to snapshot.
|
||||
for (auto &pos_id : state.worker_nodes[worker_id][kLabelPos]) {
|
||||
bool is_compromised = state.IsCompromisedPos(worker_id, pos_id);
|
||||
auto props = value_generator.MakePosProperties(is_compromised, worker_id);
|
||||
DCHECK(state.out_edges[worker_id][pos_id].size() == 0);
|
||||
writer.WriteNode(pos_id, kLabelPos, props, state.edges,
|
||||
state.out_edges[worker_id][pos_id],
|
||||
state.in_edges[worker_id][pos_id]);
|
||||
}
|
||||
// Write Transactions to snapshot.
|
||||
int transaction_id = 0;
|
||||
for (auto &tx_id : state.worker_nodes[worker_id][kLabelTransaction]) {
|
||||
const auto &edges = state.edges;
|
||||
const auto &out_edges = state.out_edges[worker_id][tx_id];
|
||||
DCHECK(out_edges.size() == 2);
|
||||
DCHECK(edges[out_edges[0]].type == GraphState::Edge::Type::USING);
|
||||
DCHECK(edges[out_edges[1]].type == GraphState::Edge::Type::AT);
|
||||
DCHECK(state.in_edges[worker_id][tx_id].size() == 0);
|
||||
bool fraud_reported = false;
|
||||
if (state.IsCompromisedCard(edges[out_edges[0]].to.worker_id(),
|
||||
edges[out_edges[0]].to.gid())) {
|
||||
fraud_reported = value_generator.Bernoulli(fraud_reported_probability);
|
||||
}
|
||||
auto props = value_generator.MakeTxProperties(
|
||||
fraud_reported, worker_id, transaction_id * num_workers + worker_id);
|
||||
writer.WriteNode(tx_id, kLabelTransaction, props, state.edges,
|
||||
state.out_edges[worker_id][tx_id],
|
||||
state.in_edges[worker_id][tx_id]);
|
||||
transaction_id++;
|
||||
}
|
||||
// Write edges to snapshot.
|
||||
int edge_count{0};
|
||||
for (auto &edge : state.edges) {
|
||||
if (edge.from.worker_id() == worker_id) {
|
||||
writer.WriteEdge(edge);
|
||||
++edge_count;
|
||||
}
|
||||
}
|
||||
writer.Close(nodes_count[worker_id], edge_count, hops_count[worker_id]);
|
||||
}
|
||||
LOG(INFO) << "Writing snapshots...DONE";
|
||||
|
||||
WriteToSnapshot(state, FLAGS_dir);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -18,9 +18,15 @@
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(out, "", "Destination for the created snapshot file");
|
||||
#include "snapshot_generation/graph_state.hpp"
|
||||
#include "snapshot_generation/snapshot_writer.hpp"
|
||||
|
||||
DEFINE_int32(num_workers, 1, "number of workers");
|
||||
DEFINE_string(out, "tmp", "Destination for the created snapshot file");
|
||||
DEFINE_string(config, "", "Path to config JSON file");
|
||||
|
||||
using namespace snapshot_generation;
|
||||
|
||||
/**
|
||||
* This file contains the program for generating a snapshot based on a JSON
|
||||
* definition. The JSON config has the following form:
|
||||
@ -29,7 +35,7 @@ DEFINE_string(config, "", "Path to config JSON file");
|
||||
* "indexes" : ["Person.id", "Company.id"],
|
||||
* "nodes" : [
|
||||
* {
|
||||
* "count" : 10000,
|
||||
* "count_per_worker" : 10000,
|
||||
* "labels" : ["Person"],
|
||||
* "properties" : {
|
||||
* "id" : { "type" : "counter", "param": "Person.id" },
|
||||
@ -39,7 +45,7 @@ DEFINE_string(config, "", "Path to config JSON file");
|
||||
* }
|
||||
* },
|
||||
* {
|
||||
* "count" : 200,
|
||||
* "count_per_worker" : 200,
|
||||
* "labels" : ["Company"],
|
||||
* "properties" : {
|
||||
* "id" : { "type" : "counter", "param": "Company.id" },
|
||||
@ -52,228 +58,25 @@ DEFINE_string(config, "", "Path to config JSON file");
|
||||
* ],
|
||||
* "edges" : [
|
||||
* {
|
||||
* "count" : 5000,
|
||||
* "kind": "unique",
|
||||
* "from" : "Person",
|
||||
* "to" : "Company",
|
||||
* "type" : "WORKS_IN"
|
||||
* "type" : "WORKS_IN",
|
||||
* "hop_probability": 0.05
|
||||
* },
|
||||
* {
|
||||
* "kind": "random",
|
||||
* "count" : 20,
|
||||
* "from" : "Person",
|
||||
* "to" : "Company",
|
||||
* "type" : "LIKES"
|
||||
* "type" : "LIKES",
|
||||
* "hop_probability": 0.1
|
||||
* }
|
||||
* ]
|
||||
* }
|
||||
*/
|
||||
|
||||
// Utilities for writing to the snapshot file.
|
||||
class Writer {
|
||||
public:
|
||||
Writer(const std::string &path) : buffer_(path) {
|
||||
encoder_.WriteRAW(durability::kMagicNumber.data(),
|
||||
durability::kMagicNumber.size());
|
||||
encoder_.WriteTypedValue(durability::kVersion);
|
||||
|
||||
// ID of the vertex generator.
|
||||
encoder_.WriteInt(0);
|
||||
// ID of the edge generator.
|
||||
encoder_.WriteInt(0);
|
||||
// Transactional ID of the snapshooter.
|
||||
encoder_.WriteInt(0);
|
||||
// Transactional Snapshot is an empty list of transaction IDs.
|
||||
encoder_.WriteList(std::vector<query::TypedValue>{});
|
||||
}
|
||||
|
||||
template <typename TValue>
|
||||
void WriteList(const std::vector<TValue> &list) {
|
||||
encoder_.WriteList(
|
||||
std::vector<query::TypedValue>(list.begin(), list.end()));
|
||||
}
|
||||
|
||||
int64_t WriteNode(
|
||||
const std::vector<std::string> &labels,
|
||||
std::unordered_map<std::string, query::TypedValue> properties) {
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
|
||||
3);
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
|
||||
auto id = node_generator_.Next(std::experimental::nullopt);
|
||||
encoder_.WriteInt(id);
|
||||
encoder_.WriteList(
|
||||
std::vector<query::TypedValue>{labels.begin(), labels.end()});
|
||||
encoder_.WriteMap(properties);
|
||||
return id;
|
||||
}
|
||||
|
||||
int64_t WriteEdge(
|
||||
const std::string &edge_type,
|
||||
const std::unordered_map<std::string, query::TypedValue> properties,
|
||||
int64_t bolt_id_from, int64_t bolt_id_to) {
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
|
||||
5);
|
||||
encoder_.WriteRAW(
|
||||
underlying_cast(communication::bolt::Signature::Relationship));
|
||||
auto id = edge_generator_.Next(std::experimental::nullopt);
|
||||
encoder_.WriteInt(id);
|
||||
encoder_.WriteInt(bolt_id_from);
|
||||
encoder_.WriteInt(bolt_id_to);
|
||||
encoder_.WriteString(edge_type);
|
||||
encoder_.WriteMap(properties);
|
||||
return id;
|
||||
}
|
||||
|
||||
void Close() {
|
||||
buffer_.WriteValue(node_generator_.LocalCount());
|
||||
buffer_.WriteValue(edge_generator_.LocalCount());
|
||||
buffer_.WriteValue(buffer_.hash());
|
||||
buffer_.Close();
|
||||
}
|
||||
|
||||
private:
|
||||
gid::Generator node_generator_{0};
|
||||
gid::Generator edge_generator_{0};
|
||||
HashedFileWriter buffer_;
|
||||
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{buffer_};
|
||||
};
|
||||
|
||||
// Helper class for tracking info about the generated graph.
|
||||
class GraphState {
|
||||
public:
|
||||
// Tracks that the given node has the given label.
|
||||
void AddNode(const std::string &label, int64_t node_bolt_id) {
|
||||
auto found = label_nodes_.find(label);
|
||||
if (found == label_nodes_.end())
|
||||
label_nodes_.emplace(label, std::vector<int64_t>{node_bolt_id});
|
||||
else
|
||||
found->second.emplace_back(node_bolt_id);
|
||||
}
|
||||
|
||||
// Gets the ID of a random node that has the given label.
|
||||
int64_t RandomNode(const std::string &label) {
|
||||
auto found = label_nodes_.find(label);
|
||||
CHECK(found != label_nodes_.end()) << "Label not found";
|
||||
return found->second[rand_(gen_) * found->second.size()];
|
||||
}
|
||||
|
||||
private:
|
||||
// Maps labels to node bolt_ids
|
||||
std::unordered_map<std::string, std::vector<int64_t>> label_nodes_;
|
||||
|
||||
// Random generator
|
||||
std::mt19937 gen_{std::random_device{}()};
|
||||
std::uniform_real_distribution<> rand_{0.0, 1.0};
|
||||
};
|
||||
|
||||
// Helper class for property value generation.
|
||||
class ValueGenerator {
|
||||
using json = nlohmann::json;
|
||||
using TypedValue = query::TypedValue;
|
||||
|
||||
public:
|
||||
// Generates the whole property map based on the given config.
|
||||
std::unordered_map<std::string, query::TypedValue> MakeProperties(
|
||||
const json &config) {
|
||||
std::unordered_map<std::string, query::TypedValue> props;
|
||||
if (config.is_null()) return props;
|
||||
|
||||
CHECK(config.is_object()) << "Properties config must be a dict";
|
||||
for (auto it = config.begin(); it != config.end(); it++) {
|
||||
auto value = MakeValue(it.value());
|
||||
if (value) props.emplace(it.key(), *value);
|
||||
}
|
||||
return props;
|
||||
}
|
||||
|
||||
// Generates a single value based on the given config.
|
||||
std::experimental::optional<TypedValue> MakeValue(const json &config) {
|
||||
if (config.is_object()) {
|
||||
const std::string &type = config["type"];
|
||||
const auto ¶m = config["param"];
|
||||
if (type == "primitive")
|
||||
return Primitive(param);
|
||||
else if (type == "counter")
|
||||
return TypedValue(Counter(param));
|
||||
else if (type == "optional")
|
||||
return Optional(param);
|
||||
else if (type == "bernoulli")
|
||||
return TypedValue(Bernoulli(param));
|
||||
else if (type == "randint")
|
||||
return TypedValue(RandInt(param));
|
||||
else if (type == "randstring")
|
||||
return TypedValue(RandString(param));
|
||||
else
|
||||
LOG(FATAL) << "Unknown value type";
|
||||
} else
|
||||
return Primitive(config);
|
||||
}
|
||||
|
||||
TypedValue Primitive(const json &config) {
|
||||
if (config.is_string()) return config.get<std::string>();
|
||||
if (config.is_number_integer()) return config.get<int64_t>();
|
||||
if (config.is_number_float()) return config.get<double>();
|
||||
if (config.is_boolean()) return config.get<bool>();
|
||||
|
||||
LOG(FATAL) << "Unsupported primitive type";
|
||||
}
|
||||
|
||||
int64_t Counter(const std::string &name) {
|
||||
auto found = counters_.find(name);
|
||||
if (found == counters_.end()) {
|
||||
counters_.emplace(name, 1);
|
||||
return 0;
|
||||
} else {
|
||||
return found->second++;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t RandInt(const json &range) {
|
||||
CHECK(range.is_array() && range.size() == 2)
|
||||
<< "RandInt value gen config must be a list with 2 elements";
|
||||
auto from = MakeValue(range[0])->ValueInt();
|
||||
auto to = MakeValue(range[1])->ValueInt();
|
||||
CHECK(from < to) << "RandInt lower range must be lesser then upper range";
|
||||
return (int64_t)(rand_(gen_) * (to - from)) + from;
|
||||
}
|
||||
|
||||
std::string RandString(const json &length) {
|
||||
static const char alphanum[] =
|
||||
"0123456789"
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
"abcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
int length_int = MakeValue(length)->ValueInt();
|
||||
std::string r_val(length_int, 'a');
|
||||
for (int i = 0; i < length_int; ++i)
|
||||
r_val[i] = alphanum[(int64_t)(rand_(gen_) * (sizeof(alphanum) - 1))];
|
||||
|
||||
return r_val;
|
||||
}
|
||||
|
||||
bool Bernoulli(double p) { return rand_(gen_) < p; }
|
||||
|
||||
std::experimental::optional<TypedValue> Optional(const json &config) {
|
||||
CHECK(config.is_array() && config.size() == 2)
|
||||
<< "Optional value gen config must be a list with 2 elements";
|
||||
return Bernoulli(config[0]) ? MakeValue(config[1])
|
||||
: std::experimental::nullopt;
|
||||
}
|
||||
|
||||
private:
|
||||
std::mt19937 gen_{std::random_device{}()};
|
||||
std::uniform_real_distribution<> rand_{0.0, 1.0};
|
||||
std::unordered_map<std::string, int64_t> counters_;
|
||||
};
|
||||
|
||||
// Returns a copy of `object[key]` when the key is present, and a copy of
// `default_value` otherwise.
nlohmann::json GetWithDefault(const nlohmann::json &object,
                              const std::string &key,
                              const nlohmann::json &default_value) {
  const auto entry = object.find(key);
  return entry == object.end() ? default_value : *entry;
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
LOG(FATAL) << "Doesn't work with the newest format - waiting for refactor";
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
@ -284,49 +87,8 @@ int main(int argc, char **argv) {
|
||||
config_file >> config;
|
||||
}
|
||||
|
||||
Writer writer(FLAGS_out);
|
||||
GraphState state;
|
||||
ValueGenerator value_generator;
|
||||
GraphState state = BuildFromConfig(FLAGS_num_workers, config);
|
||||
WriteToSnapshot(state, FLAGS_out);
|
||||
|
||||
std::vector<std::string> indexes;
|
||||
for (const auto &index : GetWithDefault(config, "indexes", {}))
|
||||
for (const auto &index_part : utils::Split(index, "."))
|
||||
indexes.push_back(index_part);
|
||||
writer.WriteList(indexes);
|
||||
|
||||
// Create nodes
|
||||
const auto &nodes_config = config["nodes"];
|
||||
CHECK(nodes_config.is_array() && nodes_config.size() > 0)
|
||||
<< "Generator config must have 'nodes' array with at least one element";
|
||||
for (const auto &node_config : config["nodes"]) {
|
||||
CHECK(node_config.is_object()) << "Node config must be a dict";
|
||||
|
||||
for (int i = 0; i < node_config["count"]; i++) {
|
||||
const auto &labels_config = node_config["labels"];
|
||||
CHECK(labels_config.is_array()) << "Must provide an array of node labels";
|
||||
CHECK(node_config.size() > 0)
|
||||
<< "Node labels array must contain at lest one element";
|
||||
auto node_bolt_id = writer.WriteNode(
|
||||
labels_config,
|
||||
value_generator.MakeProperties(node_config["properties"]));
|
||||
|
||||
for (const auto &label : labels_config)
|
||||
state.AddNode(label, node_bolt_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Create edges
|
||||
for (const auto &edge_config : config["edges"]) {
|
||||
CHECK(edge_config.is_object()) << "Edge config must be a dict";
|
||||
const std::string &from = edge_config["from"];
|
||||
const std::string &to = edge_config["to"];
|
||||
for (int i = 0; i < edge_config["count"]; i++)
|
||||
writer.WriteEdge(edge_config["type"],
|
||||
value_generator.MakeProperties(
|
||||
GetWithDefault(edge_config, "properties", nullptr)),
|
||||
state.RandomNode(from), state.RandomNode(to));
|
||||
}
|
||||
|
||||
writer.Close();
|
||||
return 0;
|
||||
}
|
||||
|
250
tests/manual/snapshot_generation/graph_state.hpp
Normal file
250
tests/manual/snapshot_generation/graph_state.hpp
Normal file
@ -0,0 +1,250 @@
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <random>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "cppitertools/itertools.hpp"
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/property_value.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
#include "value_generator.hpp"
|
||||
|
||||
namespace snapshot_generation {
|
||||
|
||||
/// Returns a copy of the value stored under `key` in `object`, or a copy of
/// `default_value` when `object` has no such key.
///
/// Declared `inline` because the definition lives in a header: without it,
/// including this header from more than one translation unit produces
/// multiple-definition link errors.
inline nlohmann::json GetWithDefault(const nlohmann::json &object,
                                     const std::string &key,
                                     const nlohmann::json &default_value) {
  const auto found = object.find(key);
  if (found == object.end()) return default_value;
  return *found;
}
|
||||
|
||||
struct Node {
|
||||
gid::Gid gid;
|
||||
std::vector<std::string> labels;
|
||||
std::unordered_map<std::string, PropertyValue> props;
|
||||
std::vector<gid::Gid> in_edges;
|
||||
std::vector<gid::Gid> out_edges;
|
||||
};
|
||||
|
||||
struct Edge {
|
||||
gid::Gid gid;
|
||||
gid::Gid from;
|
||||
gid::Gid to;
|
||||
std::string type;
|
||||
std::unordered_map<std::string, PropertyValue> props;
|
||||
};
|
||||
|
||||
/// Helper class for tracking info about the generated graph.
|
||||
class GraphState {
|
||||
public:
|
||||
explicit GraphState(int num_workers)
|
||||
: num_workers_(num_workers),
|
||||
worker_nodes_(num_workers),
|
||||
worker_edges_(num_workers) {
|
||||
for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
|
||||
edge_generators_.emplace_back(
|
||||
std::make_unique<gid::Generator>(worker_id));
|
||||
node_generators_.emplace_back(
|
||||
std::make_unique<gid::Generator>(worker_id));
|
||||
}
|
||||
}
|
||||
|
||||
int NumWorkers() { return num_workers_; }
|
||||
|
||||
int64_t NumNodesOnWorker(int worker_id) {
|
||||
return node_generators_[worker_id]->LocalCount();
|
||||
}
|
||||
|
||||
int64_t NumEdgesOnWorker(int worker_id) {
|
||||
return edge_generators_[worker_id]->LocalCount();
|
||||
}
|
||||
|
||||
auto &NodesWithLabel(const std::string &label, int worker_id) {
|
||||
return worker_nodes_[worker_id][label];
|
||||
}
|
||||
|
||||
auto NodesWithLabel(const std::string &label) {
|
||||
return iter::chain.from_iterable(
|
||||
iter::imap([ this, label ](int worker_id) -> auto & {
|
||||
return NodesWithLabel(label, worker_id);
|
||||
},
|
||||
iter::range(num_workers_)));
|
||||
}
|
||||
|
||||
auto NodesOnWorker(int worker_id) {
|
||||
return iter::chain.from_iterable(
|
||||
iter::imap([](auto &p) -> auto & { return p.second; },
|
||||
worker_nodes_[worker_id]));
|
||||
}
|
||||
|
||||
auto EdgesOnWorker(int worker_id) {
|
||||
return iter::chain.from_iterable(
|
||||
iter::imap([](auto &p) -> auto & { return p.second; },
|
||||
worker_edges_[worker_id]));
|
||||
}
|
||||
|
||||
gid::Gid &RandomNode(const std::string &label, int worker_id) {
|
||||
CHECK(0 <= worker_id && worker_id < (int)worker_nodes_.size())
|
||||
<< "Worker ID should be between 0 and " << worker_nodes_.size() - 1;
|
||||
auto &label_nodes = worker_nodes_[worker_id];
|
||||
auto found = label_nodes.find(label);
|
||||
CHECK(found != label_nodes.end()) << "Label not found";
|
||||
return found->second[rand_(gen_) * found->second.size()];
|
||||
}
|
||||
|
||||
gid::Gid &RandomNode(const std::string &label) {
|
||||
return RandomNode(label, rand_(gen_) * worker_nodes_.size());
|
||||
}
|
||||
|
||||
gid::Gid &RandomNodeOnOtherWorker(const std::string &label, int worker_id) {
|
||||
int worker_id2 = rand_(gen_) * (worker_nodes_.size() - 1);
|
||||
if (worker_id2 >= worker_id) ++worker_id2;
|
||||
return RandomNode(label, worker_id2);
|
||||
}
|
||||
|
||||
gid::Gid CreateNode(
|
||||
int worker_id, const std::vector<std::string> &labels,
|
||||
const std::unordered_map<std::string, PropertyValue> &props) {
|
||||
auto node_gid =
|
||||
node_generators_[worker_id]->Next(std::experimental::nullopt);
|
||||
nodes_[node_gid] = {node_gid, labels, props, {}, {}};
|
||||
|
||||
for (const auto &label : labels) {
|
||||
worker_nodes_[worker_id][label].push_back(node_gid);
|
||||
}
|
||||
|
||||
return node_gid;
|
||||
}
|
||||
|
||||
gid::Gid CreateEdge(
|
||||
gid::Gid from, gid::Gid to, const std::string &type,
|
||||
const std::unordered_map<std::string, PropertyValue> &props) {
|
||||
int worker_id = gid::CreatorWorker(from);
|
||||
auto edge_gid =
|
||||
edge_generators_[worker_id]->Next(std::experimental::nullopt);
|
||||
nodes_[from].out_edges.emplace_back(edge_gid);
|
||||
nodes_[to].in_edges.emplace_back(edge_gid);
|
||||
edges_[edge_gid] = Edge{edge_gid, from, to, type, props};
|
||||
worker_edges_[worker_id][type].push_back(edge_gid);
|
||||
return edge_gid;
|
||||
}
|
||||
|
||||
auto &GetNode(gid::Gid gid) { return nodes_[gid]; }
|
||||
auto &GetEdge(gid::Gid gid) { return edges_[gid]; }
|
||||
auto &GetNodes() { return nodes_; }
|
||||
auto &GetEdges() { return edges_; }
|
||||
|
||||
void CreateIndex(std::string label, std::string property) {
|
||||
indices_.emplace_back(std::move(label));
|
||||
indices_.emplace_back(std::move(property));
|
||||
}
|
||||
|
||||
auto &Indices() { return indices_; }
|
||||
|
||||
private:
|
||||
typedef std::unordered_map<std::string, std::vector<gid::Gid>> LabelGid;
|
||||
|
||||
int num_workers_;
|
||||
|
||||
std::vector<std::string> indices_;
|
||||
|
||||
std::vector<LabelGid> worker_nodes_;
|
||||
std::vector<LabelGid> worker_edges_;
|
||||
|
||||
std::unordered_map<gid::Gid, Node> nodes_;
|
||||
std::unordered_map<gid::Gid, Edge> edges_;
|
||||
|
||||
std::vector<std::unique_ptr<gid::Generator>> edge_generators_;
|
||||
std::vector<std::unique_ptr<gid::Generator>> node_generators_;
|
||||
|
||||
std::mt19937 gen_{std::random_device{}()};
|
||||
std::uniform_real_distribution<> rand_{0.0, 1.0};
|
||||
};
|
||||
|
||||
/// Returns the ID of the worker that created `gid`.
/// `inline` because this is defined in a header; a non-inline definition
/// breaks linking once the header is included from two translation units.
inline int Worker(gid::Gid gid) { return gid::CreatorWorker(gid); }
|
||||
|
||||
/// Builds a GraphState for `num_workers` workers from a JSON generator config.
///
/// Keys read from `config`:
///  - "indexes" (optional): list of "Label.Property" strings.
///  - "nodes": non-empty list of objects with "labels" (non-empty array),
///    "count_per_worker" and "properties"; that many nodes are created on
///    every worker.
///  - "edges": list of objects with "from", "to", "type", "hop_probability",
///    "kind" and optionally "properties". Kind "random" creates "count" edges
///    from randomly picked `from`-labelled nodes; kind "unique" creates one
///    edge for every `from`-labelled node. With more than one worker, the
///    destination is taken from another worker with probability
///    "hop_probability", otherwise from the source node's worker.
///
/// Logs a per-worker summary and returns the populated state. `inline`
/// because the definition lives in a header.
inline GraphState BuildFromConfig(int num_workers,
                                  const nlohmann::json &config) {
  ValueGenerator value_generator;
  GraphState state(num_workers);

  for (const auto &index : GetWithDefault(config, "indexes", {})) {
    auto index_parts = utils::Split(index, ".");
    CHECK(index_parts.size() == 2) << "Index format should be Label.Property";
    state.CreateIndex(index_parts[0], index_parts[1]);
  }

  CHECK(config["nodes"].is_array() && config["nodes"].size() > 0)
      << "Generator config must have 'nodes' array with at least one "
         "element";
  for (const auto &node_config : config["nodes"]) {
    CHECK(node_config.is_object()) << "Node config must be a dict";

    const auto &labels = node_config["labels"];
    CHECK(labels.is_array()) << "Must provide an array of node labels";
    // The check has to look at the labels array itself (not at node_config):
    // nodes are registered per label, so a node without labels would not be
    // reachable through the per-worker label index.
    CHECK(labels.size() > 0)
        << "Node labels array must contain at least one element";

    for (int i = 0; i < node_config["count_per_worker"]; ++i) {
      for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
        const auto properties =
            value_generator.MakeProperties(node_config["properties"]);
        state.CreateNode(worker_id, labels, properties);
      }
    }
  }

  // Counts edges whose destination was deliberately put on another worker.
  int num_hops = 0;
  auto get_edge_endpoint = [num_workers, &state, &num_hops, &value_generator](
      gid::Gid from, std::string label_to, double hop_probability) {
    if (num_workers > 1 && value_generator.Bernoulli(hop_probability)) {
      ++num_hops;
      return state.RandomNodeOnOtherWorker(label_to, Worker(from));
    }
    return state.RandomNode(label_to, Worker(from));
  };

  for (const auto &edge_config : config["edges"]) {
    CHECK(edge_config.is_object()) << "Edge config must be a dict";
    const std::string &label_from = edge_config["from"];
    const std::string &label_to = edge_config["to"];
    const std::string &type = edge_config["type"];
    const double hop_probability = edge_config["hop_probability"];

    if (edge_config["kind"] == "random") {
      for (int i = 0; i < edge_config["count"]; i++) {
        gid::Gid from = state.RandomNode(label_from);
        gid::Gid to = get_edge_endpoint(from, label_to, hop_probability);

        const auto &props = value_generator.MakeProperties(
            GetWithDefault(edge_config, "properties", nullptr));
        state.CreateEdge(from, to, type, props);
      }
    }

    if (edge_config["kind"] == "unique") {
      for (const auto &from : state.NodesWithLabel(label_from)) {
        gid::Gid to = get_edge_endpoint(from, label_to, hop_probability);
        const auto &props = value_generator.MakeProperties(
            GetWithDefault(edge_config, "properties", nullptr));
        state.CreateEdge(from, to, type, props);
      }
    }
  }

  for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
    LOG(INFO) << "-- Summary for worker: " << worker_id;
    LOG(INFO) << "---- Total nodes: " << state.NumNodesOnWorker(worker_id);
    LOG(INFO) << "---- Total edges: " << state.NumEdgesOnWorker(worker_id);
  }
  LOG(INFO) << "-- Total number of hops: " << num_hops;

  return state;
}
|
||||
|
||||
} // namespace snapshot_generation
|
143
tests/manual/snapshot_generation/snapshot_writer.hpp
Normal file
143
tests/manual/snapshot_generation/snapshot_writer.hpp
Normal file
@ -0,0 +1,143 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "communication/bolt/v1/encoder/base_encoder.hpp"
|
||||
#include "durability/paths.hpp"
|
||||
#include "durability/version.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
|
||||
#include "graph_state.hpp"
|
||||
|
||||
namespace snapshot_generation {
|
||||
|
||||
// Snapshot layout is described in durability/version.hpp
|
||||
static_assert(durability::kVersion == 5,
|
||||
"Wrong snapshot version, please update!");
|
||||
|
||||
class SnapshotWriter {
|
||||
public:
|
||||
SnapshotWriter(const std::string &path, int worker_id,
|
||||
uint64_t vertex_generator_local_count = 0,
|
||||
uint64_t edge_generator_local_count = 0)
|
||||
: worker_id_(worker_id), buffer_(path) {
|
||||
encoder_.WriteRAW(durability::kMagicNumber.data(),
|
||||
durability::kMagicNumber.size());
|
||||
encoder_.WriteTypedValue(durability::kVersion);
|
||||
encoder_.WriteInt(worker_id_);
|
||||
encoder_.WriteInt(vertex_generator_local_count);
|
||||
encoder_.WriteInt(edge_generator_local_count);
|
||||
encoder_.WriteInt(0);
|
||||
encoder_.WriteList(std::vector<query::TypedValue>{});
|
||||
}
|
||||
|
||||
// reference to `buffer_` gets broken when moving, so let's just forbid moving
|
||||
SnapshotWriter(SnapshotWriter &&rhs) = delete;
|
||||
SnapshotWriter &operator=(SnapshotWriter &&rhs) = delete;
|
||||
|
||||
template <typename TValue>
|
||||
void WriteList(const std::vector<TValue> &list) {
|
||||
encoder_.WriteList(
|
||||
std::vector<query::TypedValue>(list.begin(), list.end()));
|
||||
}
|
||||
|
||||
storage::VertexAddress DefaultVertexAddress(gid::Gid gid) {
|
||||
return storage::VertexAddress(gid, gid::CreatorWorker(gid));
|
||||
}
|
||||
|
||||
storage::EdgeAddress DefaultEdgeAddress(gid::Gid gid) {
|
||||
return storage::EdgeAddress(gid, gid::CreatorWorker(gid));
|
||||
}
|
||||
|
||||
void WriteInlineEdge(const Edge &edge, bool write_from) {
|
||||
encoder_.WriteInt(DefaultEdgeAddress(edge.gid).raw());
|
||||
encoder_.WriteInt(write_from ? DefaultVertexAddress(edge.from).raw()
|
||||
: DefaultVertexAddress(edge.to).raw());
|
||||
encoder_.WriteString(edge.type);
|
||||
}
|
||||
|
||||
void WriteNode(const Node &node,
|
||||
const std::unordered_map<gid::Gid, Edge> &edges) {
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
|
||||
3);
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
|
||||
encoder_.WriteInt(node.gid);
|
||||
|
||||
WriteList(node.labels);
|
||||
encoder_.WriteMap(node.props);
|
||||
|
||||
encoder_.WriteInt(node.in_edges.size());
|
||||
for (const auto &edge_idx : node.in_edges) {
|
||||
WriteInlineEdge(edges.at(edge_idx), true);
|
||||
}
|
||||
|
||||
encoder_.WriteInt(node.out_edges.size());
|
||||
for (const auto &edge_idx : node.out_edges) {
|
||||
WriteInlineEdge(edges.at(edge_idx), false);
|
||||
}
|
||||
|
||||
++nodes_written_;
|
||||
}
|
||||
|
||||
void WriteEdge(const Edge &edge) {
|
||||
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
|
||||
5);
|
||||
encoder_.WriteRAW(
|
||||
underlying_cast(communication::bolt::Signature::Relationship));
|
||||
encoder_.WriteInt(edge.gid);
|
||||
encoder_.WriteInt(edge.from);
|
||||
encoder_.WriteInt(edge.to);
|
||||
encoder_.WriteString(edge.type);
|
||||
encoder_.WriteMap(edge.props);
|
||||
|
||||
++edges_written_;
|
||||
}
|
||||
|
||||
void Close() {
|
||||
buffer_.WriteValue(nodes_written_);
|
||||
buffer_.WriteValue(edges_written_);
|
||||
buffer_.WriteValue(buffer_.hash());
|
||||
buffer_.Close();
|
||||
}
|
||||
|
||||
private:
|
||||
int worker_id_;
|
||||
int64_t nodes_written_{0};
|
||||
int64_t edges_written_{0};
|
||||
|
||||
HashedFileWriter buffer_;
|
||||
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{buffer_};
|
||||
};
|
||||
|
||||
/// Writes one snapshot per worker of `state` below the directory `path`.
///
/// For every worker the directory `<path>/worker_<id>/snapshots` is ensured to
/// exist, and the snapshot file location is obtained from
/// durability::MakeSnapshotPath. Each snapshot contains the header, the index
/// list, the worker's nodes and then the worker's edges.
///
/// If a directory cannot be created, the error is logged and the process
/// terminates with a non-zero exit status so that callers and scripts can
/// detect the failure. `inline` because the definition lives in a header.
inline void WriteToSnapshot(GraphState &state, const std::string &path) {
  namespace fs = std::experimental::filesystem;

  for (int worker_id = 0; worker_id < state.NumWorkers(); ++worker_id) {
    const fs::path durability_dir =
        fs::path(path) / fs::path("worker_" + std::to_string(worker_id));
    if (!durability::EnsureDir(durability_dir / "snapshots")) {
      LOG(ERROR) << "Unable to create durability directory!";
      exit(1);
    }
    const auto snapshot_file =
        durability::MakeSnapshotPath(durability_dir, worker_id);

    SnapshotWriter writer(snapshot_file, worker_id,
                          state.NumNodesOnWorker(worker_id),
                          state.NumEdgesOnWorker(worker_id));

    writer.WriteList(state.Indices());

    for (const auto &node : state.NodesOnWorker(worker_id)) {
      writer.WriteNode(state.GetNode(node), state.GetEdges());
    }

    for (const auto &edge : state.EdgesOnWorker(worker_id)) {
      writer.WriteEdge(state.GetEdge(edge));
    }

    writer.Close();
  }
}
|
||||
|
||||
} // namespace snapshot_generation
|
107
tests/manual/snapshot_generation/value_generator.hpp
Normal file
107
tests/manual/snapshot_generation/value_generator.hpp
Normal file
@ -0,0 +1,107 @@
|
||||
#pragma once
|
||||
|
||||
#include <experimental/optional>
|
||||
#include <random>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "glog/logging.h"
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "storage/property_value.hpp"
|
||||
|
||||
namespace snapshot_generation {
|
||||
|
||||
// Helper class for property value generation.
|
||||
class ValueGenerator {
|
||||
using json = nlohmann::json;
|
||||
|
||||
public:
|
||||
// Generates the whole property map based on the given config.
|
||||
std::unordered_map<std::string, PropertyValue> MakeProperties(
|
||||
const json &config) {
|
||||
std::unordered_map<std::string, PropertyValue> props;
|
||||
if (config.is_null()) return props;
|
||||
|
||||
CHECK(config.is_object()) << "Properties config must be a dict";
|
||||
for (auto it = config.begin(); it != config.end(); it++) {
|
||||
auto value = MakeValue(it.value());
|
||||
if (value) props.emplace(it.key(), *value);
|
||||
}
|
||||
return props;
|
||||
}
|
||||
|
||||
// Generates a single value based on the given config.
|
||||
std::experimental::optional<PropertyValue> MakeValue(const json &config) {
|
||||
if (config.is_object()) {
|
||||
const std::string &type = config["type"];
|
||||
const auto ¶m = config["param"];
|
||||
if (type == "primitive") return Primitive(param);
|
||||
if (type == "counter") return Counter(param);
|
||||
if (type == "optional") return Optional(param);
|
||||
if (type == "bernoulli") return Bernoulli(param);
|
||||
if (type == "randint") return RandInt(param);
|
||||
if (type == "randstring") return RandString(param);
|
||||
|
||||
LOG(FATAL) << "Unknown value type";
|
||||
} else {
|
||||
return Primitive(config);
|
||||
}
|
||||
}
|
||||
|
||||
// Returns whatever value is stored in the JSON
|
||||
PropertyValue Primitive(const json &config) {
|
||||
if (config.is_string()) return config.get<std::string>();
|
||||
if (config.is_number_integer()) return config.get<int64_t>();
|
||||
if (config.is_number_float()) return config.get<double>();
|
||||
if (config.is_boolean()) return config.get<bool>();
|
||||
|
||||
LOG(FATAL) << "Unsupported primitive type";
|
||||
}
|
||||
|
||||
// Increments the counter and returns its current value
|
||||
int64_t Counter(const std::string &name) { return counters_[name]++; }
|
||||
|
||||
// Generates a random integer in range specified by JSON config
|
||||
int64_t RandInt(const json &range) {
|
||||
CHECK(range.is_array() && range.size() == 2)
|
||||
<< "RandInt value gen config must be a list with 2 elements";
|
||||
auto from = MakeValue(range[0])->Value<int64_t>();
|
||||
auto to = MakeValue(range[1])->Value<int64_t>();
|
||||
CHECK(from < to) << "RandInt lower range must be lesser than upper range";
|
||||
return (int64_t)(rand_(gen_) * (to - from)) + from;
|
||||
}
|
||||
|
||||
// Generates a random alphanumeric string with length specified by JSON config
|
||||
std::string RandString(const json &length) {
|
||||
static const char alphanum[] =
|
||||
"0123456789"
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
"abcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
int length_int = MakeValue(length)->Value<int64_t>();
|
||||
std::string r_val(length_int, 'a');
|
||||
for (int i = 0; i < length_int; ++i)
|
||||
r_val[i] = alphanum[(int64_t)(rand_(gen_) * strlen(alphanum))];
|
||||
|
||||
return r_val;
|
||||
}
|
||||
|
||||
// Returns true with given probability
|
||||
bool Bernoulli(double p) { return rand_(gen_) < p; }
|
||||
|
||||
// Returns a value specified by config with some probability, and nullopt
|
||||
// otherwise
|
||||
std::experimental::optional<PropertyValue> Optional(const json &config) {
|
||||
CHECK(config.is_array() && config.size() == 2)
|
||||
<< "Optional value gen config must be a list with 2 elements";
|
||||
return Bernoulli(config[0]) ? MakeValue(config[1])
|
||||
: std::experimental::nullopt;
|
||||
}
|
||||
|
||||
private:
|
||||
std::mt19937 gen_{std::random_device{}()};
|
||||
std::uniform_real_distribution<> rand_{0.0, 1.0};
|
||||
std::unordered_map<std::string, int64_t> counters_;
|
||||
};
|
||||
|
||||
} // namespace snapshot_generation
|
Loading…
Reference in New Issue
Block a user