Add memgraph ha normal operation long running test

Summary:
The functionality of the test is the same as for single node Memgraph.
Locally, it seems to work fine. I'll update the apollo related files when I feel
a bit more certain that everything works locally.

Reviewers: mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2135
This commit is contained in:
Ivan Paljak 2019-07-09 13:01:06 +02:00
parent 76d8020169
commit 2936312612
5 changed files with 660 additions and 0 deletions

View File

@ -17,3 +17,6 @@ endfunction(add_stress_test)
add_stress_test(long_running.cpp)
target_link_libraries(${test_prefix}long_running mg-communication mg-io mg-utils)
# HA variant of the long running test. The source file is named with its
# extension, exactly like the other add_stress_test() calls, instead of relying
# on CMake guessing the extension of an extension-less source name.
add_stress_test(ha_normal_operation_long_running.cpp)
target_link_libraries(${test_prefix}ha_normal_operation_long_running mg-communication mg-io mg-utils)

View File

@ -19,6 +19,13 @@
commands: TIMEOUT=43200 ./continuous_integration --large-dataset
infiles: *STRESS_INFILES
# Stress run for the HA build: executes ./continuous_integration_ha with the
# memgraph_ha release binary and the stress client binaries as input files.
- name: stress_ha
commands: TIMEOUT=200 ./continuous_integration_ha
infiles: &STRESS_HA_INFILES
- . # current directory
- ../../build_release/memgraph_ha # memgraph release binary
- ../../build_release/tests/stress/ # stress client binaries
- name: durability
commands: TIMEOUT=300 ./ve3/bin/python3 durability --num-steps 5
infiles: &DURABILITY_INFILES

View File

@ -0,0 +1,197 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import atexit
import json
import multiprocessing
import os
import subprocess
import sys
import tempfile
import time
# Dataset calibrated for running on Apollo (total 3min):
#   the first HA long running pass runs for 1min,
#   the second HA long running pass runs for 2min.
# Each entry names the test source, the options handed to the test binary and
# a timeout expressed in minutes.
SMALL_DATASET = [
    {
        "test": "ha_normal_operation_long_running.cpp",
        "options": ["--vertex-count", "1000", "--edge-count", "5000",
                    "--max-time", "1", "--verify", "20"],
        "timeout": 5,
    },
    {
        "test": "ha_normal_operation_long_running.cpp",
        "options": ["--vertex-count", "10000", "--edge-count", "50000",
                    "--max-time", "2", "--verify", "30"],
        "timeout": 5,
    },
]

# Dataset calibrated for running on the daily stress instance (total 2h):
#   the HA long running test runs for 2h.
# This has to be the HA client as well: the single node `long_running` client
# talks to one fixed endpoint and does not exercise the cluster as a whole.
LARGE_DATASET = [
    {
        "test": "ha_normal_operation_long_running.cpp",
        "options": ["--vertex-count", "200000", "--edge-count", "1000000",
                    "--max-time", "120", "--verify", "300"],
        "timeout": 140,
    },
]
# Filesystem layout, everything is resolved relative to this script.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")

# The last test of each dataset is told to dump its query statistics here.
STATS_FILE = os.path.join(SCRIPT_DIR,
                          ".ha_normal_operation_long_running_stats")
SMALL_DATASET[-1]["options"] += ["--stats-file", STATS_FILE]
LARGE_DATASET[-1]["options"] += ["--stats-file", STATS_FILE]

# HA cluster layout: number of instances, the port of the first instance and
# the raft configuration shared by all of them.
CLUSTER_SIZE = 3
PORT = 7687
RAFT_CONFIG_FILE = os.path.join(SCRIPT_DIR, "raft.json")
# Command line interface of the script.
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph",
                    default=os.path.join(BUILD_DIR, "memgraph_ha"))
parser.add_argument("--config",
                    default=os.path.join(CONFIG_DIR, "stress.conf"))
