Add bfs benchmark on pokec dataset

Reviewers: ipaljak, mferencevic

Reviewed By: ipaljak, mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1533
This commit is contained in:
Marko Culinovic 2018-08-10 14:35:43 +02:00
parent cd3210fb9b
commit 158f97206d
9 changed files with 188 additions and 11 deletions

View File

@ -78,11 +78,12 @@ class QueryClient:
class LongRunningClient:
def __init__(self, args, default_num_workers):
def __init__(self, args, default_num_workers, workload):
self.log = logging.getLogger("LongRunningClient")
self.client = jail.get_process()
set_cpus("client-cpu-ids", self.client, args)
self.default_num_workers = default_num_workers
self.workload = workload
# TODO: This is quite similar to __call__ method of QueryClient. Remove
# duplication.
@ -114,7 +115,9 @@ class LongRunningClient:
client_args = ["--port", database.args.port,
"--num-workers", str(num_workers),
"--output", output,
"--duration", str(duration)]
"--duration", str(duration),
"--db", database.name,
"--scenario", self.workload]
return_code = self.client.run_and_wait(
client, client_args, timeout=600, stdin=config_path)

View File

@ -0,0 +1,122 @@
#include <array>
#include <chrono>
#include <fstream>
#include <iostream>
#include <queue>
#include <random>
#include <sstream>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <json/json.hpp>
#include "io/network/utils.hpp"
#include "utils/algorithm.hpp"
#include "utils/timer.hpp"
#include "long_running_common.hpp"
using communication::bolt::Edge;
using communication::bolt::Value;
using communication::bolt::Vertex;
class BfsPokecClient : public TestClient {
public:
BfsPokecClient(int id, const std::string &db)
: TestClient(), rg_(id), db_(db) {
auto result = Execute("MATCH (n:User) RETURN count(1)", {}, "NumNodes");
CHECK(result) << "Read-only query should not fail";
num_nodes_ = result->records[0][0].ValueInt();
}
private:
std::mt19937 rg_;
std::string db_;
int num_nodes_;
int RandomId() {
std::uniform_int_distribution<int64_t> dist(1, num_nodes_);
auto id = dist(rg_);
return id;
}
void BfsWithDestinationNode() {
auto start = RandomId();
auto end = RandomId();
while (start == end) {
end = RandomId();
}
if (FLAGS_db == "memgraph") {
auto result = Execute(
"MATCH p = (n:User {id: $start})-[*bfs..15]->(m:User {id: $end}) "
"RETURN nodes(p) AS path LIMIT 1",
{{"start", start}, {"end", end}}, "Bfs");
CHECK(result) << "Read-only query should not fail!";
} else if (FLAGS_db == "neo4j") {
auto result = Execute(
"MATCH p = shortestPath("
"(n:User {id: $start})-[*..15]->(m:User {id: $end}))"
"RETURN [x in nodes(p) | x.id] AS path;",
{{"start", start}, {"end", end}}, "Bfs");
CHECK(result) << "Read-only query should not fail!";
}
}
void BfsWithoutDestinationNode() {
auto start = RandomId();
if (FLAGS_db == "memgraph") {
auto result = Execute(
"MATCH p = (n:User {id: $start})-[*bfs..15]->(m:User) WHERE m != n "
"RETURN nodes(p) AS path",
{{"start", start}}, "Bfs");
CHECK(result) << "Read-only query should not fail!";
} else {
auto result = Execute(
"MATCH p = shortestPath("
"(n:User {id: $start})-[*..15]->(m:User)) WHERE m <> n "
"RETURN [x in nodes(p) | x.id] AS path;",
{{"start", start}}, "Bfs");
CHECK(result) << "Read-only query should not fail!";
}
}
public:
virtual void Step() override {
if (FLAGS_scenario == "with_destination_node") {
BfsWithDestinationNode();
return;
}
if (FLAGS_scenario == "without_destination_node") {
BfsWithoutDestinationNode();
return;
}
LOG(FATAL) << "Should not get here: unknown scenario!";
}
};
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
communication::Init();
Endpoint endpoint(FLAGS_address, FLAGS_port);
ClientContext context(FLAGS_use_ssl);
Client client(&context);
if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
LOG(FATAL) << "Couldn't connect to " << endpoint;
}
std::vector<std::unique_ptr<TestClient>> clients;
for (auto i = 0; i < FLAGS_num_workers; ++i) {
clients.emplace_back(std::make_unique<BfsPokecClient>(i, "memgraph"));
}
RunMultithreadedTest(clients);
return 0;
}

View File

