Refactor long running benchmark

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D801
This commit is contained in:
Mislav Bradac 2017-09-19 15:31:44 +02:00
parent d6a885cce6
commit cf7190ecc6
4 changed files with 371 additions and 146 deletions

View File

@ -6,6 +6,7 @@
#include <queue>
#include <random>
#include <sstream>
#include <unordered_map>
#include <vector>
#include <gflags/gflags.h>
@ -44,52 +45,66 @@ struct VertexAndEdges {
std::vector<DecodedVertex> vertices;
};
std::pair<VertexAndEdges, int> DetachDeleteVertex(BoltClient &client,
const std::string &label,
int64_t id) {
auto vertex_record =
ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + " {id : $id}) RETURN n",
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
.records;
CHECK(vertex_record.size() == 1U) << "id : " << id << " "
<< vertex_record.size();
std::atomic<int64_t> executed_queries;
auto records =
ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m",
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
.records;
class Session {
public:
Session(const nlohmann::json &config, const std::string &address,
const std::string &port, const std::string &username,
const std::string &password)
: config_(config), client_(address, port, username, password) {}
ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n",
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES);
private:
const nlohmann::json &config_;
BoltClient client_;
std::unordered_map<std::string,
std::vector<std::map<std::string, DecodedValue>>>
stats_;
SpinLock lock_;
std::vector<DecodedEdge> edges;
edges.reserve(records.size());
for (const auto &record : records) {
edges.push_back(record[1].ValueEdge());
auto Execute(const std::string &query,
const std::map<std::string, DecodedValue> &params,
const std::string &query_name = "") {
utils::Timer timer;
auto result = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
++executed_queries;
auto wall_time = timer.Elapsed();
auto metadata = result.metadata;
metadata["wall_time"] = wall_time.count();
{
std::unique_lock<SpinLock> guard(lock_);
if (query_name != "") {
stats_[query_name].push_back(std::move(metadata));
} else {
stats_[query].push_back(std::move(metadata));
}
}
return result;
}
std::vector<DecodedVertex> vertices;
vertices.reserve(records.size());
for (const auto &record : records) {
vertices.push_back(record[2].ValueVertex());
auto MatchVertex(const std::string &label, int64_t id) {
return Execute(fmt::format("MATCH (n :{} {{id : $id}}) RETURN n", label),
{{"id", id}});
}
return {{vertex_record[0][0].ValueVertex(), edges, vertices}, 3};
}
auto MatchNeighbours(const std::string &label, int64_t id) {
return Execute(
fmt::format("MATCH (n :{} {{id : $id}})-[e]-(m) RETURN n, e, m", label),
{{"id", id}});
}
int ReturnVertexAndEdges(BoltClient &client,
const VertexAndEdges &vertex_and_edges,
const std::string &independent_label) {
int num_queries = 0;
{
auto DetachDeleteVertex(const std::string &label, int64_t id) {
return Execute(
fmt::format("MATCH (n :{} {{id : $id}}) DETACH DELETE n", label),
{{"id", id}});
}
auto CreateVertex(const DecodedVertex &vertex) {
std::stringstream os;
os << "CREATE (n :";
PrintIterable(os, vertex_and_edges.vertex.labels, ":");
PrintIterable(os, vertex.labels, ":");
os << " {";
PrintIterable(os, vertex_and_edges.vertex.properties, ", ",
PrintIterable(os, vertex.properties, ", ",
[&](auto &stream, const auto &pair) {
if (pair.second.type() == DecodedValue::Type::String) {
stream << pair.first << ": \"" << pair.second << "\"";
@ -98,19 +113,17 @@ int ReturnVertexAndEdges(BoltClient &client,
}
});
os << "})";
ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES);
++num_queries;
return Execute(os.str(), {}, "CREATE (n :labels... {...})");
}
for (int i = 0; i < static_cast<int>(vertex_and_edges.vertices.size()); ++i) {
auto CreateEdge(const DecodedVertex &from, const std::string &from_label,
int64_t from_id, const std::string &to_label, int64_t to_id,
const DecodedEdge &edge) {
std::stringstream os;
os << "MATCH (n :" << independent_label
<< " {id: " << vertex_and_edges.vertex.properties.at("id") << "}) ";
os << "MATCH (m :" << independent_label
<< " {id: " << vertex_and_edges.vertices[i].properties.at("id") << "}) ";
const auto &edge = vertex_and_edges.edges[i];
os << fmt::format("MATCH (n :{} {{id : {}}}) ", from_label, from_id);
os << fmt::format("MATCH (m :{} {{id : {}}}) ", to_label, to_id);
os << "CREATE (n)";
if (edge.to == vertex_and_edges.vertex.id) {
if (edge.to == from.id) {
os << "<-";
} else {
os << "-";
@ -125,27 +138,128 @@ int ReturnVertexAndEdges(BoltClient &client,
}
});
os << "}]";
if (edge.from == vertex_and_edges.vertex.id) {
if (edge.from == from.id) {
os << "->";
} else {
os << "-";
}
os << "(m)";
os << " RETURN n.id";
auto ret = ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES);
auto x = ret.metadata["plan_execution_time"];
auto y = ret.metadata["planning_time"];
if (x.type() == DecodedValue::Type::Double) {
LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning "
<< y.ValueDouble();
}
os << "(m) ";
os << "RETURN n.id";
auto ret = Execute(os.str(), {},
"MATCH (n :label {id: ...}) MATCH (m :label {id: ...}) "
"CREATE (n)-[:type ...]-(m)");
CHECK(ret.records.size() == 1U)
<< "Graph in invalid state "
<< vertex_and_edges.vertex.properties.at("id");
++num_queries;
<< "from_id: " << from_id << " "
<< "to_id: " << to_id << " "
<< "ret.records.size(): " << ret.records.size();
return ret;
}
return num_queries;
}
VertexAndEdges RetrieveAndDeleteVertex(const std::string &label, int64_t id) {
auto vertex_record = MatchVertex(label, id).records;
CHECK(vertex_record.size() == 1U)
<< "id: " << id << " "
<< "vertex_record.size(): " << vertex_record.size();
auto records = MatchNeighbours(label, id).records;
DetachDeleteVertex(label, id);
std::vector<DecodedEdge> edges;
edges.reserve(records.size());
for (const auto &record : records) {
edges.push_back(record[1].ValueEdge());
}
std::vector<DecodedVertex> vertices;
vertices.reserve(records.size());
for (const auto &record : records) {
vertices.push_back(record[2].ValueVertex());
}
return {vertex_record[0][0].ValueVertex(), edges, vertices};
}
void ReturnVertexAndEdges(const VertexAndEdges &vertex_and_edges,
const std::string &label) {
int num_queries = 0;
CreateVertex(vertex_and_edges.vertex);
++num_queries;
for (int i = 0; i < static_cast<int>(vertex_and_edges.vertices.size());
++i) {
auto records =
CreateEdge(
vertex_and_edges.vertex, label,
vertex_and_edges.vertex.properties.at("id").ValueInt(), label,
vertex_and_edges.vertices[i].properties.at("id").ValueInt(),
vertex_and_edges.edges[i])
.records;
CHECK(records.size() == 1U)
<< "Graph in invalid state "
<< vertex_and_edges.vertex.properties.at("id");
++num_queries;
}
}
public:
void Run(int id, std::vector<int64_t> to_remove,
std::atomic<bool> &keep_running) {
std::mt19937 rg(id);
std::vector<VertexAndEdges> removed;
const auto &queries = config_["queries"];
const double read_probability = config_["read_probability"];
const std::string independent_label = config_["independent_label"];
while (keep_running) {
std::uniform_real_distribution<> real_dist(0.0, 1.0);
// Read query.
if (real_dist(rg) < read_probability) {
CHECK(queries.size())
<< "Specify at least one read query or set read_probability to 0";
std::uniform_int_distribution<> read_query_dist(0, queries.size() - 1);
const auto &query = queries[read_query_dist(rg)];
std::map<std::string, DecodedValue> params;
for (const auto &param : query["params"]) {
std::uniform_int_distribution<int64_t> param_value_dist(
param["low"], param["high"]);
params[param["name"]] = param_value_dist(rg);
}
Execute(query["query"], params);
} else {
auto remove_random = [&](auto &v) {
CHECK(v.size());
std::uniform_int_distribution<> int_dist(0, v.size() - 1);
std::swap(v.back(), v[int_dist(rg)]);
auto ret = v.back();
v.pop_back();
return ret;
};
if (real_dist(rg) < static_cast<double>(removed.size()) /
(removed.size() + to_remove.size())) {
auto vertices_and_edges = remove_random(removed);
ReturnVertexAndEdges(vertices_and_edges, independent_label);
to_remove.push_back(
vertices_and_edges.vertex.properties["id"].ValueInt());
} else {
auto node_id = remove_random(to_remove);
auto ret = RetrieveAndDeleteVertex(independent_label, node_id);
removed.push_back(ret);
}
}
}
}
auto ConsumeStats() {
std::unique_lock<SpinLock> guard(lock_);
auto stats = stats_;
stats_.clear();
return stats;
}
};
int64_t NumNodes(BoltClient &client, const std::string &label) {
auto result = ExecuteNTimesTillSuccess(
@ -167,56 +281,66 @@ std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
return ret;
}
// Greedily computes a maximal independent set over the vertices with the
// given label, assuming the vertex ids are exactly 1..NumNodes(client, label).
//
// Ids are visited in a deterministically shuffled order (default-seeded
// std::mt19937, so runs are reproducible); each visited id still eligible is
// selected, and all of its neighbours are removed from further consideration.
//
// @param client Bolt client used to query the database.
// @param label  Vertex label whose id space is scanned.
// @return Ids of the selected (mutually non-adjacent) vertices.
std::vector<int64_t> IndependentSet(BoltClient &client,
                                    const std::string &label) {
  const int64_t num_nodes = NumNodes(client, label);
  std::vector<int64_t> independent_nodes_ids;
  std::vector<int64_t> ids;
  std::unordered_set<int64_t> independent;
  ids.reserve(num_nodes);
  for (int64_t i = 1; i <= num_nodes; ++i) {
    ids.push_back(i);
    independent.insert(i);
  }
  {
    // Default-seeded generator keeps the shuffle deterministic across runs.
    std::mt19937 mt;
    std::shuffle(ids.begin(), ids.end(), mt);
  }
  for (auto i : ids) {
    // Skip ids already excluded by a previously selected neighbour.
    if (independent.find(i) == independent.end()) continue;
    independent.erase(i);
    std::vector<int64_t> neighbour_ids = Neighbours(client, label, i);
    independent_nodes_ids.push_back(i);
    for (auto j : neighbour_ids) {
      independent.erase(j);
    }
  }
  // Fixed duplicated word in the log message ("nodes nodes").
  LOG(INFO) << "Number of nodes: " << num_nodes << "\n"
            << "Number of independent nodes: " << independent_nodes_ids.size();
  return independent_nodes_ids;
}
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
nlohmann::json config;
std::cin >> config;
const auto &queries = config["queries"];
const double read_probability = config["read_probability"];
const std::string independent_label = config["independent_label"];
std::vector<int64_t> independent_nodes_ids;
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password);
const int64_t num_nodes = NumNodes(client, independent_label);
{
std::vector<int64_t> ids;
std::unordered_set<int64_t> independent;
for (int64_t i = 1; i <= num_nodes; ++i) {
ids.push_back(i);
independent.insert(i);
}
{
std::mt19937 mt;
std::shuffle(ids.begin(), ids.end(), mt);
}
for (auto i : ids) {
if (independent.find(i) == independent.end()) continue;
independent.erase(i);
std::vector<int64_t> neighbour_ids =
Neighbours(client, independent_label, i);
independent_nodes_ids.push_back(i);
for (auto j : neighbour_ids) {
independent.erase(j);
}
}
}
auto independent_nodes_ids = [&] {
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username,
FLAGS_password);
return IndependentSet(client, independent_label);
}();
utils::Timer timer;
std::vector<std::thread> threads;
std::atomic<int64_t> executed_queries{0};
std::atomic<bool> keep_running{true};
LOG(INFO) << "nodes " << num_nodes << " independent "
<< independent_nodes_ids.size();
int64_t next_to_assign = 0;
std::vector<std::unique_ptr<Session>> sessions;
sessions.reserve(FLAGS_num_workers);
for (int i = 0; i < FLAGS_num_workers; ++i) {
int64_t size = independent_nodes_ids.size();
int64_t next_next_to_assign = next_to_assign + size / FLAGS_num_workers +
(i < size % FLAGS_num_workers);
sessions.push_back(std::make_unique<Session>(
config, FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password));
std::vector<int64_t> to_remove(
independent_nodes_ids.begin() + next_to_assign,
independent_nodes_ids.begin() + next_next_to_assign);
@ -224,57 +348,9 @@ int main(int argc, char **argv) {
next_to_assign = next_next_to_assign;
threads.emplace_back(
[&](int thread_id, std::vector<int64_t> to_remove) {
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username,
FLAGS_password);
std::mt19937 random_gen(thread_id);
std::vector<VertexAndEdges> removed;
while (keep_running) {
std::uniform_real_distribution<> real_dist(0.0, 1.0);
// Read query.
if (real_dist(random_gen) < read_probability) {
std::uniform_int_distribution<> read_query_dist(
0, static_cast<int>(queries.size()) - 1);
const auto &query = queries[read_query_dist(random_gen)];
std::map<std::string, DecodedValue> params;
for (const auto &param : query["params"]) {
std::uniform_int_distribution<int64_t> param_value_dist(
param["low"], param["high"]);
params[param["name"]] = param_value_dist(random_gen);
}
ExecuteNTimesTillSuccess(client, query["query"], params,
MAX_RETRIES);
++executed_queries;
} else {
if (real_dist(random_gen) <
static_cast<double>(removed.size()) /
(removed.size() + to_remove.size())) {
CHECK(removed.size());
std::uniform_int_distribution<> int_dist(0, removed.size() - 1);
std::swap(removed.back(), removed[int_dist(random_gen)]);
executed_queries += ReturnVertexAndEdges(client, removed.back(),
independent_label);
to_remove.push_back(
removed.back().vertex.properties["id"].ValueInt());
removed.pop_back();
} else {
CHECK(to_remove.size());
std::uniform_int_distribution<> int_dist(0,
to_remove.size() - 1);
std::swap(to_remove.back(), to_remove[int_dist(random_gen)]);
auto ret = DetachDeleteVertex(client, independent_label,
to_remove.back());
removed.push_back(ret.first);
to_remove.pop_back();
executed_queries += ret.second;
}
}
}
client.Close();
[&](int thread_id, const std::vector<int64_t> to_remove) {
sessions[thread_id]->Run(thread_id, std::move(to_remove),
keep_running);
},
i, std::move(to_remove));
}
@ -291,9 +367,63 @@ int main(int argc, char **argv) {
std::ostream out(buf);
while (timer.Elapsed().count() < FLAGS_duration) {
std::unordered_map<std::string, std::map<std::string, DecodedValue>>
aggregated_stats;
using namespace std::chrono_literals;
out << "{ \"num_executed_queries\": " << executed_queries << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count() << "}" << std::endl;
std::unordered_map<std::string,
std::vector<std::map<std::string, DecodedValue>>>
stats;
for (const auto &session : sessions) {
auto session_stats = session->ConsumeStats();
for (const auto &session_query_stats : session_stats) {
auto &query_stats = stats[session_query_stats.first];
query_stats.insert(query_stats.end(),
session_query_stats.second.begin(),
session_query_stats.second.end());
}
}
// TODO: Here we combine pure values, json and DecodedValue which is a
// little bit chaotic. Think about refactoring this part to only use json
// and write DecodedValue to json converter.
const std::vector<std::string> fields = {
"wall_time", "parsing_time", "planning_time", "plan_execution_time",
};
for (const auto &query_stats : stats) {
std::map<std::string, double> new_aggregated_query_stats;
for (const auto &stats : query_stats.second) {
for (const auto &field : fields) {
auto it = stats.find(field);
if (it != stats.end()) {
new_aggregated_query_stats[field] += it->second.ValueDouble();
}
}
}
int64_t new_count = query_stats.second.size();
auto &aggregated_query_stats = aggregated_stats[query_stats.first];
aggregated_query_stats.insert({"count", DecodedValue(0)});
auto old_count = aggregated_query_stats["count"].ValueInt();
aggregated_query_stats["count"].ValueInt() += new_count;
for (const auto &stat : new_aggregated_query_stats) {
auto it = aggregated_query_stats.insert({stat.first, DecodedValue(0.0)})
.first;
it->second =
(it->second.ValueDouble() * old_count + stat.second * new_count) /
(old_count + new_count);
}
}
out << "{\"num_executed_queries\": " << executed_queries << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count()
<< ", \"queries\": [";
PrintIterable(out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
out << "]}" << std::endl;
out.flush();
std::this_thread::sleep_for(1s);
}

View File

@ -19,6 +19,7 @@ class LongRunningSuite:
def __init__(self, args):
argp = ArgumentParser("LongRunningSuiteArgumentParser")
argp.add_argument("--num-client-workers", default=4)
argp.add_argument("--duration", type=int)
self.args, _ = argp.parse_known_args(args)
pass
@ -30,6 +31,8 @@ class LongRunningSuite:
config = next(scenario.get("config")())
duration = config["duration"]
if self.args.duration:
duration = self.args.duration
log.info("Executing run for {} seconds with {} client workers".format(
duration, self.args.num_client_workers))
results = runner.run(next(scenario.get("run")()), duration,
@ -38,15 +41,19 @@ class LongRunningSuite:
runner.stop()
measurements = []
summary_format = "{:>15} {:>22}\n"
self.summary = summary_format.format(
"elapsed_time", "num_executed_queries")
for result in results:
print(result["num_executed_queries"], result["elapsed_time"])
self.summary += summary_format.format(
result["elapsed_time"], result["num_executed_queries"])
# TODO: Revise this.
measurements.append({
"target": "throughput",
"value": result["num_executed_queries"] / result["elapsed_time"],
"unit": "queries per second",
"type": "throughput"})
self.summary = "Throughtput: " + str(measurements[-1]["value"])
self.summary += "\n\nThroughtput: " + str(measurements[-1]["value"])
return measurements
def runners(self):

80
tools/plot_througput Executable file
View File

@ -0,0 +1,80 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import matplotlib.pyplot as plt
from matplotlib.cbook import get_sample_data
from argparse import ArgumentParser
# Brand color used to draw each vendor's throughput line, keyed by the
# vendor reference string passed via --vendor-references. Any reference
# not present here raises KeyError in main().
COLORS = {
    'memgraph': '#ff7300',
    'neo4j': '#008cc2'
}
def parse_args():
    """Parse the command line arguments for the throughput plotting tool.

    Returns:
        argparse.Namespace with ``vendor_references``, ``vendor_titles``,
        ``results`` and ``plot_title`` attributes. The three list arguments
        are expected to be given in matching order, one entry per vendor.
    """
    argp = ArgumentParser(description=__doc__)
    argp.add_argument("--vendor-references", nargs="+",
                      help="Short references that represent all the "
                           "vendors that are going to be "
                           "visualized on the plot.")
    # Fixed typo in user-facing help text: "Vender" -> "Vendor".
    argp.add_argument("--vendor-titles", nargs="+",
                      help="Vendor titles that are going to appear "
                           "on the plot, e.g. legend titles.")
    argp.add_argument("--results", nargs="+",
                      help="Result files for each vendor")
    argp.add_argument("--plot-title", default="{{Plot title placeholder}}",
                      help="Plot title.")
    # argp.add_argument("--max-label-width", default=11, type=int,
    #                   help="Maximum length of the x-axis labels "
    #                        "(-1 is unlimited)")
    return argp.parse_args()
def main():
    """Plot per-vendor throughput (queries per second) over time.

    Reads one result file per vendor — whitespace-separated columns that,
    per the parsing below, are ``elapsed_time num_executed_queries`` with a
    header line first — and draws one throughput-vs-time line per vendor
    on a single matplotlib figure shown interactively.
    """
    # Read the arguments.
    args = parse_args()

    # Prepare the datastructure: one dict per vendor reference holding the
    # plot metadata plus the (t, dq/dt) series filled in below.
    # NOTE(review): assumes --vendor-references, --vendor-titles and
    # --results have equal lengths and matching order — zip() silently
    # truncates to the shortest list otherwise; confirm at call time.
    vendors = {}
    for vendor_reference, vendor_title, vendor_results in \
            zip(args.vendor_references, args.vendor_titles, args.results):
        vendors[vendor_reference] = {}
        vendors[vendor_reference]['title'] = vendor_title
        vendors[vendor_reference]['results_path'] = vendor_results
        # Raises KeyError for references not listed in COLORS.
        vendors[vendor_reference]['color'] = COLORS[vendor_reference]
        vendors[vendor_reference]['t'] = []
        vendors[vendor_reference]['dq/dt'] = []

    fig, ax = plt.subplots()
    ax.set_ylabel('Throughput (queries per second)')
    ax.set_xlabel('Time (seconds)')
    ax.set_title(args.plot_title)
    ax.set_aspect(0.01)

    # Collect the benchmark data and plot lines.
    print("Pokec throughput")
    for vendor_reference, vendor_data in vendors.items():
        print("Vendor: %s" % vendor_reference)
        with open(vendor_data['results_path']) as results_file:
            # Skip first line which contains titles.
            prev_time, prev_num_queries = 0.0, 0
            for line in results_file.readlines()[1:]:
                data = line.split()
                # A blank line terminates the data section.
                if data == []: break
                assert len(data) == 2, "Invalid data"
                new_time = float(data[0])
                new_num_quries = int(data[1])
                # Throughput over the last interval: delta queries / delta
                # time. Divides by zero if two samples share a timestamp.
                dt, dq = new_time - prev_time, new_num_quries - prev_num_queries
                prev_time, prev_num_queries = new_time, new_num_quries
                vendor_data['t'].append(new_time)
                vendor_data['dq/dt'].append(dq / dt)
        line1, = ax.plot(vendor_data['t'], vendor_data['dq/dt'], '-',
                         linewidth=2, label=vendor_data['title'],
                         color=vendor_data['color'])

    ax.legend(loc='lower right')
    plt.show()
# Entry point: plot only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

8
tools/requirements.txt Normal file
View File

@ -0,0 +1,8 @@
cycler==0.10.0
matplotlib==2.0.2
numpy==1.13.1
pkg-resources==0.0.0
pyparsing==2.2.0
python-dateutil==2.6.1
pytz==2017.2
six==1.11.0