parser.add_argument("--log-file", default="")
parser.add_argument("--durability-directory", default="")
parser.add_argument("--python", type=str,
                    default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"))
# Boolean switches: absent means False, present means True.
parser.add_argument("--large-dataset", action="store_true")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--threads", type=int, default=8)
args = parser.parse_args()
def run_test(args, test, options, timeout):
    """Run a single stress test and return its wall-clock runtime in seconds.

    `test` is the test's source file name: `.py` tests are run with
    `args.python`, `.cpp` tests are looked up as binaries in the build tree
    (falling back to `build_release`). `options` are appended to the command
    line and `timeout` is given in minutes.

    Raises an Exception for an unsupported test type or a non-zero exit code.
    """
    print("Running test '{}'".format(test))

    # Work out what to execute.
    if test.endswith(".py"):
        log_level = "DEBUG" if args.verbose else "WARNING"
        command = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
                   "--logging", log_level]
    elif test.endswith(".cpp"):
        stem = test[:-len(".cpp")]
        candidate = os.path.join(BUILD_DIR, "tests", "stress", stem)
        if not os.path.exists(candidate):
            candidate = os.path.join(BASE_DIR, "build_release", "tests",
                                     "stress", stem)
        command = [candidate]
    else:
        raise Exception("Test '{}' binary not supported!".format(test))

    # Execute it and time the run.
    command = command + ["--worker-count", str(args.threads)] + options
    started_at = time.time()
    result = subprocess.run(command, cwd=SCRIPT_DIR, timeout=timeout * 60)
    if result.returncode != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(
            test, result.returncode))
    elapsed = time.time() - started_at
    print(" Done after {:.3f} seconds".format(elapsed))
    return elapsed
# Fall back to the release build when the selected HA binary does not exist.
if not os.path.exists(args.memgraph):
    args.memgraph = os.path.join(BASE_DIR, "build_release", "memgraph_ha")

# Per-run state: one process slot per cluster instance, a scratch directory
# for durability data and the coordination config shared by all instances.
workers = [None] * CLUSTER_SIZE
durability_directory = tempfile.TemporaryDirectory()
coordination_config_file = tempfile.NamedTemporaryFile(mode="w")
def generate_cmd_args(worker_id):
    """Return the command line that launches cluster instance `worker_id`.

    Server ids are 1-based, ports are consecutive starting at PORT and every
    instance gets its own sub-directory of the durability directory.
    """
    worker_durability = os.path.join(durability_directory.name,
                                     "worker" + str(worker_id))
    return [
        args.memgraph,
        "--server-id", str(worker_id + 1),
        "--port", str(PORT + worker_id),
        "--raft-config-file", RAFT_CONFIG_FILE,
        "--coordination-config-file", coordination_config_file.name,
        "--durability-directory", worker_durability,
    ]
def wait_for_server(port, delay=0.1):
    """Block until something accepts TCP connections on 127.0.0.1:`port`.

    The port is probed with `nc`, sleeping `delay` seconds between attempts
    and once more after the first successful probe.
    """
    probe = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while True:
        if subprocess.call(probe) == 0:
            break
        time.sleep(delay)
    time.sleep(delay)
def start_worker(worker_id):
    """Launch cluster instance `worker_id` and wait until its port is open.

    Asserts that the id is valid, that the slot is still free and that the
    process is still alive shortly after being spawned.
    """
    assert 0 <= worker_id < CLUSTER_SIZE, \
        "Invalid worker ID {}".format(worker_id)
    assert workers[worker_id] is None, \
        "Worker with id {} already exists".format(worker_id)

    process = subprocess.Popen(generate_cmd_args(worker_id))
    workers[worker_id] = process

    # Give the process a moment so an immediate crash is noticed.
    time.sleep(0.2)
    assert process.poll() is None, \
        "Worker {} process died prematurely!".format(worker_id)
    wait_for_server(PORT + worker_id)
# At exit cleanup. The handler is registered before any instance is launched
# so that a failure while starting the cluster does not leave the instances
# that were already started running in the background.
@atexit.register
def cleanup():
    """Kill every cluster instance that is still alive."""
    for proc_mg in workers:
        # A slot stays None when its instance was never started.
        if proc_mg is None or proc_mg.poll() is not None:
            continue
        proc_mg.kill()
        proc_mg.wait()


