Polish CI scripts (#83)
* Add Jepsen to the release workflow
* Increase the long-running stress timeout to 16 minutes
* Remove ha_client code
This commit is contained in:
parent
151812da1a
commit
90d4ebdb1e
2
.github/workflows/diff.yaml
vendored
2
.github/workflows/diff.yaml
vendored
@ -329,7 +329,7 @@ jobs:
|
||||
with:
|
||||
name: "Enterprise DEB package"
|
||||
path: build/output/memgraph*.deb
|
||||
|
||||
|
||||
release_jepsen_test:
|
||||
name: "Release Jepsen Test"
|
||||
runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl]
|
||||
|
40
.github/workflows/release.yaml
vendored
40
.github/workflows/release.yaml
vendored
@ -316,3 +316,43 @@ jobs:
|
||||
cd tests/stress
|
||||
source ve3/bin/activate
|
||||
python3 durability --num-steps 20
|
||||
|
||||
release_jepsen_test:
  # Builds the release memgraph binary and runs the whole Jepsen suite on it.
  name: "Release Jepsen Test"
  runs-on:
    - self-hosted
    - Linux
    - X64
    - Debian10
    - JepsenControl
  env:
    # Used as the `make -j` level in the build step below.
    THREADS: 24
  timeout-minutes: 60

  steps:
    - name: Set up repository
      uses: actions/checkout@v2
      with:
        # Number of commits to fetch. `0` indicates all history for all
        # branches and tags. (default: 1)
        fetch-depth: 0

    - name: Build release binaries
      run: |
        # Activate toolchain.
        source /opt/toolchain-v2/activate

        # Initialize dependencies.
        ./init

        # Build only memgraph release binary.
        cd build
        cmake -DCMAKE_BUILD_TYPE=release ..
        make -j$THREADS memgraph

    - name: Run Jepsen tests
      run: |
        cd tests/jepsen
        ./run.sh test --binary ../../build/memgraph --run-args "test-all --node-configs resources/node-config.edn" --ignore-run-stdout-logs --ignore-run-stderr-logs

    # `always()` keeps this step running even when an earlier step failed.
    - name: Save Jepsen report
      uses: actions/upload-artifact@v2
      if: ${{ always() }}
      with:
        name: "Jepsen Report"
        path: tests/jepsen/Jepsen.tar.gz
