Console and random-graph generation improvements

Summary: Random graph generation is now parallel. Slow, though.

Reviewers: teon.banek, buda, mislav.bradac

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D586
This commit is contained in:
florijan 2017-07-30 14:51:23 +02:00
parent 2ba3df942b
commit 57dea09b5b
3 changed files with 212 additions and 107 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include "database/graph_db.hpp"
#include "utils/total_ordering.hpp"
//#include "database/graph_db_accessor.hpp"
#include "mvcc/version_list.hpp"
#include "storage/property_value.hpp"
@ -19,7 +20,7 @@ class GraphDbAccessor;
* @tparam TRecord Type of record (MVCC Version) of the accessor.
*/
template <typename TRecord>
class RecordAccessor {
class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
public:
/**
* The GraphDbAccessor is friend to this accessor so it can
@ -87,25 +88,16 @@ class RecordAccessor {
* This should be used with care as it's comparing vlist_ pointer records and
* not actual values inside RecordAccessors.
*/
friend bool operator<(const RecordAccessor &a, const RecordAccessor &b) {
debug_assert(a.db_accessor_ == b.db_accessor_,
"Not in the same transaction."); // assume the same
// db_accessor / transaction
return a.vlist_ < b.vlist_;
bool operator<(const RecordAccessor &other) const {
debug_assert(db_accessor_ == other.db_accessor_,
"Not in the same transaction.");
return vlist_ < other.vlist_;
}
friend bool operator==(const RecordAccessor &a, const RecordAccessor &b) {
debug_assert(a.db_accessor_ == b.db_accessor_,
"Not in the same transaction."); // assume the same
// db_accessor / transaction
return a.vlist_ == b.vlist_;
}
friend bool operator!=(const RecordAccessor &a, const RecordAccessor &b) {
debug_assert(a.db_accessor_ == b.db_accessor_,
"Not in the same transaction."); // assume the same
// db_accessor / transaction
return !(a == b);
bool operator==(const RecordAccessor &other) const {
debug_assert(db_accessor_ == other.db_accessor_,
"Not in the same transaction.");
return vlist_ == other.vlist_;
}
/**

View File

@ -8,12 +8,16 @@
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <thread>
#include <vector>
#include "database/graph_db_accessor.hpp"
#include "data_structures/concurrent/skiplist.hpp"
#include "database/dbms.hpp"
#include "database/graph_db_datatypes.hpp"
#include "mvcc/version_list.hpp"
#include "storage/property_value.hpp"
#include "storage/vertex_accessor.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "utils/assert.hpp"
namespace utils {
@ -29,64 +33,110 @@ auto RandomIntGenerator(int from, int to) {
}
/**
* Random graph generator. Define a random graph
* over a sequence of steps, and then commit.
* You can only commit once and can't change
* graph afterwards.
* Random graph generator. Create a graph
* with a sequence of steps.
*/
class RandomGraphGenerator {
public:
RandomGraphGenerator(GraphDbAccessor &dba) : dba_(dba) {}
explicit RandomGraphGenerator(Dbms &dbms) : dbms_(dbms) {}
/**
* Adds a progress listener that gets notified when
* edges / vertices get created.
*
* A listener is a function that gets notified after every
* vertex / edge insertion. If data creation is multi-threaded,
* then so is progress listener notification.
*/
void AddProgressListener(
std::function<void(RandomGraphGenerator &)> listener) {
progress_listeners_.emplace_back(listener);
}
/**
* Adds the given number of vertices, with
* the given labels.
*
* @param count the number of vertices to add
* @param label_names a vector of label names to assign to each
* created vertex
* @param thread_count The number of threads in which to add edges
* @param batch_size The number of vertices to be created in
* a single transcation
*/
void AddVertices(uint count, std::vector<std::string> label_names) {
permanent_assert(!did_commit_, "Already committed");
void AddVertices(int count, const std::vector<std::string> &label_names,
int thread_count, int batch_size = 2000) {
auto dba = dbms_.active();
std::vector<GraphDbTypes::Label> labels;
for (const auto &label_name : label_names)
labels.push_back(dba_.label(label_name));
labels.push_back(dba->label(label_name));
for (uint i = 0; i < count; ++i) {
auto vertex = dba_.insert_vertex();
for (auto label : labels) vertex.add_label(label);
vertices_.push_back(vertex);
}
Map(
[&labels, this](GraphDbAccessor &dba) {
auto vertex = dba.insert_vertex();
for (auto label : labels) vertex.add_label(label);
NotifyProgressListeners();
},
count, thread_count, batch_size);
}
/**
* Returns the number of vertices created by this generator,
* regardless of their labels.
*/
int64_t VertexCount() const { return dbms_.active()->vertices_count(); }
/**
* Adds the given number of edges to the graph.
*
* @param count The number of edges to add.
* @param edge_type_name Name of the edge type.
* @param thread_count The number of threads in which to add edges.
* @param batch_size The number of vertices to be created in
* a single transcation
* @param from_filter Filter of from vertices for new edges.
* By default all vertices are accepted.
* @param to_filter Filter of to vertices for new edges.
* By default all vertices are accepted.
*/
void AddEdges(uint count, const std::string &edge_type_name,
std::function<bool(VertexAccessor &va)> from_filter = {},
std::function<bool(VertexAccessor &va)> = {}) {
permanent_assert(!did_commit_, "Already committed");
void AddEdges(int count, const std::string &edge_type_name, int thread_count,
int batch_size = 50,
const std::function<bool(VertexAccessor &va)> &from_filter = {},
const std::function<bool(VertexAccessor &va)> &to_filter = {}) {
// create two temporary sets of vertices we will poll from
auto vertices_from = Filter(vertices_, from_filter);
auto vertices_to = Filter(vertices_, from_filter);
auto vertices_from = FilterVertices(from_filter);
auto vertices_to = FilterVertices(to_filter);
auto edge_type = dba_.edge_type(edge_type_name);
for (int i = 0; i < static_cast<int>(count); ++i)
edges_.push_back(dba_.insert_edge(
vertices_from[rand() % vertices_from.size()].get(),
vertices_to[rand() % vertices_to.size()].get(), edge_type));
auto dba = dbms_.active();
auto edge_type = dba->edge_type(edge_type_name);
// for small vertex counts reduce the batch size
batch_size = std::min(batch_size,
static_cast<int>(dba->vertices_count() / 1000 + 1));
Map(
[&vertices_from, &vertices_to, edge_type, this](GraphDbAccessor &dba) {
auto from =
dba.Transfer(vertices_from[rand() % vertices_from.size()]);
auto to = dba.Transfer(vertices_to[rand() % vertices_to.size()]);
debug_assert(from, "From not visible in current GraphDbAccessor");
debug_assert(to, "From not visible in current GraphDbAccessor");
dba.insert_edge(from.value(), to.value(), edge_type);
NotifyProgressListeners();
},
count, thread_count, batch_size);
}
/**
* Returns the number of edges created by this generator,
* regardless of their types and origin/destination labels.
*/
int64_t EdgeCount() const { return dbms_.active()->edges_count(); }
/**
* Sets a generated property on a random vertex.
*
* @tparam TValue Type of value to set.
* @param count The number of vertices to set the property on.
* @param prop_name Name of the property.
* @param predicate Filter that accepts or rejects a Vertex.
* @param value_generator Function that accepts nothing and
@ -94,57 +144,87 @@ class RandomGraphGenerator {
*/
template <typename TValue>
void SetVertexProperty(
uint, const std::string &prop_name,
std::function<TValue()> value_generator,
const std::string &prop_name, std::function<TValue()> value_generator,
std::function<bool(VertexAccessor &va)> predicate = {}) {
permanent_assert(!did_commit_, "Already committed");
auto property = dba_.property(prop_name);
for (auto va : Filter(vertices_, predicate))
va.get().PropsSet(property, value_generator());
}
/**
* Commits the random graph to storage.
* Can only be called once.
*/
void Commit() {
debug_assert(!did_commit_, "Already committed random graph");
dba_.commit();
if (!predicate) predicate = [](VertexAccessor &) { return true; };
auto dba = dbms_.active();
auto property = dba->property(prop_name);
for (VertexAccessor va : dba->vertices(false))
if (predicate(va)) va.PropsSet(property, value_generator());
}
private:
GraphDbAccessor &dba_;
Dbms &dbms_;
// storage for data we operate on
std::vector<VertexAccessor> vertices_;
std::vector<EdgeAccessor> edges_;
// can't perform ops after committing
bool did_commit_{false};
// progress listeners, they get notified about vertices and edges being
// created
std::vector<std::function<void(RandomGraphGenerator &)>> progress_listeners_;
/**
* Helper function for filtering.
* Accepts a vector of TItems, a predicate
* that accepts it or not, and returns a
* vector of reference wrappers to accepted
* items.
*
* @tparam TItem Type of item.
* @param collection The collection to filter on.
* @param predicate A predicate. By default always true.
* @return A collection of reference_wrappers to TItems.
*/
template <typename TItem>
std::vector<std::reference_wrapper<TItem>> Filter(
std::vector<TItem> &collection,
std::function<bool(TItem &item)> predicate = {}) {
if (!predicate) predicate = [](TItem &) { return true; };
std::vector<std::reference_wrapper<TItem>> r_val;
for (TItem &item : collection)
if (predicate(item)) r_val.emplace_back(std::ref(item));
* Helper function for filtering. Accepts a vector of TItems, a predicate
* that accepts it or not, and returns a vector of reference wrappers to
* accepted items.
*
*
* @param predicate A predicate. By default always true.
* @return A vector of vertex accessors. They belong to a GraphDbAccesor
* that is dead when this function retuns, make sure to
* GraphDbAccessor::Transfer them.
*/
std::vector<VertexAccessor> FilterVertices(
std::function<bool(VertexAccessor &item)> predicate = {}) {
if (!predicate) predicate = [](VertexAccessor &) { return true; };
std::vector<VertexAccessor> r_val;
auto dba = dbms_.active();
for (VertexAccessor &item : dba->vertices(false))
if (predicate(item)) r_val.emplace_back(item);
return r_val;
}
/** Sends notifications to all progress listeners */
void NotifyProgressListeners() {
for (const auto &listener : progress_listeners_) listener(*this);
}
/**
* Performs function `f` `count` times across `thread_count`
* threads. Returns only once all of the threads have
* finished.
*/
void Map(std::function<void(GraphDbAccessor &)> f, int count,
int thread_count, int elements_per_commit) {
debug_assert(thread_count > 0, "Can't work on less then 1 thread");
// split count across thread_count
int count_per_thread = count / thread_count;
int count_remainder = count % thread_count;
std::vector<std::thread> threads;
for (int thread_ind = 0; thread_ind < thread_count; thread_ind++) {
if (thread_ind == thread_count - 1) count_per_thread += count_remainder;
threads.emplace_back([count_per_thread, &f, this, elements_per_commit]() {
auto dba = dbms_.active();
for (int i = 0; i < count_per_thread; i++) {
try {
f(*dba);
} catch (LockTimeoutException &e) {
i--;
continue;
} catch (mvcc::SerializationError &e) {
i--;
continue;
}
if (i == (count_per_thread - 1) ||
(i >= 0 && i % elements_per_commit == 0)) {
dba->commit();
auto dba2 = dbms_.active();
dba.swap(dba2);
}
}
});
}
for (auto &thread : threads) thread.join();
}
};
}

View File

@ -1,4 +1,6 @@
#include <iostream>
#include <mutex>
#include <thread>
#include <gflags/gflags.h>
#include <glog/logging.h>
@ -8,39 +10,70 @@
#include "query/interpreter.hpp"
#include "utils/random_graph_generator.hpp"
void random_generate(Dbms &dbms, uint node_count, int edge_factor = 5) {
auto dba = dbms.active();
utils::RandomGraphGenerator generator(*dba);
/** A graph-generation progress reporter */
class ProgressReporter {
public:
ProgressReporter(int64_t node_count, int64_t edge_count, int64_t skip)
: node_count_(node_count), edge_count_(edge_count), skip_(skip) {}
auto edge_count = node_count * edge_factor;
generator.AddVertices(node_count, {"Person"});
generator.AddEdges(edge_count, "Friend");
generator.SetVertexProperty<int>(node_count, "age",
utils::RandomIntGenerator(3, 60));
generator.SetVertexProperty<int>(node_count, "height",
void operator()(utils::RandomGraphGenerator &rgg) {
auto vc = rgg.VertexCount();
auto ec = rgg.EdgeCount();
bool last = ec + vc == node_count_ + edge_count_;
auto percent = std::lround(100. * (vc + ec) / (node_count_ + edge_count_));
if (last || (vc + ec) % skip_ == 0) {
std::lock_guard<std::mutex> lock(mutex_);
std::cout << "\rCreated " << rgg.VertexCount() << " vertices and "
<< rgg.EdgeCount() << " edges (" << percent
<< "% of all elements)";
std::flush(std::cout);
}
if (last) std::cout << std::endl;
}
private:
// the desired counts of nodes and edges
const int64_t node_count_;
const int64_t edge_count_;
// how many notifications we skip between each report
const int64_t skip_;
// std output synchronization
std::mutex mutex_{};
};
void random_generate(Dbms &dbms, int64_t node_count, int64_t edge_count) {
utils::RandomGraphGenerator generator(dbms);
ProgressReporter reporter(node_count, edge_count,
std::max(1l, (node_count + edge_count) / 100));
generator.AddProgressListener([&reporter](auto &rgg) { reporter(rgg); });
utils::Timer generation_timer;
generator.AddVertices(node_count, {"Person"}, 4);
generator.AddEdges(edge_count, "Friend", 7);
generator.SetVertexProperty<int>("age", utils::RandomIntGenerator(3, 60));
generator.SetVertexProperty<int>("height",
utils::RandomIntGenerator(120, 200));
generator.Commit();
std::cout << "Generation done in " << generation_timer.Elapsed().count()
<< " seconds" << std::endl;
}
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
// parse the first cmd line argument as the count of nodes to randomly create
uint node_count = 0;
if (argc > 1) {
node_count = (uint)std::stoul(argv[1]);
permanent_assert(node_count < 10000000,
"More then 10M nodes requested, that's too much");
}
int node_count = argc > 1 ? std::stoi(argv[1]) : 0;
int edge_count = argc > 2 ? std::stoi(argv[2]) : 0;
// TODO switch to GFlags, once finally available
if (argc > 2) google::InitGoogleLogging(argv[0]);
if (argc > 3) google::InitGoogleLogging(argv[0]);
Dbms dbms;
std::cout << "Generating graph..." << std::endl;
// fill_db(dbms);
random_generate(dbms, node_count);
random_generate(dbms, node_count, edge_count);
query::Repl(dbms);
return 0;
}