# Generate coordination config file: one [server id, address, port] entry per
# cluster instance.
data = [[i + 1, "127.0.0.1", 10000 + i] for i in range(CLUSTER_SIZE)]
coordination_config_file.write(json.dumps(data))
# The instances read the file by name, so its content has to be on disk.
coordination_config_file.flush()

# Start HA Memgraph cluster
for worker_id in range(CLUSTER_SIZE):
    start_worker(worker_id)
# Run every test of the selected dataset, remembering how long each one took
# (keyed by the test name without its extension).
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
runtimes = {}
for test in dataset:
    elapsed = run_test(args, **test)
    test_name, _ = os.path.splitext(test["test"])
    runtimes[test_name] = elapsed

# Stop the cluster; every instance is expected to exit cleanly.
for proc_mg in workers:
    proc_mg.terminate()
    exit_code = proc_mg.wait()
    if exit_code != 0:
        raise Exception(
            "Memgraph binary returned non-zero ({})!".format(exit_code))
# Collect the measurements: one runtime line per test followed by the query
# counters taken from the first two lines of the stats file.
measurement_lines = ["{}.runtime {}\n".format(key, value)
                     for key, value in runtimes.items()]
with open(STATS_FILE) as f:
    stats = f.read().split("\n")
measurement_lines.append(
    "long_running.queries.executed {}\n".format(stats[0]))
measurement_lines.append(
    "long_running.queries.failed {}\n".format(stats[1]))
measurements = "".join(measurement_lines)

with open(MEASUREMENTS_FILE, "w") as f:
    f.write(measurements)

print("Done!")

View File

@ -0,0 +1,446 @@
#include <chrono>
#include <cmath>
#include <cstdint>
#include <fstream>
#include <iterator>
#include <map>
#include <memory>
#include <random>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/exceptions.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
// Short aliases for the networking and Bolt client types used by this test.
using EndpointT = io::network::Endpoint;
using ClientContextT = communication::ClientContext;
using ClientT = communication::bolt::HAClient;
using ValueT = communication::bolt::Value;
using QueryDataT = communication::bolt::QueryData;
using ExceptionT = communication::bolt::ClientQueryException;
// Command line flags: how to reach and authenticate against the cluster.
DEFINE_string(endpoints, "127.0.0.1:7687,127.0.0.1:7688,127.0.0.1:7689",
"Cluster server endpoints (host:port, separated by comma).");
// NOTE(review): cluster_size is not read anywhere in the visible part of this
// file -- confirm whether it is still needed.
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
// Command line flags: size of the graph every worker maintains. main() refuses
// to run unless both vertex_count and edge_count are greater than zero.
DEFINE_int32(vertex_count, 0,
"The average number of vertices in the graph per worker");
DEFINE_int32(edge_count, 0,
"The average number of edges in the graph per worker");
DEFINE_int32(prop_count, 5, "The max number of properties on a node");
// Command line flags: when a worker stops and how often it verifies its graph.
DEFINE_uint64(max_queries, 1U << 30U, "Maximum number of queries to execute");
DEFINE_int32(max_time, 1, "Maximum execution time in minutes");
DEFINE_int32(verify, 0, "Interval (seconds) between checking local info");
DEFINE_int32(worker_count, 1,
"The number of workers that operate on the graph independently");
DEFINE_bool(global_queries, true,
"If queries that modifiy globally should be executed sometimes");
// When non-empty, main() writes the executed and failed query totals here.
DEFINE_string(stats_file, "", "File into which to write statistics.");
std::vector<EndpointT> GetEndpoints() {
std::vector<io::network::Endpoint> ret;
for (const auto &endpoint : utils::Split(FLAGS_endpoints, ",")) {
auto split = utils::Split(utils::Trim(endpoint), ":");
CHECK(split.size() == 2) << "Invalid endpoint!";
ret.emplace_back(
io::network::ResolveHostname(std::string(utils::Trim(split[0]))),
static_cast<uint16_t>(std::stoi(std::string(utils::Trim(split[1])))));
}
return ret;
}
/**
 * Encapsulates a Graph and a Bolt session and provides CRUD op functions.
 * Also defines a run-loop for a generic executor, and a graph state
 * verification function.
 *
 * Every session works on its own part of the graph, identified by the label
 * `indexed_label<id>`, and mirrors the expected state of that part (vertex
 * ids, edge ids, labels per vertex) locally so that it can be compared with
 * what the database reports.
 */