|
||||
|
@ -1,179 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
|
||||
namespace communication::bolt {
|
||||
|
||||
/// HA Bolt client.
|
||||
/// It has methods used to execute queries against a cluster of servers. It
|
||||
/// supports both SSL and plaintext connections.
|
||||
class HAClient final {
|
||||
public:
|
||||
HAClient(const std::vector<io::network::Endpoint> &endpoints,
|
||||
communication::ClientContext *context, const std::string &username,
|
||||
const std::string &password, uint64_t num_retries,
|
||||
const std::chrono::milliseconds &retry_delay,
|
||||
const std::string &client_name = "memgraph-bolt")
|
||||
: endpoints_(endpoints),
|
||||
context_(context),
|
||||
username_(username),
|
||||
password_(password),
|
||||
num_retries_(num_retries),
|
||||
retry_delay_(retry_delay),
|
||||
client_name_(client_name) {
|
||||
if (endpoints.size() < 3) {
|
||||
throw ClientFatalException(
|
||||
"You should specify at least three server endpoints to connect to!");
|
||||
}
|
||||
// Create all clients.
|
||||
for (size_t i = 0; i < endpoints.size(); ++i) {
|
||||
clients_.push_back(std::make_unique<Client>(context_));
|
||||
}
|
||||
}
|
||||
|
||||
HAClient(const HAClient &) = delete;
|
||||
HAClient(HAClient &&) = delete;
|
||||
HAClient &operator=(const HAClient &) = delete;
|
||||
HAClient &operator=(HAClient &&) = delete;
|
||||
|
||||
/// Function used to execute queries against the leader server.
|
||||
/// @throws ClientQueryException when there is some transient error while
|
||||
/// executing the query (eg. mistyped query,
|
||||
/// etc.)
|
||||
/// @throws ClientFatalException when we couldn't communicate with the leader
|
||||
/// server even after `num_retries` tries
|
||||
QueryData Execute(const std::string &query,
|
||||
const std::map<std::string, Value> ¶meters) {
|
||||
for (int i = 0; i < num_retries_; ++i) {
|
||||
// Try to find a leader.
|
||||
if (!leader_) {
|
||||
for (int j = 0; j < num_retries_; ++j) {
|
||||
if (!(i == 0 && j == 0)) {
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(retry_delay_));
|
||||
}
|
||||
try {
|
||||
FindLeader();
|
||||
break;
|
||||
} catch (const ClientFatalException &e) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!leader_) {
|
||||
throw ClientFatalException("Couldn't find leader after {} tries!",
|
||||
num_retries_);
|
||||
}
|
||||
}
|
||||
// Try to execute the query.
|
||||
try {
|
||||
return leader_->Execute(query, parameters);
|
||||
} catch (const utils::BasicException &e) {
|
||||
// Check if this is a cluster failure or a Raft failure.
|
||||
auto qe = dynamic_cast<const ClientQueryException *>(&e);
|
||||
if (dynamic_cast<const ClientFatalException *>(&e) ||
|
||||
(qe && qe->code() == "Memgraph.DatabaseError.Raft.Error")) {
|
||||
// We need to look for a new leader.
|
||||
leader_ = nullptr;
|
||||
continue;
|
||||
}
|
||||
// If it isn't just forward the exception to the client.
|
||||
throw;
|
||||
}
|
||||
}
|
||||
throw ClientFatalException("Couldn't execute query after {} tries!",
|
||||
num_retries_);
|
||||
}
|
||||
|
||||
/// Function that returns the current leader ID.
|
||||
///
|
||||
/// @throws ClientFatalException when we couldn't find the leader server even
|
||||
/// after `num_retries` tries
|
||||
uint64_t GetLeaderId() {
|
||||
Execute("SHOW RAFT INFO", {});
|
||||
return leader_id_;
|
||||
}
|
||||
|
||||
private:
|
||||
void FindLeader() {
|
||||
// Reconnect clients that aren't available
|
||||
bool connected = false;
|
||||
for (size_t i = 0; i < clients_.size(); ++i) {
|
||||
const auto &ep = endpoints_[i];
|
||||
const auto &client = clients_[i];
|
||||
try {
|
||||
client->Execute("SHOW RAFT INFO", {});
|
||||
connected = true;
|
||||
continue;
|
||||
} catch (const ClientQueryException &e) {
|
||||
continue;
|
||||
} catch (const ClientFatalException &e) {
|
||||
client->Close();
|
||||
try {
|
||||
client->Connect(ep, username_, password_, client_name_);
|
||||
connected = true;
|
||||
} catch (const utils::BasicException &) {
|
||||
// Suppress any exceptions.
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!connected) {
|
||||
throw ClientFatalException("Couldn't connect to any server!");
|
||||
}
|
||||
|
||||
// Determine which server is the leader
|
||||
leader_ = nullptr;
|
||||
uint64_t leader_id = 0;
|
||||
int64_t leader_term = -1;
|
||||
for (uint64_t i = 0; i < clients_.size(); ++i) {
|
||||
auto &client = clients_[i];
|
||||
try {
|
||||
auto ret = client->Execute("SHOW RAFT INFO", {});
|
||||
int64_t term_id = -1;
|
||||
bool is_leader = false;
|
||||
for (const auto &rec : ret.records) {
|
||||
if (rec.size() != 2) continue;
|
||||
if (!rec[0].IsString()) continue;
|
||||
const auto &key = rec[0].ValueString();
|
||||
if (key == "term_id") {
|
||||
if (!rec[1].IsInt()) continue;
|
||||
term_id = rec[1].ValueInt();
|
||||
} else if (key == "is_leader") {
|
||||
if (!rec[1].IsBool()) continue;
|
||||
is_leader = rec[1].ValueBool();
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (is_leader && term_id > leader_term) {
|
||||
leader_term = term_id;
|
||||
leader_id = i + 1;
|
||||
leader_ = client.get();
|
||||
}
|
||||
} catch (const utils::BasicException &) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
leader_id_ = leader_id;
|
||||
if (!leader_) {
|
||||
throw ClientFatalException("Couldn't find leader server!");
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<io::network::Endpoint> endpoints_;
|
||||
communication::ClientContext *context_;
|
||||
std::string username_;
|
||||
std::string password_;
|
||||
uint64_t num_retries_;
|
||||
std::chrono::milliseconds retry_delay_;
|
||||
std::string client_name_;
|
||||
|
||||
uint64_t leader_id_ = 0;
|
||||
Client *leader_ = nullptr;
|
||||
std::vector<std::unique_ptr<Client>> clients_;
|
||||
};
|
||||
} // namespace communication::bolt
|
@ -17,6 +17,3 @@ endfunction(add_stress_test)
|
||||
|
||||
add_stress_test(long_running.cpp)
|
||||
target_link_libraries(${test_prefix}long_running mg-communication mg-io mg-utils)
|
||||
|
||||
#add_stress_test(ha_normal_operation_long_running)
|
||||
#target_link_libraries(${test_prefix}ha_normal_operation_long_running mg-communication mg-io mg-utils)
|
||||
|
@ -57,7 +57,7 @@ LARGE_DATASET = [
|
||||
{
|
||||
"test": "long_running.cpp",
|
||||
"options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"],
|
||||
"timeout": 8,
|
||||
"timeout": 16,
|
||||
},
|
||||
] * 6 + [
|
||||
{
|
||||
|
@ -1,192 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import argparse
|
||||
import atexit
|
||||
import json
|
||||
import multiprocessing
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
# dataset calibrated for running on Apollo (total 3min)
|
||||
# long_running runs for 1min
|
||||
# long_running runs for 2min
|
||||
SMALL_DATASET = [
|
||||
{
|
||||
"test": "ha_normal_operation_long_running.cpp",
|
||||
"options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"],
|
||||
"timeout": 5,
|
||||
},
|
||||
{
|
||||
"test": "ha_normal_operation_long_running.cpp",
|
||||
"options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "2", "--verify", "30"],
|
||||
"timeout": 5,
|
||||
},
|
||||
]
|
||||
|
||||
# dataset calibrated for running on daily stress instance (total 2h)
|
||||
# long_running runs for 2h
|
||||
LARGE_DATASET = [
|
||||
{
|
||||
"test": "long_running.cpp",
|
||||
"options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "120", "--verify", "300"],
|
||||
"timeout": 140,
|
||||
},
|
||||
]
|
||||
|
||||
# paths
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
|
||||
BUILD_DIR = os.path.join(BASE_DIR, "build")
|
||||
CONFIG_DIR = os.path.join(BASE_DIR, "config")
|
||||
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
|
||||
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
|
||||
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")
|
||||
|
||||
# long running stats file
|
||||
STATS_FILE = os.path.join(SCRIPT_DIR, ".ha_normal_operation_long_running_stats")
|
||||
SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
|
||||
LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
|
||||
|
||||
# HA related constants
|
||||
CLUSTER_SIZE = 3
|
||||
PORT = 7687
|
||||
RAFT_CONFIG_FILE = os.path.join(SCRIPT_DIR, "raft.json")
|
||||
|
||||
# parse arguments
|
||||
parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
|
||||
parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
|
||||
"memgraph_ha"))
|
||||
parser.add_argument("--config", default = os.path.join(CONFIG_DIR,
|
||||
"stress.conf"))
|
||||
parser.add_argument("--log-file", default = "")
|
||||
parser.add_argument("--durability-directory", default = "")
|
||||
parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
|
||||
"ve3", "bin", "python3"), type = str)
|
||||
parser.add_argument("--large-dataset", action = "store_const",
|
||||
const = True, default = False)
|
||||
parser.add_argument("--verbose", action = "store_const",
|
||||
const = True, default = False)
|
||||
parser.add_argument("--threads", type = int, default = 8)
|
||||
args = parser.parse_args()
|
||||
|
||||
# run test helper function
|
||||
def run_test(args, test, options, timeout):
    """Run a single stress test and return its runtime in seconds.

    `test` is the test file name: `.py` tests are run with `args.python`,
    `.cpp` tests are run as the matching binary from the build directory.
    `options` are appended to the command line and `timeout` is given in
    minutes. Raises `Exception` for an unsupported test type or when the
    test exits with a non-zero code.
    """
    print("Running test '{}'".format(test))

    # Build the command that launches the test.
    if test.endswith(".py"):
        log_level = "WARNING"
        if args.verbose:
            log_level = "DEBUG"
        launcher = [
            args.python,
            "-u",
            os.path.join(SCRIPT_DIR, test),
            "--logging",
            log_level,
        ]
    elif test.endswith(".cpp"):
        launcher = [os.path.join(BUILD_DIR, "tests", "stress", test[:-4])]
    else:
        raise Exception("Test '{}' binary not supported!".format(test))

    command = launcher + ["--worker-count", str(args.threads)] + options

    # Run the test; the timeout is converted from minutes to seconds.
    started_at = time.time()
    result = subprocess.run(command, cwd = SCRIPT_DIR, timeout = timeout * 60)

    exit_code = result.returncode
    if exit_code != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(
            test, exit_code))