@ -22,6 +22,7 @@
const int MAX_RETRIES = 30;
DEFINE_string(db, "", "Database queries are executed on.");
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(num_workers, 1, "Number of workers");
@ -35,6 +36,7 @@ DEFINE_string(group, "unknown", "Test group name");
DEFINE_string(scenario, "unknown", "Test scenario name");
auto &executed_queries = stats::GetCounter("executed_queries");
auto &executed_steps = stats::GetCounter("executed_steps");
auto &serialization_errors = stats::GetCounter("serialization_errors");
class TestClient {
@ -59,6 +61,7 @@ class TestClient {
runner_thread_ = std::thread([&] {
while (keep_running_) {
Step();
executed_steps.Bump();
}
});
}
@ -185,6 +188,7 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
}
out << "{\"num_executed_queries\": " << executed_queries.Value() << ", "
<< "\"num_executed_steps\": " << executed_steps.Value() << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count()
<< ", \"queries\": [";
utils::PrintIterable(

View File

@ -40,6 +40,7 @@ class Memgraph:
self.config = config
self.num_workers = num_workers
self.database_bin = jail.get_process()
self.name = "memgraph"
set_cpus("database-cpu-ids", self.database_bin, args)
def start(self):
@ -90,6 +91,7 @@ class Neo:
self.args, _ = argp.parse_known_args(args)
self.config = config
self.database_bin = jail.get_process()
self.name = "neo4j"
set_cpus("database-cpu-ids", self.database_bin, args)
def start(self):

View File

@ -0,0 +1,4 @@
{
"duration": 30,
"client": "bfs_pokec_client"
}

View File

@ -0,0 +1,6 @@
#!/bin/bash -e
working_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd ${working_dir}
wget -nv -O pokec_small.setup.cypher http://deps.memgraph.io/pokec_small.setup.cypher

View File

@ -0,0 +1 @@
{}

View File

@ -34,34 +34,38 @@ class LongRunningSuite:
runner.stop()
measurements = []
summary_format = "{:>15} {:>22}\n"
summary_format = "{:>15} {:>22} {:>22}\n"
self.summary = summary_format.format(
"elapsed_time", "num_executed_queries")
"elapsed_time", "num_executed_queries", "num_executed_steps")
for result in results:
self.summary += summary_format.format(
result["elapsed_time"], result["num_executed_queries"])
result["elapsed_time"], result["num_executed_queries"],
result["num_executed_steps"])
measurements.append({
"target": "throughput",
"time": result["elapsed_time"],
"value": result["num_executed_queries"],
"steps": result["num_executed_steps"],
"unit": "number of executed queries",
"type": "throughput"})
self.summary += "\n\nThroughtput: " + str(measurements[-1]["value"])
self.summary += "\n\nThroughput: " + str(measurements[-1]["value"])
self.summary += "\nExecuted steps: " + str(measurements[-1]["steps"])
return measurements
def runners(self):
return {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner}
def groups(self):
return ["pokec", "card_fraud"]
return ["pokec", "card_fraud", "bfs_pokec"]
class _LongRunningRunner:
def __init__(self, args, database, num_client_workers):
def __init__(self, args, database, num_client_workers, workload):
self.log = logging.getLogger("_LongRunningRunner")
self.database = database
self.query_client = QueryClient(args, num_client_workers)
self.long_running_client = LongRunningClient(args, num_client_workers)
self.long_running_client = LongRunningClient(args, num_client_workers,
workload)
def start(self):
self.database.start()
@ -93,6 +97,9 @@ class MemgraphRunner(_LongRunningRunner):
help="Number of workers")
argp.add_argument("--num-client-workers", type=int, default=24,
help="Number of clients")
argp.add_argument("--workload", type=str, default="",
help="Type of client workload. Sets \
scenario flag for 'TestClient'")
self.args, remaining_args = argp.parse_known_args(args)
assert not APOLLO or self.args.num_database_workers, \
"--num-database-workers is obligatory flag on apollo"
@ -101,7 +108,8 @@ class MemgraphRunner(_LongRunningRunner):
database = Memgraph(remaining_args, self.args.runner_config,
self.args.num_database_workers)
super(MemgraphRunner, self).__init__(
remaining_args, database, self.args.num_client_workers)
remaining_args, database, self.args.num_client_workers,
self.args.workload)
class NeoRunner(_LongRunningRunner):
@ -115,9 +123,13 @@ class NeoRunner(_LongRunningRunner):
help="Path to neo config file")
argp.add_argument("--num-client-workers", type=int, default=24,
help="Number of clients")
argp.add_argument("--workload", type=str, default="",
help="Type of client workload. Sets \
scenario flag for 'TestClient'")
self.args, remaining_args = argp.parse_known_args(args)
assert not APOLLO or self.args.num_client_workers, \
"--client-num-clients is obligatory flag on apollo"
database = Neo(remaining_args, self.args.runner_config)
super(NeoRunner, self).__init__(
remaining_args, database, self.args.num_client_workers)
remaining_args, database, self.args.num_client_workers,
self.args.workload)

View File

@ -0,0 +1,23 @@
#!/bin/bash -e
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Run bfs pokec bench (download dataset, run neo and memgraph, plot the results).
cd ${script_dir}
mkdir -p .results/bfs_pokec/
${script_dir}/groups/bfs_pokec/download_dataset
./harness LongRunningSuite MemgraphRunner --groups bfs_pokec --workload with_destination_node
mv .harness_summary ${script_dir}/.results/bfs_pokec/memgraph_bfs_1.summary
./harness LongRunningSuite NeoRunner --groups bfs_pokec --workload with_destination_node
mv .harness_summary ${script_dir}/.results/bfs_pokec/neo4j_bfs_1.summary
./harness LongRunningSuite MemgraphRunner --groups bfs_pokec --workload without_destination_node
mv .harness_summary ${script_dir}/.results/bfs_pokec/memgraph_bfs_2.summary
./harness LongRunningSuite NeoRunner --groups bfs_pokec --workload without_destination_node
mv .harness_summary ${script_dir}/.results/bfs_pokec/neo4j_bfs_2.summary