class GraphSession {
 public:
  /**
   * Prepares the local bookkeeping and creates the HA client.
   *
   * @param id index of this session; used for the indexed label and in logs.
   */
  explicit GraphSession(int id)
      : id_(id),
        indexed_label_(fmt::format("indexed_label{}", id)),
        generator_{std::random_device{}()} {
    for (int i = 0; i < FLAGS_prop_count; ++i) {
      auto label = fmt::format("label{}", i);
      labels_.insert(label);
      labels_vertices_.insert({label, {}});
    }

    std::vector<EndpointT> endpoints = GetEndpoints();
    uint64_t retries = 15;
    std::chrono::milliseconds retry_delay(1000);
    // The client is handed the address of the member `context_`, which lives
    // as long as the session does; no function-local context is needed.
    client_ = std::make_unique<ClientT>(endpoints, &context_, FLAGS_username,
                                        FLAGS_password, retries, retry_delay);
  }

 private:
  uint64_t id_;
  ClientContextT context_{FLAGS_use_ssl};
  std::unique_ptr<ClientT> client_;

  // Next free vertex / edge id and the ids believed to exist in the database.
  uint64_t vertex_id_{0};
  uint64_t edge_id_{0};
  std::set<uint64_t> vertices_;
  std::set<uint64_t> edges_;

  // Label bookkeeping: the per-session indexed label, the pool of extra
  // labels and, for each of them, the vertices believed to carry it.
  std::string indexed_label_;
  std::set<std::string> labels_;
  std::map<std::string, std::set<uint64_t>> labels_vertices_;

  // Statistics: number of issued queries and failure counts per error text.
  uint64_t executed_queries_{0};
  std::map<std::string, uint64_t> query_failures_;

  std::mt19937 generator_;
  utils::Timer timer_;

  /** Returns a pseudo-random number, nominally from [0, 1). */
  double GetRandom() { return std::generate_canonical<double, 10>(generator_); }

  /** Returns true with (approximately) probability `p`. */
  bool Bernoulli(double p) { return GetRandom() < p; }

  /**
   * Returns a pseudo-randomly chosen element of `data`.
   * The set must not be empty; that is enforced with a CHECK.
   */
  template <typename T>
  T RandomElement(const std::set<T> &data) {
    CHECK(!data.empty()) << "Can't pick a random element of an empty set!";
    uint64_t pos = std::floor(GetRandom() * data.size());
    // std::generate_canonical is known to return exactly 1.0 on some standard
    // library implementations, which would place `pos` one past the end.
    if (pos >= data.size()) pos = data.size() - 1;
    auto it = data.begin();
    std::advance(it, pos);
    return *it;
  }

  /** Counts one more failure with the error text `what`. */
  void AddQueryFailure(const std::string &what) { ++query_failures_[what]; }

  /**
   * Executes `query` and returns its result.
   * A ClientQueryException is recorded as a query failure and turned into an
   * empty result; the query counts as executed either way.
   */
  QueryDataT Execute(const std::string &query) {
    try {
      DLOG(INFO) << "Runner " << id_ << " executing query: " << query;
      executed_queries_ += 1;
      return client_->Execute(query, {});
    } catch (const ExceptionT &e) {
      AddQueryFailure(std::string{e.what()});
      return QueryDataT();
    }
  }

  /**
   * Creates `vertices_count` vertices with consecutive ids and the indexed
   * label. Anything but the full number of created vertices fails a CHECK.
   */
  void CreateVertices(uint64_t vertices_count) {
    if (vertices_count == 0) return;
    auto ret = Execute(fmt::format(
        "UNWIND RANGE({}, {}) AS r CREATE (n:{} {{id: r}}) RETURN count(n)",
        vertex_id_, vertex_id_ + vertices_count - 1, indexed_label_));
    CHECK(ret.records.size() == 1) << "Vertices creation failed!";
    CHECK(static_cast<uint64_t>(ret.records[0][0].ValueInt()) ==
          vertices_count)
        << "Created " << ret.records[0][0].ValueInt() << " vertices instead of "
        << vertices_count << "!";
    for (uint64_t i = 0; i < vertices_count; ++i) {
      vertices_.insert(vertex_id_ + i);
    }
    vertex_id_ += vertices_count;
  }