    elapsed = time.time() - started_at
    print(" Done after {:.3f} seconds".format(elapsed))

    return elapsed
|
||||
|
||||
|
||||
# Generate database instance flags
|
||||
workers = [None for worker in range(CLUSTER_SIZE)]
|
||||
durability_directory = tempfile.TemporaryDirectory()
|
||||
coordination_config_file = tempfile.NamedTemporaryFile(mode="w")
|
||||
def generate_cmd_args(worker_id):
    """Return the command line that starts the cluster member `worker_id`.

    Server IDs are 1-based and every worker gets its own bolt port
    (`PORT + worker_id`) and its own durability directory.
    """
    # Per-worker durability directory inside the shared temporary directory.
    worker_durability = os.path.join(
        durability_directory.name, "worker" + str(worker_id))

    return [
        args.memgraph,
        "--server-id", str(worker_id + 1),
        "--bolt-port", str(PORT + worker_id),
        "--raft-config-file", RAFT_CONFIG_FILE,
        "--coordination-config-file", coordination_config_file.name,
        "--durability-directory", worker_durability,
    ]
|
||||
|
||||
|
||||
def wait_for_server(port, delay=0.1):
    """Block until something accepts connections on 127.0.0.1:`port`.

    The port is probed with `nc`; `delay` (seconds) is slept between two
    probes and once more after the port was found open.
    """
    probe = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while True:
        if subprocess.call(probe) == 0:
            break
        time.sleep(delay)
    time.sleep(delay)
|
||||
|
||||
|
||||
def start_worker(worker_id):
    """Start the cluster member `worker_id` and wait until its port is open.

    The process handle is stored in the global `workers` list. Asserts that
    the ID is valid, that the slot is still free and that the process is
    still alive shortly after it was spawned.
    """
    global workers
    assert 0 <= worker_id < CLUSTER_SIZE, \
        "Invalid worker ID {}".format(worker_id)
    assert workers[worker_id] is None, \
        "Worker with id {} already exists".format(worker_id)

