Fix pass by values

Summary: Also, some other minor changes

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1170
This commit is contained in:
Marko Culinovic 2018-02-05 13:46:18 +01:00
parent f808252142
commit 523d54392b

View File

@ -8,15 +8,17 @@
#include <json/json.hpp>
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "durability/paths.hpp"
#include "durability/snapshot_encoder.hpp"
#include "durability/version.hpp"
#include "utils/datetime/timestamp.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
DEFINE_string(num_workers, "1",
"Number of distributed workers (including master)");
DEFINE_string(prefix, "snapshot",
"Filename prefix for all generated snapshots");
DEFINE_string(durability_directory_prefix, "tmp",
"Prefix for durability directories");
DEFINE_string(config, "", "Path to config JSON file");
/**
@ -43,6 +45,8 @@ DEFINE_string(config, "", "Path to config JSON file");
*}
*/
namespace fs = std::experimental::filesystem;
/// Helper class for tracking info about the generated graph.
class GraphState {
typedef std::unordered_map<std::string, std::vector<int64_t>> LabelNodes;
@ -67,7 +71,7 @@ class GraphState {
// Gets the ID of a random node on worker that has the given label.
int64_t RandomNode(int worker_id, const std::string &label) {
auto label_nodes = worker_nodes_[worker_id];
auto &label_nodes = worker_nodes_[worker_id];
auto found = label_nodes.find(label);
CHECK(found != label_nodes.end()) << "Label not found";
return found->second[rand_(gen_) * found->second.size()];
@ -78,22 +82,22 @@ class GraphState {
return worker_nodes_[worker_id][label];
}
void AddCompromisedPos(int worker_id, int pos_id) {
void AddCompromisedPos(int worker_id, int64_t pos_id) {
std::unordered_set<int64_t> &compromised = compromised_pos_[worker_id];
compromised.insert(pos_id);
}
bool IsCompromisedPos(int worker_id, int pos_id) {
bool IsCompromisedPos(int worker_id, int64_t pos_id) {
std::unordered_set<int64_t> &compromised = compromised_pos_[worker_id];
return compromised.find(pos_id) != compromised.end();
}
void AddCompromisedCard(int worker_id, int card_id) {
void AddCompromisedCard(int worker_id, int64_t card_id) {
std::unordered_set<int64_t> &compromised = compromised_card_[worker_id];
compromised.insert(card_id);
}
bool IsCompromisedCard(int worker_id, int card_id) {
bool IsCompromisedCard(int worker_id, int64_t card_id) {
std::unordered_set<int64_t> &compromised = compromised_card_[worker_id];
return compromised.find(card_id) != compromised.end();
}
@ -207,7 +211,7 @@ class Writer {
}
void WriteNode(int64_t id, const std::vector<std::string> &labels,
std::unordered_map<std::string, DecodedValue> properties,
std::unordered_map<std::string, DecodedValue> &properties,
const std::vector<GraphState::Edge> &out_edges,
const std::vector<GraphState::Edge> &in_edges) {
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
@ -219,7 +223,7 @@ class Writer {
encoder_.WriteMap(properties);
encoder_.WriteInt(in_edges.size());
for (auto edge : in_edges) {
for (auto &edge : in_edges) {
auto edge_addr = Edges::EdgeAddress(edge.gid, edge.from.first);
auto vertex_addr =
Edges::VertexAddress(edge.from.second, edge.from.first);
@ -230,7 +234,7 @@ class Writer {
}
encoder_.WriteInt(out_edges.size());
for (auto edge : out_edges) {
for (auto &edge : out_edges) {
auto edge_addr = Edges::EdgeAddress(edge.gid, edge.from.first);
auto vertex_addr = Edges::VertexAddress(edge.to.second, edge.to.first);
encoder_.WriteInt(edge_addr.raw());
@ -240,9 +244,10 @@ class Writer {
}
}
void WriteEdge(int64_t gid, GraphState::Edge::Type type,
const std::unordered_map<std::string, DecodedValue> properties,
int64_t gid_from, int64_t gid_to) {
void WriteEdge(
int64_t gid, GraphState::Edge::Type type,
const std::unordered_map<std::string, DecodedValue> &properties,
int64_t gid_from, int64_t gid_to) {
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
5);
encoder_.WriteRAW(
@ -281,25 +286,28 @@ class ValueGenerator {
public:
std::unordered_map<std::string, DecodedValue> MakePosProperties(
bool compromised) {
bool compromised, int worker_id) {
std::unordered_map<std::string, DecodedValue> props;
props.emplace("id", DecodedValue(Counter("Pos.id")));
props.emplace("worker_id", DecodedValue(worker_id));
props.emplace("compromised", DecodedValue(compromised));
return props;
}
std::unordered_map<std::string, DecodedValue> MakeTxProperties(
bool fraud_reported) {
bool fraud_reported, int worker_id) {
std::unordered_map<std::string, DecodedValue> props;
props.emplace("id", DecodedValue(Counter("Transaction.id")));
props.emplace("worker_id", DecodedValue(worker_id));
props.emplace("fraud_reported", DecodedValue(fraud_reported));
return props;
}
std::unordered_map<std::string, DecodedValue> MakeCardProperties(
bool compromised) {
bool compromised, int worker_id) {
std::unordered_map<std::string, DecodedValue> props;
props.emplace("id", DecodedValue(Counter("Card.id")));
props.emplace("worker_id", DecodedValue(worker_id));
props.emplace("compromised", DecodedValue(compromised));
return props;
}
@ -348,6 +356,7 @@ int main(int argc, char **argv) {
const std::string kLabelTransaction = "Transaction";
const std::string kLabelCard = "Card";
const std::string kLabelPos = "Pos";
const fs::path kSnapshotDir = "snapshots";
double compromised_pos_probability = config["compromised_pos_probability"];
double hop_probability = config["hop_probability"];
@ -390,7 +399,7 @@ int main(int argc, char **argv) {
// Create edges for each transaction.
LOG(INFO) << "Creating edges...";
for (auto worker_id : worker_ids) {
for (auto &worker_id : worker_ids) {
gid::Generator edge_generator{worker_id};
auto filter = [worker_id, num_workers](const int other_worker) {
@ -406,7 +415,7 @@ int main(int argc, char **argv) {
// Write compromised cards to state.
const auto &transactions =
state.NodesWithLabel(worker_id, kLabelTransaction);
for (auto transaction_id : transactions) {
for (auto &transaction_id : transactions) {
int card_worker_id = worker_id;
if (value_generator.Bernoulli(hop_probability)) {
card_worker_id = hop_workers[dist(engine)];
@ -441,8 +450,16 @@ int main(int argc, char **argv) {
// Write snapshot files.
LOG(INFO) << "Writing snapshots...";
for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
std::string filename = FLAGS_prefix + std::to_string(worker_id);
Writer writer(filename, worker_id);
const fs::path durability_dir = FLAGS_durability_directory_prefix +
"_worker_" + std::to_string(worker_id);
if (!durability::EnsureDir(durability_dir / kSnapshotDir)) {
LOG(ERROR) << "Unable to create durability directory!";
exit(0);
}
const auto snapshot_file =
durability::MakeSnapshotPath(durability_dir, worker_id);
Writer writer(snapshot_file, worker_id);
// Write indexes to snapshot.
std::vector<std::string> indexes;
@ -453,9 +470,10 @@ int main(int argc, char **argv) {
// Write Cards to snapshot.
const auto &cards = state.NodesWithLabel(worker_id, kLabelCard);
for (auto card_id : cards) {
for (auto &card_id : cards) {
bool is_compromised = state.IsCompromisedCard(worker_id, card_id);
auto props = value_generator.MakeCardProperties(is_compromised);
auto props =
value_generator.MakeCardProperties(is_compromised, worker_id);
DCHECK(state.OutEdges(worker_id, card_id).size() == 0);
writer.WriteNode(card_id, std::vector<std::string>{kLabelCard}, props,
state.OutEdges(worker_id, card_id),
@ -464,9 +482,9 @@ int main(int argc, char **argv) {
// Write Pos to snapshot.
const auto &pos_ids = state.NodesWithLabel(worker_id, kLabelPos);
for (auto pos_id : pos_ids) {
for (auto &pos_id : pos_ids) {
bool is_compromised = state.IsCompromisedPos(worker_id, pos_id);
auto props = value_generator.MakePosProperties(is_compromised);
auto props = value_generator.MakePosProperties(is_compromised, worker_id);
DCHECK(state.OutEdges(worker_id, pos_id).size() == 0);
writer.WriteNode(pos_id, std::vector<std::string>{kLabelPos}, props,
state.OutEdges(worker_id, pos_id),
@ -475,7 +493,7 @@ int main(int argc, char **argv) {
// Write Transactions to snapshot.
const auto &transactions =
state.NodesWithLabel(worker_id, kLabelTransaction);
for (auto tx_id : transactions) {
for (auto &tx_id : transactions) {
const auto &out_edges = state.OutEdges(worker_id, tx_id);
const auto &in_edges = state.InEdges(worker_id, tx_id);
DCHECK(out_edges.size() == 2);
@ -489,7 +507,7 @@ int main(int argc, char **argv) {
out_edges[0].to.second)) {
fraud_reported = value_generator.Bernoulli(0.1);
}
auto props = value_generator.MakeTxProperties(fraud_reported);
auto props = value_generator.MakeTxProperties(fraud_reported, worker_id);
writer.WriteNode(tx_id, std::vector<std::string>{kLabelTransaction},
props, state.OutEdges(worker_id, tx_id),
state.InEdges(worker_id, tx_id));
@ -498,7 +516,7 @@ int main(int argc, char **argv) {
std::unordered_map<std::string, communication::bolt::DecodedValue>
empty_props;
const auto &edges = state.Edges(worker_id);
for (auto edge : edges) {
for (auto &edge : edges) {
writer.WriteEdge(edge.gid, edge.type, empty_props, edge.from.second,
edge.to.second);
}