  /**
   * Detach-deletes a random vertex and removes it, its labels and the edges
   * reported by the query from the local bookkeeping.
   */
  void RemoveVertex() {
    auto vertex_id = RandomElement(vertices_);
    auto ret =
        Execute(fmt::format("MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() "
                            "DETACH DELETE n RETURN n.id, labels(n), r.id",
                            indexed_label_, vertex_id));
    std::set<uint64_t> processed_vertices;
    for (auto &record : ret.records) {
      // remove vertex but note there could be duplicates
      auto n_id = record[0].ValueInt();
      if (processed_vertices.insert(n_id).second) {
        vertices_.erase(n_id);
        for (auto &label : record[1].ValueList()) {
          if (label.ValueString() == indexed_label_) {
            continue;
          }
          labels_vertices_[label.ValueString()].erase(n_id);
        }
      }
      // remove edge; the column is not an Int when the vertex had no edges
      auto &edge = record[2];
      if (edge.type() == ValueT::Type::Int) {
        edges_.erase(edge.ValueInt());
      }
    }
  }

  /**
   * Creates the initial edges: `edges_count / vertices_.size()` outgoing edges
   * per vertex, each to a randomly chosen vertex id. The ratio has to be a
   * whole number. Only the edges the database reports as created are tracked.
   */
  void CreateEdges(uint64_t edges_count) {
    if (edges_count == 0) return;
    auto edges_per_node = static_cast<double>(edges_count) / vertices_.size();
    CHECK(std::abs(edges_per_node - static_cast<int64_t>(edges_per_node)) <
          0.0001)
        << "Edges per node not a whole number";

    auto ret = Execute(fmt::format(
        "MATCH (a:{0}) WITH a "
        "UNWIND range(0, {1}) AS i WITH a, tointeger(rand() * {2}) AS id "
        "MATCH (b:{0} {{id: id}}) WITH a, b "
        "CREATE (a)-[e:EdgeType {{id: counter(\"edge\", {3})}}]->(b) "
        "RETURN count(e)",
        indexed_label_, static_cast<int64_t>(edges_per_node) - 1,
        vertices_.size(), edge_id_));
    CHECK(ret.records.size() == 1) << "Failed to create edges";
    uint64_t count = ret.records[0][0].ValueInt();
    for (uint64_t i = 0; i < count; ++i) {
      edges_.insert(edge_id_ + i);
    }
    edge_id_ += count;
  }