    process = subprocess.Popen(generate_cmd_args(worker_id))
    workers[worker_id] = process

    # Give the process a moment so that an immediate crash is noticed.
    time.sleep(0.2)
    assert process.poll() is None, \
        "Worker {} process died prematurely!".format(worker_id)

    wait_for_server(PORT + worker_id)
|
||||
|
||||
# Generate coordination config file
data = []
for i in range(CLUSTER_SIZE):
    data.append([i + 1, "127.0.0.1", 10000 + i])
coordination_config_file.write(json.dumps(data))
coordination_config_file.flush()

# at exit cleanup
# The hook is registered before the cluster is started: if one of the workers
# fails to start, the workers that are already running are still killed when
# the script exits.
@atexit.register
def cleanup():
    """Kill every cluster process that is still running."""
    for proc_mg in workers:
        # Slots of workers that were never started are still None.
        if proc_mg is None or proc_mg.poll() is not None:
            continue
        proc_mg.kill()
        proc_mg.wait()

# Start HA Memgraph cluster
for worker_id in range(CLUSTER_SIZE):
    start_worker(worker_id)

# NOTE(review): presumably gives the freshly started cluster some time to
# settle before the tests are run -- confirm.
time.sleep(3)
|
||||
|
||||
# run tests
# Runtimes are keyed by the test file name without its extension.
dataset = SMALL_DATASET
if args.large_dataset:
    dataset = LARGE_DATASET
runtimes = {}
for test in dataset:
    test_name = os.path.splitext(test["test"])[0]
    runtimes[test_name] = run_test(args, **test)

# stop memgraph
# Every cluster member has to terminate with exit code 0.
for proc_mg in workers:
    proc_mg.terminate()
    ret_mg = proc_mg.wait()
    if ret_mg == 0:
        continue
    raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))

# measurements
# The stats file holds the number of executed queries on its first line and
# the number of failed queries on its second line.
measurement_lines = [
    "{}.runtime {}\n".format(key, value) for key, value in runtimes.items()
]
with open(STATS_FILE) as f:
    stats = f.read().split("\n")
measurement_lines.append(
    "long_running.queries.executed {}\n".format(stats[0]))
measurement_lines.append(
    "long_running.queries.failed {}\n".format(stats[1]))
measurements = "".join(measurement_lines)
with open(MEASUREMENTS_FILE, "w") as f:
    f.write(measurements)

