diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index f57493f87..f4ce972db 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -15,6 +15,10 @@ namespace fs = std::experimental::filesystem; namespace durability { +// Snapshot layout is described in durability/version.hpp +static_assert(durability::kVersion == 5, + "Wrong snapshot version, please update!"); + namespace { bool Encode(const fs::path &snapshot_file, database::GraphDb &db, database::GraphDbAccessor &dba) { @@ -80,7 +84,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, return true; } -// Removes snaposhot files so that only `max_retained` latest ones are kept. If +// Removes snapshot files so that only `max_retained` latest ones are kept. If // `max_retained == -1`, all the snapshots are retained. void RemoveOldSnapshots(const fs::path &snapshot_dir, int max_retained) { if (max_retained == -1) return; diff --git a/src/durability/version.hpp b/src/durability/version.hpp index 9e0ecddae..ec372abd1 100644 --- a/src/durability/version.hpp +++ b/src/durability/version.hpp @@ -7,6 +7,32 @@ namespace durability { constexpr std::array kMagicNumber{{'M', 'G', 's', 'n'}}; -// The current default version of snapshot and WAL enconding / decoding. +// The current default version of snapshot and WAL encoding / decoding. constexpr int64_t kVersion{5}; + +// Snapshot format (version 5): +// 1) Magic number + snapshot version +// 2) Distributed worker ID +// +// The following two entries indicate the starting points for generating new +// vertex/edge IDs in the DB. They are important when there are vertices/edges +// that were moved to another worker (in distributed Memgraph). +// 3) Vertex generator ID +// 4) Edge generator ID +// +// 5) A list of label+property indices. +// +// The following two entries are required when recovering from snapshot combined +// with WAL to determine record visibility. +// 6) Transactional ID of the snapshooter +// 7) Transactional snapshot of the snapshooter +// +// We must inline edges with nodes because some edges might be stored on other +// worker (edges are always stored only on the worker of the edge source). +// 8) Bolt encoded nodes + inlined edges (edge address, other endpoint address +// and edge type) +// 9) Bolt encoded edges +// +// 10) Snapshot summary (number of nodes, number of edges, hash) + } // namespace durability diff --git a/tests/distributed/card_fraud/config.json b/tests/distributed/card_fraud/config.json index 62a3b5ca2..c3047ec41 100644 --- a/tests/distributed/card_fraud/config.json +++ b/tests/distributed/card_fraud/config.json @@ -1,20 +1,8 @@ { - "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"], - "nodes" : [ - { - "count_per_worker" : 10000, - "label" : "Card" - }, - { - "count_per_worker" : 1000, - "label" : "Pos" - }, - { - "count_per_worker" : 50000, - "label" : "Transaction" - } - ], - "compromised_pos_probability" : 0.2, - "fraud_reported_probability" : 0.1, - "hop_probability" : 0.1 + "cards_per_worker" : 10000, + "pos_per_worker" : 1000, + "transactions_per_worker" : 50000, + "compromised_pos_probability" : 0.2, + "fraud_reported_probability" : 0.1, + "hop_probability" : 0.1 } diff --git a/tests/distributed/card_fraud/generate_dataset.sh b/tests/distributed/card_fraud/generate_dataset.sh index b218050c1..d1eab13d6 100755 --- a/tests/distributed/card_fraud/generate_dataset.sh +++ b/tests/distributed/card_fraud/generate_dataset.sh @@ -11,4 +11,4 @@ fi NUM_MACHINES="$( cat card_fraud.py | grep -m1 "NUM_MACHINES" | tail -c 2 )" -../../../build_release/tests/manual/card_fraud_generate_snapshot --config config.json --num-workers $NUM_MACHINES --dir $output_dir + ../../../build_release/tests/manual/card_fraud_generate_snapshot --config config.json --num-workers $NUM_MACHINES --dir $output_dir diff --git a/tests/manual/card_fraud_generate_snapshot.cpp b/tests/manual/card_fraud_generate_snapshot.cpp index 4ff1b9bf3..ff78b6fe5 100644 --- a/tests/manual/card_fraud_generate_snapshot.cpp +++ b/tests/manual/card_fraud_generate_snapshot.cpp @@ -18,270 +18,83 @@ #include "utils/string.hpp" #include "utils/timer.hpp" -DEFINE_string(num_workers, "1", - "Number of distributed workers (including master)"); +#include "snapshot_generation/graph_state.hpp" +#include "snapshot_generation/snapshot_writer.hpp" + +DEFINE_int32(num_workers, 1, + "Number of distributed workers (including master)"); DEFINE_string(dir, "tmp", "Directory for storing workers durability directories."); DEFINE_string(config, "", "Path to config JSON file"); /** * Config file should defined as follows. - * - *{ - * "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"], - * "nodes" : [ - * { - * "count_per_worker" : 10, - * "label" : "Card" - * }, - * { - * "count_per_worker" : 10, - * "label" : "Pos" - * }, - * { - * "count_per_worker" : 20, - * "label" : "Transaction" - * } - * ], - * "compromised_pos_probability" : 0.2, - * "fraud_reported_probability" : 0.1, - * "hop_percentage" : 0.1 - *} + * { + * "cards_per_worker" : 10000, + * "pos_per_worker" : 1000, + * "transactions_per_worker" : 50000, + * "compromised_pos_probability" : 0.2, + * "fraud_reported_probability" : 0.1, + * "hop_probability" : 0.1 + * } */ -namespace fs = std::experimental::filesystem; +using namespace snapshot_generation; +using nlohmann::json; -/// Helper class for tracking info about the generated graph. -class GraphState { - typedef std::unordered_map> LabelNodes; +json BuildGenericConfig(const json &config) { + json indices = json::array( + {"Card.id", "Pos.id", "Transaction.id", "Transaction.fraud_reported"}); - public: - GraphState(int num_workers) - : worker_nodes(num_workers), - compromised_pos(num_workers), - compromised_card(num_workers), - out_edges(num_workers), - in_edges(num_workers) {} + json cards; + cards["labels"] = {"Card"}; + cards["count_per_worker"] = config["cards_per_worker"]; - // Gets the ID of a random node on worker that has the given label. - gid::Gid RandomNode(int worker_id, const std::string &label) { - auto &label_nodes = worker_nodes[worker_id]; - auto found = label_nodes.find(label); - CHECK(found != label_nodes.end()) << "Label not found"; - return found->second[rand_(gen_) * found->second.size()]; - } + cards["properties"] = json::object(); + cards["properties"].push_back( + {"id", {{"type", "counter"}, {"param", "Card.id"}}}); + cards["properties"].push_back( + {"compromised", {{"type", "primitive"}, {"param", false}}}); - bool IsCompromisedPos(int worker_id, gid::Gid pos_id) { - std::unordered_set &compromised = compromised_pos[worker_id]; - return compromised.find(pos_id) != compromised.end(); - } + json pos; + pos["labels"] = {"Pos"}; + pos["count_per_worker"] = config["pos_per_worker"]; - bool IsCompromisedCard(int worker_id, gid::Gid card_id) { - std::unordered_set &compromised = compromised_card[worker_id]; - return compromised.find(card_id) != compromised.end(); - } + pos["properties"] = json::object(); + pos["properties"].push_back( + {"id", {{"type", "counter"}, {"param", "Pos.id"}}}); + pos["properties"].push_back( + {"compromised", + {{"type", "bernoulli"}, + {"param", config["compromised_pos_probability"]}}}); + pos["properties"].push_back( + {"connected_frauds", {{"type", "primitive"}, {"param", 0}}}); - struct Edge { - enum class Type { USING, AT }; - gid::Gid gid; - storage::VertexAddress from; - storage::VertexAddress to; - Type type; - }; + json txs; + txs["labels"] = {"Transaction"}; + txs["count_per_worker"] = config["transactions_per_worker"]; - void AddEdge(gid::Gid edge_gid, storage::VertexAddress from, - storage::VertexAddress to, Edge::Type type) { - out_edges[from.worker_id()][from.gid()].emplace_back(edges.size()); - in_edges[to.worker_id()][to.gid()].emplace_back(edges.size()); - edges.emplace_back(Edge{edge_gid, from, to, type}); - } + txs["properties"] = json::object(); + txs["properties"].push_back( + {"id", {{"type", "counter"}, {"param", "Transaction.id"}}}); + txs["properties"].push_back( + {"fraud_reported", {{"type", "primitive"}, {"param", false}}}); - // Maps worker node labels to node bolt_ids. - std::vector worker_nodes; + json edges; + edges = json::array(); + edges.push_back({{"kind", "unique"}, + {"from", "Transaction"}, + {"to", "Card"}, + {"type", "Using"}, + {"hop_probability", config["hop_probability"]}}); + edges.push_back({{"kind", "unique"}, + {"from", "Transaction"}, + {"to", "Pos"}, + {"type", "At"}, + {"hop_probability", config["hop_probability"]}}); - // Compromised cards and pos. - std::vector> compromised_pos; - std::vector> compromised_card; - - // In/Out Vertex Edges. - std::vector edges; - std::vector>> out_edges; - std::vector>> in_edges; - - // Random generator - std::mt19937 gen_{std::random_device{}()}; - std::uniform_real_distribution<> rand_{0.0, 1.0}; -}; - -// Utilities for writing to the snapshot file. -// Snapshot file has the following contents in order: -// 1) Magic number. -// 2) Worker Id -// 3) Internal Id of vertex generator -// 4) Internal Id of edge generator -// 5) Transaction ID of the snapshooter. When generated set to 0. -// 6) Transactional snapshot of the snapshoter. When the snapshot is -// generated it's an empty list. -// 7) List of label+property index. -// 8) All nodes, sequentially, but not encoded as a list. -// 9) All relationships, sequentially, but not encoded as a list. -// 10) Summary with node count, relationship count and hash digest. -class Writer { - using DecodedValue = communication::bolt::DecodedValue; - const std::string kEdgeUsing = "Using"; - const std::string kEdgeAt = "At"; - - public: - Writer(const std::string &path, int worker_id) : buffer_(path) { - // 1) Magic number - encoder_.WriteRAW(durability::kMagicNumber.data(), - durability::kMagicNumber.size()); - encoder_.WriteTypedValue(durability::kVersion); - - // 2) WorkerId - important for distributed storage - worker_id_ = worker_id; - encoder_.WriteInt(worker_id_); - - // The following two entries indicate the starting points for generating new - // Vertex/Edge IDs in the DB. They are important when there are - // vertices/edges that were moved to another worker (in distributed - // Memgraph), so be careful! In this case we don't have to worry - // because we'll never move vertices/edges between workers. - // 3) ID of the vertex generator. - encoder_.WriteInt(0); - // 4) ID of the edge generator. - encoder_.WriteInt(0); - - // 5) Transactional ID of the snapshooter. - encoder_.WriteInt(0); - // 6) Transactional Snapshot is an empty list of transaction IDs. - encoder_.WriteList(std::vector{}); - } - - template - void WriteList(const std::vector &list) { - encoder_.WriteList( - std::vector(list.begin(), list.end())); - } - - void WriteNode(gid::Gid id, const std::string &label, - std::unordered_map &properties, - const std::vector &edges, - const std::vector &out_edges, - const std::vector &in_edges) { - encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) + - 3); - encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node)); - encoder_.WriteInt(id); - encoder_.WriteList(std::vector{label}); - encoder_.WriteMap(properties); - - encoder_.WriteInt(in_edges.size()); - for (auto edge_num : in_edges) { - auto &edge = edges[edge_num]; - auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.worker_id()); - encoder_.WriteInt(edge_addr.raw()); - encoder_.WriteInt(edge.from.raw()); - encoder_.WriteString( - edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt); - } - - encoder_.WriteInt(out_edges.size()); - for (auto edge_num : out_edges) { - auto &edge = edges[edge_num]; - auto edge_addr = storage::EdgeAddress(edge.gid, edge.from.worker_id()); - encoder_.WriteInt(edge_addr.raw()); - encoder_.WriteInt(edge.to.raw()); - encoder_.WriteString( - edge.type == GraphState::Edge::Type::USING ? kEdgeUsing : kEdgeAt); - } - } - - void WriteEdge(const GraphState::Edge &edge) { - encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) + - 5); - encoder_.WriteRAW( - underlying_cast(communication::bolt::Signature::Relationship)); - encoder_.WriteInt(edge.gid); - encoder_.WriteInt(edge.from.gid()); - encoder_.WriteInt(edge.to.gid()); - encoder_.WriteString(edge.type == GraphState::Edge::Type::USING ? kEdgeUsing - : kEdgeAt); - // Write edges to snapshot. - std::unordered_map - empty_props{}; - encoder_.WriteMap(empty_props); - } - - void Close(uint64_t node_count, uint64_t edge_count, uint64_t hops_count) { - // 10) Summary with node count, relationship count and hash digest. - buffer_.WriteValue(node_count); - buffer_.WriteValue(edge_count); - buffer_.WriteValue(buffer_.hash()); - buffer_.Close(); - // Log summary - LOG(INFO) << fmt::format("-- Summary for worker: {}", worker_id_); - LOG(INFO) << fmt::format("---- Total nodes: {}", node_count); - LOG(INFO) << fmt::format("---- Total edges: {}", edge_count); - LOG(INFO) << fmt::format("---- Hop edges: {}", hops_count); - } - - private: - HashedFileWriter buffer_; - durability::SnapshotEncoder encoder_{buffer_}; - int worker_id_; -}; - -// Helper class for property value generation. -class ValueGenerator { - using json = nlohmann::json; - using DecodedValue = communication::bolt::DecodedValue; - - public: - std::unordered_map MakePosProperties( - bool compromised, int worker_id) { - std::unordered_map props; - props.emplace("id", DecodedValue(Counter("Pos.id"))); - props.emplace("worker_id", DecodedValue(worker_id)); - props.emplace("compromised", DecodedValue(compromised)); - props.emplace("connected_frauds", DecodedValue(0)); - return props; - } - - std::unordered_map MakeTxProperties( - bool fraud_reported, int worker_id, int id) { - std::unordered_map props; - props.emplace("id", DecodedValue(id)); - props.emplace("worker_id", DecodedValue(worker_id)); - props.emplace("fraud_reported", DecodedValue(fraud_reported)); - return props; - } - - std::unordered_map MakeCardProperties( - bool compromised, int worker_id) { - std::unordered_map props; - props.emplace("id", DecodedValue(Counter("Card.id"))); - props.emplace("worker_id", DecodedValue(worker_id)); - props.emplace("compromised", DecodedValue(compromised)); - return props; - } - - int64_t Counter(const std::string &name) { return counters_[name]++; } - - bool Bernoulli(double p) { return rand_(gen_) < p; } - - private: - std::mt19937 gen_{std::random_device{}()}; - std::uniform_real_distribution<> rand_{0.0, 1.0}; - std::unordered_map counters_; -}; - -nlohmann::json GetWithDefault(const nlohmann::json &object, - const std::string &key, - const nlohmann::json &default_value) { - const auto &found = object.find(key); - if (found == object.end()) return default_value; - return *found; + return json::object( + {{"indexes", indices}, {"nodes", {cards, pos, txs}}, {"edges", edges}}); } int main(int argc, char **argv) { @@ -294,185 +107,44 @@ int main(int argc, char **argv) { config_file >> config; } - // Vector IDs. - int num_workers = std::atoi(FLAGS_num_workers.c_str()); - std::vector worker_ids(num_workers); - std::iota(worker_ids.begin(), worker_ids.end(), 0); + GraphState state = + BuildFromConfig(FLAGS_num_workers, BuildGenericConfig(config)); - // Generated node and edge counters. - std::vector nodes_count(num_workers, 0); - std::vector hops_count(num_workers, 0); + // TODO(mtomic): this is not currently used in the demo, maybe remove? - GraphState state(num_workers); - ValueGenerator value_generator; - - const std::string kLabelTransaction = "Transaction"; - const std::string kLabelCard = "Card"; - const std::string kLabelPos = "Pos"; - const fs::path kSnapshotDir = "snapshots"; - - double compromised_pos_probability = config["compromised_pos_probability"]; - double fraud_reported_probability = config["fraud_reported_probability"]; - double hop_probability = config["hop_probability"]; - - LOG(INFO) << "Creating snapshots with config: "; - LOG(INFO) << fmt::format("-- Compromised Pos probability: {}", - compromised_pos_probability); - LOG(INFO) << fmt::format("-- Hop probability : {}", hop_probability); - - // Allocate ids for nodes and write them to state. - LOG(INFO) << "Creating nodes..."; - for (auto worker_id : worker_ids) { - gid::Generator node_generator{worker_id}; - - const auto &nodes_config = config["nodes"]; - CHECK(nodes_config.is_array() && nodes_config.size() > 0) - << "Generator config must have 'nodes' array with at least one element"; - for (const auto &node_config : config["nodes"]) { - CHECK(node_config.is_object()) << "Node config must be a dict"; - const auto &label = node_config["label"]; - CHECK(label.is_string() && !label.empty()) - << "Node must have label specified"; - for (int i = 0; i < node_config["count_per_worker"]; i++) { - auto node_gid = node_generator.Next(std::experimental::nullopt); - if (label == kLabelPos && - value_generator.Bernoulli(compromised_pos_probability)) { - // Write compromised to state. - state.compromised_pos[worker_id].insert(node_gid); - } - // Write node to state. - state.worker_nodes[worker_id][label].emplace_back(node_gid); - } + // A card is compromised if it was used on a compromised POS. + for (auto &tx_gid : state.NodesWithLabel("Transaction")) { + auto &node = state.GetNode(tx_gid); + auto &edge1 = state.GetEdge(node.out_edges[0]); + auto &edge2 = state.GetEdge(node.out_edges[1]); + if (edge1.type != "Using") { + std::swap(edge1, edge2); } - nodes_count[worker_id] = node_generator.LocalCount(); - } - LOG(INFO) << "Creating nodes...DONE"; + DCHECK(edge1.type == "Using" && edge2.type == "At"); - std::random_device random_device; - std::mt19937 engine{random_device()}; - - // Create edges for each transaction. - LOG(INFO) << "Creating edges..."; - for (auto &worker_id : worker_ids) { - gid::Generator edge_generator{worker_id}; - - auto filter = [worker_id, num_workers](const int other_worker) { - if (num_workers == 1) return true; - return other_worker != worker_id; - }; - std::vector hop_workers; - std::copy_if(worker_ids.begin(), worker_ids.end(), - std::back_inserter(hop_workers), filter); - std::uniform_int_distribution dist(0, hop_workers.size() - 1); - - // Create and write edges to state. - // Write compromised cards to state. - const auto &transactions = state.worker_nodes[worker_id][kLabelTransaction]; - for (auto &transaction_id : transactions) { - int card_worker_id = worker_id; - if (value_generator.Bernoulli(hop_probability)) { - card_worker_id = hop_workers[dist(engine)]; - ++hops_count[worker_id]; - } - auto card_id = state.RandomNode(card_worker_id, kLabelCard); - - int pos_worker_id = worker_id; - if (value_generator.Bernoulli(hop_probability)) { - pos_worker_id = hop_workers[dist(engine)]; - ++hops_count[worker_id]; - } - auto pos_id = state.RandomNode(pos_worker_id, kLabelPos); - - auto edge_using_id = edge_generator.Next(std::experimental::nullopt); - state.AddEdge(edge_using_id, - storage::VertexAddress(transaction_id, worker_id), - storage::VertexAddress(card_id, card_worker_id), - GraphState::Edge::Type::USING); - auto edge_at_id = edge_generator.Next(std::experimental::nullopt); - state.AddEdge(edge_at_id, - storage::VertexAddress(transaction_id, worker_id), - storage::VertexAddress(pos_id, pos_worker_id), - GraphState::Edge::Type::AT); - - if (state.IsCompromisedPos(pos_worker_id, pos_id)) { - state.compromised_card[card_worker_id].insert(card_id); - } + if (state.GetNode(edge2.to).props.at("compromised").Value()) { + state.GetNode(edge1.to).props.at("compromised") = true; } } - LOG(INFO) << "Creating edges...DONE"; - // Write snapshot files. - LOG(INFO) << "Writing snapshots..."; - for (int worker_id = 0; worker_id < num_workers; ++worker_id) { - const fs::path durability_dir = - FLAGS_dir / fs::path("worker_" + std::to_string(worker_id)); - if (!durability::EnsureDir(durability_dir / kSnapshotDir)) { - LOG(ERROR) << "Unable to create durability directory!"; - exit(0); - } - const auto snapshot_file = - durability::MakeSnapshotPath(durability_dir, worker_id); + // Transaction is reported as fraudulent with some probability if a + // compromised card was used. + std::mt19937 gen{std::random_device{}()}; + std::uniform_real_distribution dist(0, 1); - Writer writer(snapshot_file, worker_id); + for (auto &tx_gid : state.NodesWithLabel("Transaction")) { + auto &node = state.GetNode(tx_gid); + auto &edge = state.GetEdge(node.out_edges[0]); + DCHECK(edge.type == "Using"); - // Write indexes to snapshot. - std::vector indexes; - for (const auto &index : GetWithDefault(config, "indexes", {})) - for (const auto &index_part : utils::Split(index, ".")) - indexes.push_back(index_part); - writer.WriteList(indexes); - - // Write Cards to snapshot. - for (auto &card_id : state.worker_nodes[worker_id][kLabelCard]) { - bool is_compromised = state.IsCompromisedCard(worker_id, card_id); - auto props = - value_generator.MakeCardProperties(is_compromised, worker_id); - DCHECK(state.out_edges[worker_id][card_id].size() == 0); - writer.WriteNode(card_id, kLabelCard, props, state.edges, - state.out_edges[worker_id][card_id], - state.in_edges[worker_id][card_id]); + if (state.GetNode(edge.to).props.at("compromised").Value()) { + node.props.at("fraud_reported") = + node.props.at("fraud_reported").Value() || + (dist(gen) < config["fraud_reported_probability"]); } - - // Write Pos to snapshot. - for (auto &pos_id : state.worker_nodes[worker_id][kLabelPos]) { - bool is_compromised = state.IsCompromisedPos(worker_id, pos_id); - auto props = value_generator.MakePosProperties(is_compromised, worker_id); - DCHECK(state.out_edges[worker_id][pos_id].size() == 0); - writer.WriteNode(pos_id, kLabelPos, props, state.edges, - state.out_edges[worker_id][pos_id], - state.in_edges[worker_id][pos_id]); - } - // Write Transactions to snapshot. - int transaction_id = 0; - for (auto &tx_id : state.worker_nodes[worker_id][kLabelTransaction]) { - const auto &edges = state.edges; - const auto &out_edges = state.out_edges[worker_id][tx_id]; - DCHECK(out_edges.size() == 2); - DCHECK(edges[out_edges[0]].type == GraphState::Edge::Type::USING); - DCHECK(edges[out_edges[1]].type == GraphState::Edge::Type::AT); - DCHECK(state.in_edges[worker_id][tx_id].size() == 0); - bool fraud_reported = false; - if (state.IsCompromisedCard(edges[out_edges[0]].to.worker_id(), - edges[out_edges[0]].to.gid())) { - fraud_reported = value_generator.Bernoulli(fraud_reported_probability); - } - auto props = value_generator.MakeTxProperties( - fraud_reported, worker_id, transaction_id * num_workers + worker_id); - writer.WriteNode(tx_id, kLabelTransaction, props, state.edges, - state.out_edges[worker_id][tx_id], - state.in_edges[worker_id][tx_id]); - transaction_id++; - } - // Write edges to snapshot. - int edge_count{0}; - for (auto &edge : state.edges) { - if (edge.from.worker_id() == worker_id) { - writer.WriteEdge(edge); - ++edge_count; - } - } - writer.Close(nodes_count[worker_id], edge_count, hops_count[worker_id]); } - LOG(INFO) << "Writing snapshots...DONE"; + + WriteToSnapshot(state, FLAGS_dir); + return 0; } diff --git a/tests/manual/generate_snapshot.cpp b/tests/manual/generate_snapshot.cpp index c293eb7f3..a11782ae6 100644 --- a/tests/manual/generate_snapshot.cpp +++ b/tests/manual/generate_snapshot.cpp @@ -18,9 +18,15 @@ #include "utils/string.hpp" #include "utils/timer.hpp" -DEFINE_string(out, "", "Destination for the created snapshot file"); +#include "snapshot_generation/graph_state.hpp" +#include "snapshot_generation/snapshot_writer.hpp" + +DEFINE_int32(num_workers, 1, "number of workers"); +DEFINE_string(out, "tmp", "Destination for the created snapshot file"); DEFINE_string(config, "", "Path to config JSON file"); +using namespace snapshot_generation; + /** * This file contains the program for generating a snapshot based on a JSON * definition. The JSON config has the following form: @@ -29,7 +35,7 @@ DEFINE_string(config, "", "Path to config JSON file"); * "indexes" : ["Person.id", "Company.id"], * "nodes" : [ * { - * "count" : 10000, + * "count_per_worker" : 10000, * "labels" : ["Person"], * "properties" : { * "id" : { "type" : "counter", "param": "Person.id" }, @@ -39,7 +45,7 @@ DEFINE_string(config, "", "Path to config JSON file"); * } * }, * { - * "count" : 200, + * "count_per_worker" : 200, * "labels" : ["Company"], * "properties" : { * "id" : { "type" : "counter", "param": "Company.id" }, @@ -52,228 +58,25 @@ DEFINE_string(config, "", "Path to config JSON file"); * ], * "edges" : [ * { - * "count" : 5000, + * "kind": "unique", * "from" : "Person", * "to" : "Company", - * "type" : "WORKS_IN" + * "type" : "WORKS_IN", + * "hop_probability": 0.05 * }, * { + * "kind": "random", * "count" : 20, * "from" : "Person", * "to" : "Company", - * "type" : "LIKES" + * "type" : "LIKES", + * "hop_probability": 0.1 * } * ] * } */ -// Utilities for writing to the snapshot file. -class Writer { - public: - Writer(const std::string &path) : buffer_(path) { - encoder_.WriteRAW(durability::kMagicNumber.data(), - durability::kMagicNumber.size()); - encoder_.WriteTypedValue(durability::kVersion); - - // ID of the vertex generator. - encoder_.WriteInt(0); - // ID of the edge generator. - encoder_.WriteInt(0); - // Transactional ID of the snapshooter. - encoder_.WriteInt(0); - // Transactional Snapshot is an empty list of transaction IDs. - encoder_.WriteList(std::vector{}); - } - - template - void WriteList(const std::vector &list) { - encoder_.WriteList( - std::vector(list.begin(), list.end())); - } - - int64_t WriteNode( - const std::vector &labels, - std::unordered_map properties) { - encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) + - 3); - encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node)); - auto id = node_generator_.Next(std::experimental::nullopt); - encoder_.WriteInt(id); - encoder_.WriteList( - std::vector{labels.begin(), labels.end()}); - encoder_.WriteMap(properties); - return id; - } - - int64_t WriteEdge( - const std::string &edge_type, - const std::unordered_map properties, - int64_t bolt_id_from, int64_t bolt_id_to) { - encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) + - 5); - encoder_.WriteRAW( - underlying_cast(communication::bolt::Signature::Relationship)); - auto id = edge_generator_.Next(std::experimental::nullopt); - encoder_.WriteInt(id); - encoder_.WriteInt(bolt_id_from); - encoder_.WriteInt(bolt_id_to); - encoder_.WriteString(edge_type); - encoder_.WriteMap(properties); - return id; - } - - void Close() { - buffer_.WriteValue(node_generator_.LocalCount()); - buffer_.WriteValue(edge_generator_.LocalCount()); - buffer_.WriteValue(buffer_.hash()); - buffer_.Close(); - } - - private: - gid::Generator node_generator_{0}; - gid::Generator edge_generator_{0}; - HashedFileWriter buffer_; - communication::bolt::BaseEncoder encoder_{buffer_}; -}; - -// Helper class for tracking info about the generated graph. -class GraphState { - public: - // Tracks that the given node has the given label. - void AddNode(const std::string &label, int64_t node_bolt_id) { - auto found = label_nodes_.find(label); - if (found == label_nodes_.end()) - label_nodes_.emplace(label, std::vector{node_bolt_id}); - else - found->second.emplace_back(node_bolt_id); - } - - // Gets the ID of a random node that has the given label. - int64_t RandomNode(const std::string &label) { - auto found = label_nodes_.find(label); - CHECK(found != label_nodes_.end()) << "Label not found"; - return found->second[rand_(gen_) * found->second.size()]; - } - - private: - // Maps labels to node bolt_ids - std::unordered_map> label_nodes_; - - // Random generator - std::mt19937 gen_{std::random_device{}()}; - std::uniform_real_distribution<> rand_{0.0, 1.0}; -}; - -// Helper class for property value generation. -class ValueGenerator { - using json = nlohmann::json; - using TypedValue = query::TypedValue; - - public: - // Generates the whole property map based on the given config. - std::unordered_map MakeProperties( - const json &config) { - std::unordered_map props; - if (config.is_null()) return props; - - CHECK(config.is_object()) << "Properties config must be a dict"; - for (auto it = config.begin(); it != config.end(); it++) { - auto value = MakeValue(it.value()); - if (value) props.emplace(it.key(), *value); - } - return props; - } - - // Generates a single value based on the given config. - std::experimental::optional MakeValue(const json &config) { - if (config.is_object()) { - const std::string &type = config["type"]; - const auto ¶m = config["param"]; - if (type == "primitive") - return Primitive(param); - else if (type == "counter") - return TypedValue(Counter(param)); - else if (type == "optional") - return Optional(param); - else if (type == "bernoulli") - return TypedValue(Bernoulli(param)); - else if (type == "randint") - return TypedValue(RandInt(param)); - else if (type == "randstring") - return TypedValue(RandString(param)); - else - LOG(FATAL) << "Unknown value type"; - } else - return Primitive(config); - } - - TypedValue Primitive(const json &config) { - if (config.is_string()) return config.get(); - if (config.is_number_integer()) return config.get(); - if (config.is_number_float()) return config.get(); - if (config.is_boolean()) return config.get(); - - LOG(FATAL) << "Unsupported primitive type"; - } - - int64_t Counter(const std::string &name) { - auto found = counters_.find(name); - if (found == counters_.end()) { - counters_.emplace(name, 1); - return 0; - } else { - return found->second++; - } - } - - int64_t RandInt(const json &range) { - CHECK(range.is_array() && range.size() == 2) - << "RandInt value gen config must be a list with 2 elements"; - auto from = MakeValue(range[0])->ValueInt(); - auto to = MakeValue(range[1])->ValueInt(); - CHECK(from < to) << "RandInt lower range must be lesser then upper range"; - return (int64_t)(rand_(gen_) * (to - from)) + from; - } - - std::string RandString(const json &length) { - static const char alphanum[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - - int length_int = MakeValue(length)->ValueInt(); - std::string r_val(length_int, 'a'); - for (int i = 0; i < length_int; ++i) - r_val[i] = alphanum[(int64_t)(rand_(gen_) * (sizeof(alphanum) - 1))]; - - return r_val; - } - - bool Bernoulli(double p) { return rand_(gen_) < p; } - - std::experimental::optional Optional(const json &config) { - CHECK(config.is_array() && config.size() == 2) - << "Optional value gen config must be a list with 2 elements"; - return Bernoulli(config[0]) ? MakeValue(config[1]) - : std::experimental::nullopt; - } - - private: - std::mt19937 gen_{std::random_device{}()}; - std::uniform_real_distribution<> rand_{0.0, 1.0}; - std::unordered_map counters_; -}; - -nlohmann::json GetWithDefault(const nlohmann::json &object, - const std::string &key, - const nlohmann::json &default_value) { - const auto &found = object.find(key); - if (found == object.end()) return default_value; - return *found; -} - int main(int argc, char **argv) { - LOG(FATAL) << "Doesn't work with the newest format - waiting for refactor"; gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); @@ -284,49 +87,8 @@ int main(int argc, char **argv) { config_file >> config; } - Writer writer(FLAGS_out); - GraphState state; - ValueGenerator value_generator; + GraphState state = BuildFromConfig(FLAGS_num_workers, config); + WriteToSnapshot(state, FLAGS_out); - std::vector indexes; - for (const auto &index : GetWithDefault(config, "indexes", {})) - for (const auto &index_part : utils::Split(index, ".")) - indexes.push_back(index_part); - writer.WriteList(indexes); - - // Create nodes - const auto &nodes_config = config["nodes"]; - CHECK(nodes_config.is_array() && nodes_config.size() > 0) - << "Generator config must have 'nodes' array with at least one element"; - for (const auto &node_config : config["nodes"]) { - CHECK(node_config.is_object()) << "Node config must be a dict"; - - for (int i = 0; i < node_config["count"]; i++) { - const auto &labels_config = node_config["labels"]; - CHECK(labels_config.is_array()) << "Must provide an array of node labels"; - CHECK(node_config.size() > 0) - << "Node labels array must contain at lest one element"; - auto node_bolt_id = writer.WriteNode( - labels_config, - value_generator.MakeProperties(node_config["properties"])); - - for (const auto &label : labels_config) - state.AddNode(label, node_bolt_id); - } - } - - // Create edges - for (const auto &edge_config : config["edges"]) { - CHECK(edge_config.is_object()) << "Edge config must be a dict"; - const std::string &from = edge_config["from"]; - const std::string &to = edge_config["to"]; - for (int i = 0; i < edge_config["count"]; i++) - writer.WriteEdge(edge_config["type"], - value_generator.MakeProperties( - GetWithDefault(edge_config, "properties", nullptr)), - state.RandomNode(from), state.RandomNode(to)); - } - - writer.Close(); return 0; } diff --git a/tests/manual/snapshot_generation/graph_state.hpp b/tests/manual/snapshot_generation/graph_state.hpp new file mode 100644 index 000000000..5def4f8a2 --- /dev/null +++ b/tests/manual/snapshot_generation/graph_state.hpp @@ -0,0 +1,250 @@ +#pragma once + +#include +#include +#include + +#include "cppitertools/itertools.hpp" +#include "json/json.hpp" + +#include "storage/gid.hpp" +#include "storage/property_value.hpp" +#include "utils/string.hpp" + +#include "value_generator.hpp" + +namespace snapshot_generation { + +nlohmann::json GetWithDefault(const nlohmann::json &object, + const std::string &key, + const nlohmann::json &default_value) { + const auto &found = object.find(key); + if (found == object.end()) return default_value; + return *found; +} + +struct Node { + gid::Gid gid; + std::vector labels; + std::unordered_map props; + std::vector in_edges; + std::vector out_edges; +}; + +struct Edge { + gid::Gid gid; + gid::Gid from; + gid::Gid to; + std::string type; + std::unordered_map props; +}; + +/// Helper class for tracking info about the generated graph. +class GraphState { + public: + explicit GraphState(int num_workers) + : num_workers_(num_workers), + worker_nodes_(num_workers), + worker_edges_(num_workers) { + for (int worker_id = 0; worker_id < num_workers; ++worker_id) { + edge_generators_.emplace_back( + std::make_unique(worker_id)); + node_generators_.emplace_back( + std::make_unique(worker_id)); + } + } + + int NumWorkers() { return num_workers_; } + + int64_t NumNodesOnWorker(int worker_id) { + return node_generators_[worker_id]->LocalCount(); + } + + int64_t NumEdgesOnWorker(int worker_id) { + return edge_generators_[worker_id]->LocalCount(); + } + + auto &NodesWithLabel(const std::string &label, int worker_id) { + return worker_nodes_[worker_id][label]; + } + + auto NodesWithLabel(const std::string &label) { + return iter::chain.from_iterable( + iter::imap([ this, label ](int worker_id) -> auto & { + return NodesWithLabel(label, worker_id); + }, + iter::range(num_workers_))); + } + + auto NodesOnWorker(int worker_id) { + return iter::chain.from_iterable( + iter::imap([](auto &p) -> auto & { return p.second; }, + worker_nodes_[worker_id])); + } + + auto EdgesOnWorker(int worker_id) { + return iter::chain.from_iterable( + iter::imap([](auto &p) -> auto & { return p.second; }, + worker_edges_[worker_id])); + } + + gid::Gid &RandomNode(const std::string &label, int worker_id) { + CHECK(0 <= worker_id && worker_id < (int)worker_nodes_.size()) + << "Worker ID should be between 0 and " << worker_nodes_.size() - 1; + auto &label_nodes = worker_nodes_[worker_id]; + auto found = label_nodes.find(label); + CHECK(found != label_nodes.end()) << "Label not found"; + return found->second[rand_(gen_) * found->second.size()]; + } + + gid::Gid &RandomNode(const std::string &label) { + return RandomNode(label, rand_(gen_) * worker_nodes_.size()); + } + + gid::Gid &RandomNodeOnOtherWorker(const std::string &label, int worker_id) { + int worker_id2 = rand_(gen_) * (worker_nodes_.size() - 1); + if (worker_id2 >= worker_id) ++worker_id2; + return RandomNode(label, worker_id2); + } + + gid::Gid CreateNode( + int worker_id, const std::vector &labels, + const std::unordered_map &props) { + auto node_gid = + node_generators_[worker_id]->Next(std::experimental::nullopt); + nodes_[node_gid] = {node_gid, labels, props, {}, {}}; + + for (const auto &label : labels) { + worker_nodes_[worker_id][label].push_back(node_gid); + } + + return node_gid; + } + + gid::Gid CreateEdge( + gid::Gid from, gid::Gid to, const std::string &type, + const std::unordered_map &props) { + int worker_id = gid::CreatorWorker(from); + auto edge_gid = + edge_generators_[worker_id]->Next(std::experimental::nullopt); + nodes_[from].out_edges.emplace_back(edge_gid); + nodes_[to].in_edges.emplace_back(edge_gid); + edges_[edge_gid] = Edge{edge_gid, from, to, type, props}; + worker_edges_[worker_id][type].push_back(edge_gid); + return edge_gid; + } + + auto &GetNode(gid::Gid gid) { return nodes_[gid]; } + auto &GetEdge(gid::Gid gid) { return edges_[gid]; } + auto &GetNodes() { return nodes_; } + auto &GetEdges() { return edges_; } + + void CreateIndex(std::string label, std::string property) { + indices_.emplace_back(std::move(label)); + indices_.emplace_back(std::move(property)); + } + + auto &Indices() { return indices_; } + + private: + typedef std::unordered_map> LabelGid; + + int num_workers_; + + std::vector indices_; + + std::vector worker_nodes_; + std::vector worker_edges_; + + std::unordered_map nodes_; + std::unordered_map edges_; + + std::vector> edge_generators_; + std::vector> node_generators_; + + std::mt19937 gen_{std::random_device{}()}; + std::uniform_real_distribution<> rand_{0.0, 1.0}; +}; + +int Worker(gid::Gid gid) { return gid::CreatorWorker(gid); } + +GraphState BuildFromConfig(int num_workers, const nlohmann::json &config) { + ValueGenerator value_generator; + GraphState state(num_workers); + + for (const auto &index : GetWithDefault(config, "indexes", {})) { + auto index_parts = utils::Split(index, "."); + CHECK(index_parts.size() == 2) << "Index format should be Label.Property"; + state.CreateIndex(index_parts[0], index_parts[1]); + } + + CHECK(config["nodes"].is_array() && config["nodes"].size() > 0) + << "Generator config must have 'nodes' array with at least one " + "element"; + for (const auto &node_config : config["nodes"]) { + CHECK(node_config.is_object()) << "Node config must be a dict"; + + const auto &labels = node_config["labels"]; + CHECK(labels.is_array()) << "Must provide an array of node labels"; + CHECK(node_config.size() > 0) + << "Node labels array must contain at least one element"; + + for (int i = 0; i < node_config["count_per_worker"]; ++i) { + for (int worker_id = 0; worker_id < num_workers; ++worker_id) { + const auto properties = + value_generator.MakeProperties(node_config["properties"]); + state.CreateNode(worker_id, labels, properties); + } + } + } + + int num_hops = 0; + auto get_edge_endpoint = [num_workers, &state, &num_hops, &value_generator]( + gid::Gid from, std::string label_to, + double hop_probability) { + if (num_workers > 1 && value_generator.Bernoulli(hop_probability)) { + ++num_hops; + return state.RandomNodeOnOtherWorker(label_to, Worker(from)); + } + return state.RandomNode(label_to, Worker(from)); + }; + + for (const auto &edge_config : config["edges"]) { + CHECK(edge_config.is_object()) << "Edge config must be a dict"; + const std::string &label_from = edge_config["from"]; + const std::string &label_to = edge_config["to"]; + const std::string &type = edge_config["type"]; + const double hop_probability = edge_config["hop_probability"]; + + if (edge_config["kind"] == "random") { + for (int i = 0; i < edge_config["count"]; i++) { + gid::Gid from = state.RandomNode(label_from); + gid::Gid to = get_edge_endpoint(from, label_to, hop_probability); + + const auto &props = value_generator.MakeProperties( + GetWithDefault(edge_config, "properties", nullptr)); + state.CreateEdge(from, to, type, props); + } + } + + if (edge_config["kind"] == "unique") { + for (const auto &from : state.NodesWithLabel(label_from)) { + gid::Gid to = get_edge_endpoint(from, label_to, hop_probability); + const auto &props = value_generator.MakeProperties( + GetWithDefault(edge_config, "properties", nullptr)); + state.CreateEdge(from, to, type, props); + } + } + } + + for (int worker_id = 0; worker_id < num_workers; ++worker_id) { + LOG(INFO) << "-- Summary for worker: " << worker_id; + LOG(INFO) << "---- Total nodes: " << state.NumNodesOnWorker(worker_id); + LOG(INFO) << "---- Total edges: " << state.NumEdgesOnWorker(worker_id); + } + LOG(INFO) << "-- Total number of hops: " << num_hops; + + return state; +} + +} // namespace snapshot_generation diff --git a/tests/manual/snapshot_generation/snapshot_writer.hpp b/tests/manual/snapshot_generation/snapshot_writer.hpp new file mode 100644 index 000000000..579527ad0 --- /dev/null +++ b/tests/manual/snapshot_generation/snapshot_writer.hpp @@ -0,0 +1,143 @@ +#pragma once + +#include +#include + +#include "communication/bolt/v1/encoder/base_encoder.hpp" +#include "durability/paths.hpp" +#include "durability/version.hpp" +#include "query/typed_value.hpp" + +#include "graph_state.hpp" + +namespace snapshot_generation { + +// Snapshot layout is described in durability/version.hpp +static_assert(durability::kVersion == 5, + "Wrong snapshot version, please update!"); + +class SnapshotWriter { + public: + SnapshotWriter(const std::string &path, int worker_id, + uint64_t vertex_generator_local_count = 0, + uint64_t edge_generator_local_count = 0) + : worker_id_(worker_id), buffer_(path) { + encoder_.WriteRAW(durability::kMagicNumber.data(), + durability::kMagicNumber.size()); + encoder_.WriteTypedValue(durability::kVersion); + encoder_.WriteInt(worker_id_); + encoder_.WriteInt(vertex_generator_local_count); + encoder_.WriteInt(edge_generator_local_count); + encoder_.WriteInt(0); + encoder_.WriteList(std::vector{}); + } + + // reference to `buffer_` gets broken when moving, so let's just forbid moving + SnapshotWriter(SnapshotWriter &&rhs) = delete; + SnapshotWriter &operator=(SnapshotWriter &&rhs) = delete; + + template + void WriteList(const std::vector &list) { + encoder_.WriteList( + std::vector(list.begin(), list.end())); + } + + storage::VertexAddress DefaultVertexAddress(gid::Gid gid) { + return storage::VertexAddress(gid, gid::CreatorWorker(gid)); + } + + storage::EdgeAddress DefaultEdgeAddress(gid::Gid gid) { + return storage::EdgeAddress(gid, gid::CreatorWorker(gid)); + } + + void WriteInlineEdge(const Edge &edge, bool write_from) { + encoder_.WriteInt(DefaultEdgeAddress(edge.gid).raw()); + encoder_.WriteInt(write_from ? DefaultVertexAddress(edge.from).raw() + : DefaultVertexAddress(edge.to).raw()); + encoder_.WriteString(edge.type); + } + + void WriteNode(const Node &node, + const std::unordered_map &edges) { + encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) + + 3); + encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node)); + encoder_.WriteInt(node.gid); + + WriteList(node.labels); + encoder_.WriteMap(node.props); + + encoder_.WriteInt(node.in_edges.size()); + for (const auto &edge_idx : node.in_edges) { + WriteInlineEdge(edges.at(edge_idx), true); + } + + encoder_.WriteInt(node.out_edges.size()); + for (const auto &edge_idx : node.out_edges) { + WriteInlineEdge(edges.at(edge_idx), false); + } + + ++nodes_written_; + } + + void WriteEdge(const Edge &edge) { + encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) + + 5); + encoder_.WriteRAW( + underlying_cast(communication::bolt::Signature::Relationship)); + encoder_.WriteInt(edge.gid); + encoder_.WriteInt(edge.from); + encoder_.WriteInt(edge.to); + encoder_.WriteString(edge.type); + encoder_.WriteMap(edge.props); + + ++edges_written_; + } + + void Close() { + buffer_.WriteValue(nodes_written_); + buffer_.WriteValue(edges_written_); + buffer_.WriteValue(buffer_.hash()); + buffer_.Close(); + } + + private: + int worker_id_; + int64_t nodes_written_{0}; + int64_t edges_written_{0}; + + HashedFileWriter buffer_; + communication::bolt::BaseEncoder encoder_{buffer_}; +}; + +void WriteToSnapshot(GraphState &state, const std::string &path) { + for (int worker_id = 0; worker_id < state.NumWorkers(); ++worker_id) { + const std::experimental::filesystem::path durability_dir = + path / std::experimental::filesystem::path("worker_" + + std::to_string(worker_id)); + if (!durability::EnsureDir(durability_dir / "snapshots")) { + LOG(ERROR) << "Unable to create durability directory!"; + exit(0); + } + const auto snapshot_file = + durability::MakeSnapshotPath(durability_dir, worker_id); + + SnapshotWriter writer(snapshot_file, worker_id, + state.NumNodesOnWorker(worker_id), + state.NumEdgesOnWorker(worker_id)); + + writer.WriteList(state.Indices()); + + for (const auto &node : state.NodesOnWorker(worker_id)) { + writer.WriteNode(state.GetNode(node), state.GetEdges()); + } + + for (const auto &edge : state.EdgesOnWorker(worker_id)) { + writer.WriteEdge(state.GetEdge(edge)); + } + + writer.Close(); + } +} + +} // namespace snapshot_generation diff --git a/tests/manual/snapshot_generation/value_generator.hpp b/tests/manual/snapshot_generation/value_generator.hpp new file mode 100644 index 000000000..356ffdac8 --- /dev/null +++ b/tests/manual/snapshot_generation/value_generator.hpp @@ -0,0 +1,107 @@ +#pragma once + +#include +#include +#include + +#include "glog/logging.h" +#include "json/json.hpp" + +#include "storage/property_value.hpp" + +namespace snapshot_generation { + +// Helper class for property value generation. +class ValueGenerator { + using json = nlohmann::json; + + public: + // Generates the whole property map based on the given config. + std::unordered_map MakeProperties( + const json &config) { + std::unordered_map props; + if (config.is_null()) return props; + + CHECK(config.is_object()) << "Properties config must be a dict"; + for (auto it = config.begin(); it != config.end(); it++) { + auto value = MakeValue(it.value()); + if (value) props.emplace(it.key(), *value); + } + return props; + } + + // Generates a single value based on the given config. + std::experimental::optional MakeValue(const json &config) { + if (config.is_object()) { + const std::string &type = config["type"]; + const auto ¶m = config["param"]; + if (type == "primitive") return Primitive(param); + if (type == "counter") return Counter(param); + if (type == "optional") return Optional(param); + if (type == "bernoulli") return Bernoulli(param); + if (type == "randint") return RandInt(param); + if (type == "randstring") return RandString(param); + + LOG(FATAL) << "Unknown value type"; + } else { + return Primitive(config); + } + } + + // Returns whatever value is stored in the JSON + PropertyValue Primitive(const json &config) { + if (config.is_string()) return config.get(); + if (config.is_number_integer()) return config.get(); + if (config.is_number_float()) return config.get(); + if (config.is_boolean()) return config.get(); + + LOG(FATAL) << "Unsupported primitive type"; + } + + // Increments the counter and returns its current value + int64_t Counter(const std::string &name) { return counters_[name]++; } + + // Generates a random integer in range specified by JSON config + int64_t RandInt(const json &range) { + CHECK(range.is_array() && range.size() == 2) + << "RandInt value gen config must be a list with 2 elements"; + auto from = MakeValue(range[0])->Value(); + auto to = MakeValue(range[1])->Value(); + CHECK(from < to) << "RandInt lower range must be lesser than upper range"; + return (int64_t)(rand_(gen_) * (to - from)) + from; + } + + // Generates a random alphanumeric string with length specified by JSON config + std::string RandString(const json &length) { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + int length_int = MakeValue(length)->Value(); + std::string r_val(length_int, 'a'); + for (int i = 0; i < length_int; ++i) + r_val[i] = alphanum[(int64_t)(rand_(gen_) * strlen(alphanum))]; + + return r_val; + } + + // Returns true with given probability + bool Bernoulli(double p) { return rand_(gen_) < p; } + + // Returns a value specified by config with some probability, and nullopt + // otherwise + std::experimental::optional Optional(const json &config) { + CHECK(config.is_array() && config.size() == 2) + << "Optional value gen config must be a list with 2 elements"; + return Bernoulli(config[0]) ? MakeValue(config[1]) + : std::experimental::nullopt; + } + + private: + std::mt19937 gen_{std::random_device{}()}; + std::uniform_real_distribution<> rand_{0.0, 1.0}; + std::unordered_map counters_; +}; + +} // namespace snapshot_generation