  /** Creates a single edge between two randomly chosen vertices. */
  void CreateEdge() {
    auto ret = Execute(
        fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) "
                    "CREATE (from)-[e:EdgeType {{id: "
                    "counter(\"edge\", {})}}]->(to) RETURN e.id",
                    indexed_label_, RandomElement(vertices_), indexed_label_,
                    RandomElement(vertices_), edge_id_));
    if (!ret.records.empty()) {
      edges_.insert(ret.records[0][0].ValueInt());
      edge_id_ += 1;
    }
  }

  /** Adds a random label to a random vertex that does not carry it yet. */
  void AddLabel() {
    auto vertex_id = RandomElement(vertices_);
    auto label = RandomElement(labels_);
    // add a label on a vertex that didn't have that label
    // yet (we need that for book-keeping)
    auto ret = Execute(fmt::format(
        "MATCH (v:{} {{id: {}}}) WHERE not v:{} SET v:{} RETURN v.id",
        indexed_label_, vertex_id, label, label));
    if (!ret.records.empty()) {
      labels_vertices_[label].insert(vertex_id);
    }
  }

  /**
   * Sets a random `value` property on all vertices (of any session) whose id
   * falls into a random window. Does nothing when no vertex is tracked.
   */
  void UpdateGlobalVertices() {
    if (vertices_.empty()) return;
    uint64_t vertex_id = *vertices_.rbegin();
    uint64_t lo = std::floor(GetRandom() * vertex_id);
    uint64_t hi = std::floor(lo + vertex_id * 0.01);
    uint64_t num = std::floor(GetRandom() * (1U << 30U));
    Execute(
        fmt::format("MATCH (n) WHERE n.id > {} AND n.id < {} SET n.value = {}",
                    lo, hi, num));
  }

  /**
   * Sets a random `value` property on all edges (of any session) whose id
   * falls into a random window. Does nothing when no vertex is tracked.
   */
  void UpdateGlobalEdges() {
    if (vertices_.empty()) return;
    uint64_t vertex_id = *vertices_.rbegin();
    uint64_t lo = std::floor(GetRandom() * vertex_id);
    uint64_t hi = std::floor(lo + vertex_id * 0.01);
    uint64_t num = std::floor(GetRandom() * (1U << 30U));
    Execute(fmt::format(
        "MATCH ()-[e]->() WHERE e.id > {} AND e.id < {} SET e.value = {}", lo,
        hi, num));
  }

  /**
   * Checks if the local info corresponds to DB state and logs a report.
   * Throws utils::BasicException when a count can't be fetched or differs.
   */
  void VerifyGraph() {
    // helper lambda for count verification
    auto test_count = [this](const std::string &query, int64_t count,
                             const std::string &message) {
      auto ret = Execute(query);
      if (ret.records.empty()) {
        throw utils::BasicException("Couldn't execute count!");
      }
      if (ret.records[0][0].ValueInt() != count) {
        throw utils::BasicException(
            fmt::format(message, id_, count, ret.records[0][0].ValueInt()));
      }
    };

    test_count(fmt::format("MATCH (n:{}) RETURN count(n)", indexed_label_),
               vertices_.size(), "Runner {} expected {} vertices, found {}!");
    test_count(
        fmt::format("MATCH (:{0})-[r]->(:{0}) RETURN count(r)", indexed_label_),
        edges_.size(), "Runner {} expected {} edges, found {}!");

    for (auto &item : labels_vertices_) {
      test_count(
          fmt::format("MATCH (n:{}:{}) RETURN count(n)", indexed_label_,
                      item.first),
          item.second.size(),
          fmt::format(
              "Runner {{}} expected {{}} vertices with label '{}', found {{}}!",
              item.first));
    }

    // generate report
    std::ostringstream report;
    report << fmt::format("Runner {} graph verification success:", id_) << "\n"
           << fmt::format("\tExecuted {} queries in {:.2f} seconds",
                          executed_queries_, timer_.Elapsed().count())
           << "\n"
           << fmt::format("\tGraph has {} vertices and {} edges",
                          vertices_.size(), edges_.size())
           << "\n";
    for (auto &label : labels_) {
      report << fmt::format("\tVertices with label '{}': {}", label,
                            labels_vertices_[label].size())
             << "\n";
    }
    if (!query_failures_.empty()) {
      report << "\tQuery failed (reason: count)"
             << "\n";
      for (auto &item : query_failures_) {
        report << fmt::format("\t\t'{}': {}", item.first, item.second) << "\n";
      }
    }
    LOG(INFO) << report.str();
  }

 public:
  /**
   * Builds the initial graph and then keeps issuing random modifications
   * until --max-queries or --max-time is reached, verifying the graph every
   * --verify seconds (if greater than zero).
   */
  void Run() {
    // initial vertex creation
    CreateVertices(FLAGS_vertex_count);

    // initial edge creation
    CreateEdges(FLAGS_edge_count);

    if (FLAGS_verify > 0) VerifyGraph();
    double last_verify = timer_.Elapsed().count();

    // run rest
    while (executed_queries_ < FLAGS_max_queries &&
           timer_.Elapsed().count() / 60.0 < FLAGS_max_time) {
      if (FLAGS_verify > 0 &&
          timer_.Elapsed().count() - last_verify > FLAGS_verify) {
        VerifyGraph();
        last_verify = timer_.Elapsed().count();
      }

      double ratio_e =
          static_cast<double>(edges_.size()) / FLAGS_edge_count;
      double ratio_v =
          static_cast<double>(vertices_.size()) / FLAGS_vertex_count;

      // try to edit vertices globally
      if (FLAGS_global_queries) {
        if (Bernoulli(0.01)) {
          UpdateGlobalVertices();
        }

        // try to edit edges globally
        if (Bernoulli(0.01)) {
          UpdateGlobalEdges();
        }
      }

      // if we're missing edges (due to vertex detach delete), add some!
      // (the condition is used as a probability of 1.0 or 0.0)
      if (Bernoulli(ratio_e < 0.9)) {
        CreateEdge();
        continue;
      }

      // if we are near vertex balance, we can also do updates
      // instead of update / deletes
      if (std::fabs(1.0 - ratio_v) < 0.5 && Bernoulli(0.5)) {
        AddLabel();
        continue;
      }

      // the more vertices there are, the likelier a removal becomes
      if (Bernoulli(ratio_v / 2.0)) {
        RemoveVertex();
      } else {
        CreateVertices(1);
      }
    }
  }

  /** Returns the number of queries issued by this session. */
  uint64_t GetExecutedQueries() const { return executed_queries_; }

  /** Returns the number of queries that failed, summed over all reasons. */
  uint64_t GetFailedQueries() const {
    uint64_t failed = 0;
    for (const auto &item : query_failures_) {
      failed += item.second;
    }
    return failed;
  }
};
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
CHECK(FLAGS_vertex_count > 0) << "Vertex count must be greater than 0!";
CHECK(FLAGS_edge_count > 0) << "Edge count must be greater than 0!";
communication::Init();
LOG(INFO) << "Starting Memgraph HA normal operation long running test";
try {
std::vector<EndpointT> endpoints = GetEndpoints();
// create client
uint64_t retries = 15;
std::chrono::milliseconds retry_delay(1000);
ClientContextT context(FLAGS_use_ssl);
ClientT client(endpoints, &context, FLAGS_username, FLAGS_password, retries,
retry_delay);
// cleanup and create indexes
client.Execute("MATCH (n) DETACH DELETE n", {});
for (int i = 0; i < FLAGS_worker_count; ++i) {
client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), {});
}
} catch (const communication::bolt::ClientFatalException &e) {
LOG(WARNING) << "Unable to find cluster leader";
return 1;
} catch (const communication::bolt::ClientQueryException &e) {
LOG(WARNING)
<< "Transient error while executing query. (eg. mistyped query, etc.)\n"
<< e.what();
return 1;
} catch (const utils::BasicException &e) {
LOG(WARNING) << "Error while executing query\n" << e.what();
return 1;
}
// sessions
std::vector<GraphSession> sessions;
sessions.reserve(FLAGS_worker_count);
for (int i = 0; i < FLAGS_worker_count; ++i)
sessions.emplace_back(i);
// workers
std::vector<std::thread> threads;
threads.reserve(FLAGS_worker_count);
for (int i = 0; i < FLAGS_worker_count; ++i)
threads.emplace_back([&, i]() { sessions[i].Run(); });
for (int i = 0; i < FLAGS_worker_count; ++i)
threads[i].join();
if (!FLAGS_stats_file.empty()) {
uint64_t executed = 0;
uint64_t failed = 0;
for (int i = 0; i < FLAGS_worker_count; ++i) {
executed += sessions[i].GetExecutedQueries();
failed += sessions[i].GetFailedQueries();
}
std::ofstream stream(FLAGS_stats_file);
stream << executed << std::endl << failed << std::endl;
LOG(INFO) << fmt::format("Written statistics to file: {}",
FLAGS_stats_file);
}
LOG(INFO) << "All query runners done";
return 0;
}

7
tests/stress/raft.json Normal file
View File

@ -0,0 +1,7 @@
{
"election_timeout_min": 750,
"election_timeout_max": 1000,
"heartbeat_interval": 100,
"replication_timeout": 10000,
"log_size_snapshot_threshold": -1
}