print("Done!")
|
@ -1,442 +0,0 @@
|
||||
#include <fstream>
|
||||
#include <random>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using EndpointT = io::network::Endpoint;
|
||||
using ClientContextT = communication::ClientContext;
|
||||
using ClientT = communication::bolt::HAClient;
|
||||
using ValueT = communication::bolt::Value;
|
||||
using QueryDataT = communication::bolt::QueryData;
|
||||
using ExceptionT = communication::bolt::ClientQueryException;
|
||||
|
||||
DEFINE_string(endpoints, "127.0.0.1:7687,127.0.0.1:7688,127.0.0.1:7689",
|
||||
"Cluster server endpoints (host:port, separated by comma).");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
|
||||
DEFINE_int32(vertex_count, 0,
|
||||
"The average number of vertices in the graph per worker");
|
||||
DEFINE_int32(edge_count, 0,
|
||||
"The average number of edges in the graph per worker");
|
||||
DEFINE_int32(prop_count, 5, "The max number of properties on a node");
|
||||
DEFINE_uint64(max_queries, 1U << 30U, "Maximum number of queries to execute");
|
||||
DEFINE_int32(max_time, 1, "Maximum execution time in minutes");
|
||||
DEFINE_int32(verify, 0, "Interval (seconds) between checking local info");
|
||||
DEFINE_int32(worker_count, 1,
|
||||
"The number of workers that operate on the graph independently");
|
||||
DEFINE_bool(global_queries, true,
|
||||
"If queries that modifiy globally should be executed sometimes");
|
||||
|
||||
DEFINE_string(stats_file, "", "File into which to write statistics.");
|
||||
|
||||
std::vector<EndpointT> GetEndpoints() {
|
||||
std::vector<io::network::Endpoint> ret;
|
||||
for (const auto &endpoint : utils::Split(FLAGS_endpoints, ",")) {
|
||||
auto split = utils::Split(utils::Trim(endpoint), ":");
|
||||
MG_ASSERT(split.size() == 2, "Invalid endpoint!");
|
||||
ret.emplace_back(
|
||||
io::network::ResolveHostname(std::string(utils::Trim(split[0]))),
|
||||
static_cast<uint16_t>(std::stoi(std::string(utils::Trim(split[1])))));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encapsulates a Graph and a Bolt session and provides CRUD op functions.
|
||||
* Also defines a run-loop for a generic exectutor, and a graph state
|
||||
* verification function.
|
||||
*/
|
||||
class GraphSession {
|
||||
public:
|
||||
explicit GraphSession(int id)
|
||||
: id_(id),
|
||||
indexed_label_(fmt::format("indexed_label{}", id)),
|
||||
generator_{std::random_device{}()} {
|
||||
for (int i = 0; i < FLAGS_prop_count; ++i) {
|
||||
auto label = fmt::format("label{}", i);
|
||||
labels_.insert(label);
|
||||
labels_vertices_.insert({label, {}});
|
||||
}
|
||||
|
||||
std::vector<EndpointT> endpoints = GetEndpoints();
|
||||
|
||||
uint64_t retries = 15;
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
ClientContextT context(FLAGS_use_ssl);
|
||||
client_ = std::make_unique<ClientT>(endpoints, &context_, FLAGS_username,
|
||||
FLAGS_password, retries, retry_delay);
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t id_;
|
||||
ClientContextT context_{FLAGS_use_ssl};
|
||||
std::unique_ptr<ClientT> client_;
|
||||
|
||||
uint64_t vertex_id_{0};
|
||||
uint64_t edge_id_{0};
|
||||
|
||||
std::set<uint64_t> vertices_;
|
||||
std::set<uint64_t> edges_;
|
||||
|
||||
std::string indexed_label_;
|
||||
std::set<std::string> labels_;
|
||||
|
||||
std::map<std::string, std::set<uint64_t>> labels_vertices_;
|
||||
|
||||
uint64_t executed_queries_{0};
|
||||
std::map<std::string, uint64_t> query_failures_;
|
||||
|
||||
std::mt19937 generator_;
|
||||
|
||||
utils::Timer timer_;
|
||||
|
||||
private:
|
||||
double GetRandom() { return std::generate_canonical<double, 10>(generator_); }
|
||||
|
||||
bool Bernoulli(double p) { return GetRandom() < p; }
|
||||
|
||||
template <typename T>
|
||||
T RandomElement(const std::set<T> &data) {
|
||||
uint32_t pos = std::floor(GetRandom() * data.size());
|
||||
auto it = data.begin();
|
||||
std::advance(it, pos);
|
||||
return *it;
|
||||
}
|
||||
|
||||
void AddQueryFailure(std::string what) {
|
||||
auto it = query_failures_.find(what);
|
||||
if (it != query_failures_.end()) {
|
||||
++it->second;
|
||||
} else {
|
||||
query_failures_.insert(std::make_pair(what, 1));
|
||||
}
|
||||
}
|
||||
|
||||
QueryDataT Execute(std::string query) {
|
||||
try {
|
||||
SPDLOG_INFO("Runner {} executing query: {}", id_, query);
|
||||
executed_queries_ += 1;
|
||||
return client_->Execute(query, {});
|
||||
} catch (const ExceptionT &e) {
|
||||
AddQueryFailure(std::string{e.what()});
|
||||
return QueryDataT();
|
||||
}
|
||||
}
|
||||
|
||||
void CreateVertices(uint64_t vertices_count) {
|
||||
if (vertices_count == 0) return;
|
||||
auto ret = Execute(fmt::format(
|
||||
"UNWIND RANGE({}, {}) AS r CREATE (n:{} {{id: r}}) RETURN count(n)",
|
||||
vertex_id_, vertex_id_ + vertices_count - 1, indexed_label_));
|
||||
MG_ASSERT(ret.records.size() == 1, "Vertices creation failed!");
|
||||
MG_ASSERT(ret.records[0][0].ValueInt() == vertices_count,
|
||||
"Created {} vertices instead of {}!",
|
||||
ret.records[0][0].ValueInt(), vertices_count);
|
||||
for (uint64_t i = 0; i < vertices_count; ++i) {
|
||||
vertices_.insert(vertex_id_ + i);
|
||||
}
|
||||
vertex_id_ += vertices_count;
|
||||
}
|
||||
|
||||
void RemoveVertex() {
|
||||
auto vertex_id = RandomElement(vertices_);
|
||||
auto ret =
|
||||
Execute(fmt::format("MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() "
|
||||
"DETACH DELETE n RETURN n.id, labels(n), r.id",
|
||||
indexed_label_, vertex_id));
|
||||
if (!ret.records.empty()) {
|
||||
std::set<uint64_t> processed_vertices;
|
||||
for (auto &record : ret.records) {
|
||||
// remove vertex but note there could be duplicates
|
||||
auto n_id = record[0].ValueInt();
|
||||
if (processed_vertices.insert(n_id).second) {
|
||||
vertices_.erase(n_id);
|
||||
for (auto &label : record[1].ValueList()) {
|
||||
if (label.ValueString() == indexed_label_) {
|
||||
continue;
|
||||
}
|
||||
labels_vertices_[label.ValueString()].erase(n_id);
|
||||
}
|
||||
}
|
||||
// remove edge
|
||||
auto &edge = record[2];
|
||||
if (edge.type() == ValueT::Type::Int) {
|
||||
edges_.erase(edge.ValueInt());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CreateEdges(uint64_t edges_count) {
|
||||
if (edges_count == 0) return;
|
||||
auto edges_per_node = (double)edges_count / vertices_.size();
|
||||
MG_ASSERT(std::abs(edges_per_node - (int64_t)edges_per_node) < 0.0001,
|
||||
"Edges per node not a whole number");
|
||||
|
||||
auto ret = Execute(fmt::format(
|
||||
"MATCH (a:{0}) WITH a "
|
||||
"UNWIND range(0, {1}) AS i WITH a, tointeger(rand() * {2}) AS id "
|
||||
"MATCH (b:{0} {{id: id}}) WITH a, b "
|
||||
"CREATE (a)-[e:EdgeType {{id: counter(\"edge\", {3})}}]->(b) "
|
||||
"RETURN count(e)",
|
||||
indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(),
|
||||
edge_id_));
|
||||
|
||||
MG_ASSERT(ret.records.size() == 1, "Failed to create edges");
|
||||
uint64_t count = ret.records[0][0].ValueInt();
|
||||
for (uint64_t i = 0; i < count; ++i) {
|
||||
edges_.insert(edge_id_ + i);
|
||||
}
|
||||
edge_id_ += count;
|
||||
}
|
||||
|
||||
void CreateEdge() {
|
||||
auto ret = Execute(
|
||||
fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) "
|
||||
"CREATE (from)-[e:EdgeType {{id: "
|
||||
"counter(\"edge\", {})}}]->(to) RETURN e.id",
|
||||
indexed_label_, RandomElement(vertices_), indexed_label_,
|
||||
RandomElement(vertices_), edge_id_));
|
||||
if (!ret.records.empty()) {
|
||||
edges_.insert(ret.records[0][0].ValueInt());
|
||||
edge_id_ += 1;
|
||||
}
|
||||
}
|
||||
|
||||
void AddLabel() {
|
||||
auto vertex_id = RandomElement(vertices_);
|
||||
auto label = RandomElement(labels_);
|
||||
// add a label on a vertex that didn't have that label
|
||||
// yet (we need that for book-keeping)
|
||||
auto ret = Execute(fmt::format(
|
||||
"MATCH (v:{} {{id: {}}}) WHERE not v:{} SET v:{} RETURN v.id",
|
||||
indexed_label_, vertex_id, label, label));
|
||||
if (!ret.records.empty()) {
|
||||
labels_vertices_[label].insert(vertex_id);
|
||||
}
|
||||
}
|
||||
|
||||
void UpdateGlobalVertices() {
|
||||
uint64_t vertex_id = *vertices_.rbegin();
|
||||
uint64_t lo = std::floor(GetRandom() * vertex_id);
|
||||
uint64_t hi = std::floor(lo + vertex_id * 0.01);
|
||||
uint64_t num = std::floor(GetRandom() * (1U << 30U));
|
||||
Execute(
|
||||
fmt::format("MATCH (n) WHERE n.id > {} AND n.id < {} SET n.value = {}",
|
||||
lo, hi, num));
|
||||
}
|
||||
|
||||
void UpdateGlobalEdges() {
|
||||
uint64_t vertex_id = *vertices_.rbegin();
|
||||
uint64_t lo = std::floor(GetRandom() * vertex_id);
|
||||
uint64_t hi = std::floor(lo + vertex_id * 0.01);
|
||||
uint64_t num = std::floor(GetRandom() * (1U << 30U));
|
||||
Execute(fmt::format(
|
||||
"MATCH ()-[e]->() WHERE e.id > {} AND e.id < {} SET e.value = {}", lo,
|
||||
hi, num));
|
||||
}
|
||||
|
||||
/** Checks if the local info corresponds to DB state */
|
||||
void VerifyGraph() {
|
||||
// helper lambda for count verification
|
||||
auto test_count = [this](std::string query, int64_t count,
|
||||
std::string message) {
|
||||
auto ret = Execute(query);
|
||||
if (ret.records.empty()) {
|
||||
throw utils::BasicException("Couldn't execute count!");
|
||||
}
|
||||
if (ret.records[0][0].ValueInt() != count) {
|
||||
throw utils::BasicException(
|
||||
fmt::format(message, id_, count, ret.records[0][0].ValueInt()));
|
||||
}
|
||||
};
|
||||
|
||||
test_count(fmt::format("MATCH (n:{}) RETURN count(n)", indexed_label_),
|
||||
vertices_.size(), "Runner {} expected {} vertices, found {}!");
|
||||
test_count(
|
||||
fmt::format("MATCH (:{0})-[r]->(:{0}) RETURN count(r)", indexed_label_),
|
||||
edges_.size(), "Runner {} expected {} edges, found {}!");
|
||||
|
||||
for (auto &item : labels_vertices_) {
|
||||
test_count(
|
||||
fmt::format("MATCH (n:{}:{}) RETURN count(n)", indexed_label_,
|
||||
item.first),
|
||||
item.second.size(),
|
||||
fmt::format(
|
||||
"Runner {{}} expected {{}} vertices with label '{}', found {{}}!",
|
||||
item.first));
|
||||
}
|
||||
|
||||
// generate report
|
||||
std::ostringstream report;
|
||||
report << fmt::format("Runner {} graph verification success:", id_)
|
||||
<< std::endl
|
||||
<< fmt::format("\tExecuted {} queries in {:.2f} seconds",
|
||||
executed_queries_, timer_.Elapsed().count())
|
||||
<< std::endl
|
||||
<< fmt::format("\tGraph has {} vertices and {} edges",
|
||||
vertices_.size(), edges_.size())
|
||||
<< std::endl;
|
||||
for (auto &label : labels_) {
|
||||
report << fmt::format("\tVertices with label '{}': {}", label,
|
||||
labels_vertices_[label].size())
|
||||
<< std::endl;
|
||||
}
|
||||
if (!query_failures_.empty()) {
|
||||
report << "\tQuery failed (reason: count)" << std::endl;
|
||||
for (auto &item : query_failures_) {
|
||||
report << fmt::format("\t\t'{}': {}", item.first, item.second)
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
spdlog::info(report.str());
|
||||
}
|
||||
|
||||
public:
|
||||
void Run() {
|
||||
// initial vertex creation
|
||||
CreateVertices(FLAGS_vertex_count);
|
||||
|
||||
// initial edge creation
|
||||
CreateEdges(FLAGS_edge_count);
|
||||
|
||||
if (FLAGS_verify > 0) VerifyGraph();
|
||||
double last_verify = timer_.Elapsed().count();
|
||||
|
||||
// run rest
|
||||
while (executed_queries_ < FLAGS_max_queries &&
|
||||
timer_.Elapsed().count() / 60.0 < FLAGS_max_time) {
|
||||
if (FLAGS_verify > 0 &&
|
||||
timer_.Elapsed().count() - last_verify > FLAGS_verify) {
|
||||
VerifyGraph();
|
||||
last_verify = timer_.Elapsed().count();
|
||||
}
|
||||
|
||||
double ratio_e = (double)edges_.size() / (double)FLAGS_edge_count;
|
||||
double ratio_v = (double)vertices_.size() / (double)FLAGS_vertex_count;
|
||||
|
||||
// try to edit vertices globally
|
||||
if (FLAGS_global_queries) {
|
||||
if (Bernoulli(0.01)) {
|
||||
UpdateGlobalVertices();
|
||||
}
|
||||
|
||||
// try to edit edges globally
|
||||
if (Bernoulli(0.01)) {
|
||||
UpdateGlobalEdges();
|
||||
}
|
||||
}
|
||||
|
||||
// if we're missing edges (due to vertex detach delete), add some!
|
||||
if (Bernoulli(ratio_e < 0.9)) {
|
||||
CreateEdge();
|
||||
continue;
|
||||
}
|
||||
|
||||
// if we are near vertex balance, we can also do updates
|
||||
// instad of update / deletes
|
||||
if (std::fabs(1.0 - ratio_v) < 0.5 && Bernoulli(0.5)) {
|
||||
AddLabel();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Bernoulli(ratio_v / 2.0)) {
|
||||
RemoveVertex();
|
||||
} else {
|
||||
CreateVertices(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t GetExecutedQueries() { return executed_queries_; }
|
||||
|
||||
uint64_t GetFailedQueries() {
|
||||
uint64_t failed = 0;
|
||||
for (const auto &item : query_failures_) {
|
||||
failed += item.second;
|
||||
}
|
||||
return failed;
|
||||
}
|
||||
};
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
MG_ASSERT(FLAGS_vertex_count > 0, "Vertex count must be greater than 0!");
|
||||
MG_ASSERT(FLAGS_edge_count > 0, "Edge count must be greater than 0!");
|
||||
|
||||
communication::SSLInit sslInit;
|
||||
|
||||
spdlog::info("Starting Memgraph HA normal operation long running test");
|
||||
|
||||
try {
|
||||
std::vector<EndpointT> endpoints = GetEndpoints();
|
||||
|
||||
// create client
|
||||
uint64_t retries = 15;
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
ClientContextT context(FLAGS_use_ssl);
|
||||
ClientT client(endpoints, &context, FLAGS_username, FLAGS_password, retries,
|
||||
retry_delay);
|
||||
|
||||
// cleanup and create indexes
|
||||
client.Execute("MATCH (n) DETACH DELETE n", {});
|
||||
for (int i = 0; i < FLAGS_worker_count; ++i) {
|
||||
client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i),
|
||||
{});
|
||||
}
|
||||
} catch (const communication::bolt::ClientFatalException &e) {
|
||||
spdlog::warn("Unable to find cluster leader");
|
||||
return 1;
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
spdlog::warn(
|
||||
"Transient error while executing query. (eg. mistyped query, etc.)\n{}",
|
||||
e.what());
|
||||
return 1;
|
||||
} catch (const utils::BasicException &e) {
|
||||
spdlog::warn("Error while executing query\n{}", e.what());
|
||||
return 1;
|
||||
}
|
||||
|
||||
// sessions
|
||||
std::vector<GraphSession> sessions;
|
||||
sessions.reserve(FLAGS_worker_count);
|
||||
for (int i = 0; i < FLAGS_worker_count; ++i) sessions.emplace_back(i);
|
||||
|
||||
// workers
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(FLAGS_worker_count);
|
||||
for (int i = 0; i < FLAGS_worker_count; ++i)
|
||||
threads.emplace_back([&, i]() { sessions[i].Run(); });
|
||||
|
||||
for (int i = 0; i < FLAGS_worker_count; ++i) threads[i].join();
|
||||
|
||||
if (!FLAGS_stats_file.empty()) {
|
||||
uint64_t executed = 0;
|
||||
uint64_t failed = 0;
|
||||
for (int i = 0; i < FLAGS_worker_count; ++i) {
|
||||
executed += sessions[i].GetExecutedQueries();
|
||||
failed += sessions[i].GetFailedQueries();
|
||||
}
|
||||
std::ofstream stream(FLAGS_stats_file);
|
||||
stream << executed << std::endl << failed << std::endl;
|
||||
spdlog::info("Written statistics to file: {}", FLAGS_stats_file);
|
||||
}
|
||||
|
||||
spdlog::info("All query runners done");
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
{
|
||||
"election_timeout_min": 1200,
|
||||
"election_timeout_max": 1500,
|
||||
"heartbeat_interval": 200,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": 2500
|
||||
}
|
Loading…
Reference in New Issue
Block a user