diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index a8e8aa502..3cd5e7e05 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -329,7 +329,7 @@ jobs: with: name: "Enterprise DEB package" path: build/output/memgraph*.deb - + release_jepsen_test: name: "Release Jepsen Test" runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl] diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index e5dce6a8e..fc7a2d5f0 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -316,3 +316,43 @@ jobs: cd tests/stress source ve3/bin/activate python3 durability --num-steps 20 + + release_jepsen_test: + name: "Release Jepsen Test" + runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl] + env: + THREADS: 24 + timeout-minutes: 60 + + steps: + - name: Set up repository + uses: actions/checkout@v2 + with: + # Number of commits to fetch. `0` indicates all history for all + # branches and tags. (default: 1) + fetch-depth: 0 + + - name: Build release binaries + run: | + # Activate toolchain. + source /opt/toolchain-v2/activate + + # Initialize dependencies. + ./init + + # Build only memgraph release binary. + cd build + cmake -DCMAKE_BUILD_TYPE=release .. + make -j$THREADS memgraph + + - name: Run Jepsen tests + run: | + cd tests/jepsen + ./run.sh test --binary ../../build/memgraph --run-args "test-all --node-configs resources/node-config.edn" --ignore-run-stdout-logs --ignore-run-stderr-logs + + - name: Save Jepsen report + uses: actions/upload-artifact@v2 + if: ${{ always() }} + with: + name: "Jepsen Report" + path: tests/jepsen/Jepsen.tar.gz diff --git a/src/communication/bolt/ha_client.hpp b/src/communication/bolt/ha_client.hpp deleted file mode 100644 index d8bff9f3a..000000000 --- a/src/communication/bolt/ha_client.hpp +++ /dev/null @@ -1,179 +0,0 @@ -#pragma once - -#include <chrono> -#include <memory> -#include <thread> -#include <vector> - -#include "communication/bolt/client.hpp" - -namespace communication::bolt { - -/// HA Bolt client. -/// It has methods used to execute queries against a cluster of servers. It -/// supports both SSL and plaintext connections. -class HAClient final { - public: - HAClient(const std::vector<io::network::Endpoint> &endpoints, - communication::ClientContext *context, const std::string &username, - const std::string &password, uint64_t num_retries, - const std::chrono::milliseconds &retry_delay, - const std::string &client_name = "memgraph-bolt") - : endpoints_(endpoints), - context_(context), - username_(username), - password_(password), - num_retries_(num_retries), - retry_delay_(retry_delay), - client_name_(client_name) { - if (endpoints.size() < 3) { - throw ClientFatalException( - "You should specify at least three server endpoints to connect to!"); - } - // Create all clients. - for (size_t i = 0; i < endpoints.size(); ++i) { - clients_.push_back(std::make_unique<Client>(context_)); - } - } - - HAClient(const HAClient &) = delete; - HAClient(HAClient &&) = delete; - HAClient &operator=(const HAClient &) = delete; - HAClient &operator=(HAClient &&) = delete; - - /// Function used to execute queries against the leader server. - /// @throws ClientQueryException when there is some transient error while - /// executing the query (eg. mistyped query, - /// etc.) 
- /// @throws ClientFatalException when we couldn't communicate with the leader - /// server even after `num_retries` tries - QueryData Execute(const std::string &query, - const std::map<std::string, Value> ¶meters) { - for (int i = 0; i < num_retries_; ++i) { - // Try to find a leader. - if (!leader_) { - for (int j = 0; j < num_retries_; ++j) { - if (!(i == 0 && j == 0)) { - std::this_thread::sleep_for( - std::chrono::milliseconds(retry_delay_)); - } - try { - FindLeader(); - break; - } catch (const ClientFatalException &e) { - continue; - } - } - if (!leader_) { - throw ClientFatalException("Couldn't find leader after {} tries!", - num_retries_); - } - } - // Try to execute the query. - try { - return leader_->Execute(query, parameters); - } catch (const utils::BasicException &e) { - // Check if this is a cluster failure or a Raft failure. - auto qe = dynamic_cast<const ClientQueryException *>(&e); - if (dynamic_cast<const ClientFatalException *>(&e) || - (qe && qe->code() == "Memgraph.DatabaseError.Raft.Error")) { - // We need to look for a new leader. - leader_ = nullptr; - continue; - } - // If it isn't just forward the exception to the client. - throw; - } - } - throw ClientFatalException("Couldn't execute query after {} tries!", - num_retries_); - } - - /// Function that returns the current leader ID. - /// - /// @throws ClientFatalException when we couldn't find the leader server even - /// after `num_retries` tries - uint64_t GetLeaderId() { - Execute("SHOW RAFT INFO", {}); - return leader_id_; - } - - private: - void FindLeader() { - // Reconnect clients that aren't available - bool connected = false; - for (size_t i = 0; i < clients_.size(); ++i) { - const auto &ep = endpoints_[i]; - const auto &client = clients_[i]; - try { - client->Execute("SHOW RAFT INFO", {}); - connected = true; - continue; - } catch (const ClientQueryException &e) { - continue; - } catch (const ClientFatalException &e) { - client->Close(); - try { - client->Connect(ep, username_, password_, client_name_); - connected = true; - } catch (const utils::BasicException &) { - // Suppress any exceptions. 
- } - } - } - if (!connected) { - throw ClientFatalException("Couldn't connect to any server!"); - } - - // Determine which server is the leader - leader_ = nullptr; - uint64_t leader_id = 0; - int64_t leader_term = -1; - for (uint64_t i = 0; i < clients_.size(); ++i) { - auto &client = clients_[i]; - try { - auto ret = client->Execute("SHOW RAFT INFO", {}); - int64_t term_id = -1; - bool is_leader = false; - for (const auto &rec : ret.records) { - if (rec.size() != 2) continue; - if (!rec[0].IsString()) continue; - const auto &key = rec[0].ValueString(); - if (key == "term_id") { - if (!rec[1].IsInt()) continue; - term_id = rec[1].ValueInt(); - } else if (key == "is_leader") { - if (!rec[1].IsBool()) continue; - is_leader = rec[1].ValueBool(); - } else { - continue; - } - } - if (is_leader && term_id > leader_term) { - leader_term = term_id; - leader_id = i + 1; - leader_ = client.get(); - } - } catch (const utils::BasicException &) { - continue; - } - } - leader_id_ = leader_id; - if (!leader_) { - throw ClientFatalException("Couldn't find leader server!"); - } - } - - std::vector<io::network::Endpoint> endpoints_; - communication::ClientContext *context_; - std::string username_; - std::string password_; - uint64_t num_retries_; - std::chrono::milliseconds retry_delay_; - std::string client_name_; - - uint64_t leader_id_ = 0; - Client *leader_ = nullptr; - std::vector<std::unique_ptr<Client>> clients_; -}; -} // namespace communication::bolt diff --git a/tests/stress/CMakeLists.txt b/tests/stress/CMakeLists.txt index 4187db083..f4ca745fa 100644 --- a/tests/stress/CMakeLists.txt +++ b/tests/stress/CMakeLists.txt @@ -17,6 +17,3 @@ endfunction(add_stress_test) add_stress_test(long_running.cpp) target_link_libraries(${test_prefix}long_running mg-communication mg-io mg-utils) - -#add_stress_test(ha_normal_operation_long_running) -#target_link_libraries(${test_prefix}ha_normal_operation_long_running mg-communication mg-io mg-utils) diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index 19697b986..697f5158d 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -57,7 +57,7 @@ LARGE_DATASET = [ { "test": "long_running.cpp", "options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"], - "timeout": 8, + "timeout": 16, }, ] * 6 + [ { diff --git a/tests/stress/continuous_integration_ha b/tests/stress/continuous_integration_ha deleted file mode 100755 index d7c51488b..000000000 --- a/tests/stress/continuous_integration_ha +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import argparse -import atexit -import json -import multiprocessing -import os -import subprocess -import sys -import tempfile -import time - -# dataset calibrated for running on Apollo (total 3min) -# long_running runs for 1min -# long_running runs for 2min -SMALL_DATASET = [ - { - "test": "ha_normal_operation_long_running.cpp", - "options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"], - "timeout": 5, - }, - { - "test": "ha_normal_operation_long_running.cpp", - "options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "2", "--verify", "30"], - "timeout": 5, - }, -] - -# dataset calibrated for running on daily stress instance (total 2h) -# long_running runs for 2h -LARGE_DATASET = [ - { - "test": "long_running.cpp", - "options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "120", "--verify", "300"], 
- "timeout": 140, - }, -] - -# paths -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) -BUILD_DIR = os.path.join(BASE_DIR, "build") -CONFIG_DIR = os.path.join(BASE_DIR, "config") -MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements") -KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem") -CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem") - -# long running stats file -STATS_FILE = os.path.join(SCRIPT_DIR, ".ha_normal_operation_long_running_stats") -SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE]) -LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE]) - -# HA related constants -CLUSTER_SIZE = 3 -PORT = 7687 -RAFT_CONFIG_FILE = os.path.join(SCRIPT_DIR, "raft.json") - -# parse arguments -parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.") -parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR, - "memgraph_ha")) -parser.add_argument("--config", default = os.path.join(CONFIG_DIR, - "stress.conf")) -parser.add_argument("--log-file", default = "") -parser.add_argument("--durability-directory", default = "") -parser.add_argument("--python", default = os.path.join(SCRIPT_DIR, - "ve3", "bin", "python3"), type = str) -parser.add_argument("--large-dataset", action = "store_const", - const = True, default = False) -parser.add_argument("--verbose", action = "store_const", - const = True, default = False) -parser.add_argument("--threads", type = int, default = 8) -args = parser.parse_args() - -# run test helper function -def run_test(args, test, options, timeout): - print("Running test '{}'".format(test)) - - # find binary - if test.endswith(".py"): - logging = "DEBUG" if args.verbose else "WARNING" - binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), - "--logging", logging] - elif test.endswith(".cpp"): - exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4]) - binary = [exe] - else: - raise Exception("Test '{}' binary not supported!".format(test)) - - # start test - cmd = binary + ["--worker-count", str(args.threads)] + options - start = time.time() - ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60) - - if ret_test.returncode != 0: - raise Exception("Test '{}' binary returned non-zero ({})!".format( - test, ret_test.returncode)) - - runtime = time.time() - start - print(" Done after {:.3f} seconds".format(runtime)) - - return runtime - - -# Generate database instance flags -workers = [None for worker in range(CLUSTER_SIZE)] -durability_directory = tempfile.TemporaryDirectory() -coordination_config_file = tempfile.NamedTemporaryFile(mode="w") -def generate_cmd_args(worker_id): - cmd = [args.memgraph] - cmd.extend(["--server-id", str(worker_id + 1)]) - cmd.extend(["--bolt-port", str(PORT + worker_id)]) - cmd.extend(["--raft-config-file", RAFT_CONFIG_FILE]) - cmd.extend(["--coordination-config-file", coordination_config_file.name]) - - # generate durability directory - dur = os.path.join(durability_directory.name, "worker" + str(worker_id)) - cmd.extend(["--durability-directory", dur]) - - return cmd - - -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(delay) - time.sleep(delay) - - -def start_worker(worker_id): - global workers - assert worker_id >= 0 and worker_id < CLUSTER_SIZE, \ - "Invalid worker ID {}".format(worker_id) - assert workers[worker_id] is None, \ - "Worker with id {} already exists".format(worker_id) - - 
workers[worker_id] = subprocess.Popen(generate_cmd_args(worker_id)) - - time.sleep(0.2) - assert workers[worker_id].poll() is None, \ - "Worker {} process died prematurely!".format(worker_id) - - wait_for_server(PORT + worker_id) - -# Generate coorindation config file -data = [] -for i in range(CLUSTER_SIZE): - data.append([i + 1, "127.0.0.1", 10000 + i]) -coordination_config_file.write(json.dumps(data)) -coordination_config_file.flush() - -# Start Ha Memgraph cluster -for worker_id in range(CLUSTER_SIZE): - start_worker(worker_id) - -# at exit cleanup -@atexit.register -def cleanup(): - for proc_mg in workers: - if proc_mg.poll() is not None: continue - proc_mg.kill() - proc_mg.wait() - -time.sleep(3) - -# run tests -runtimes = {} -dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET -for test in dataset: - runtime = run_test(args, **test) - runtimes[os.path.splitext(test["test"])[0]] = runtime - -# stop memgraph -for proc_mg in workers: - proc_mg.terminate() - ret_mg = proc_mg.wait() - if ret_mg != 0: - raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg)) - -# measurements -measurements = "" -for key, value in runtimes.items(): - measurements += "{}.runtime {}\n".format(key, value) -with open(STATS_FILE) as f: - stats = f.read().split("\n") -measurements += "long_running.queries.executed {}\n".format(stats[0]) -measurements += "long_running.queries.failed {}\n".format(stats[1]) -with open(MEASUREMENTS_FILE, "w") as f: - f.write(measurements) - -print("Done!") diff --git a/tests/stress/ha_normal_operation_long_running.cpp b/tests/stress/ha_normal_operation_long_running.cpp deleted file mode 100644 index 28590728d..000000000 --- a/tests/stress/ha_normal_operation_long_running.cpp +++ /dev/null @@ -1,442 +0,0 @@ -#include <fstream> -#include <random> -#include <set> -#include <thread> - -#include <fmt/format.h> -#include <gflags/gflags.h> - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/exceptions.hpp" -#include "utils/string.hpp" -#include "utils/timer.hpp" - -using EndpointT = io::network::Endpoint; -using ClientContextT = communication::ClientContext; -using ClientT = communication::bolt::HAClient; -using ValueT = communication::bolt::Value; -using QueryDataT = communication::bolt::QueryData; -using ExceptionT = communication::bolt::ClientQueryException; - -DEFINE_string(endpoints, "127.0.0.1:7687,127.0.0.1:7688,127.0.0.1:7689", - "Cluster server endpoints (host:port, separated by comma)."); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); - -DEFINE_int32(vertex_count, 0, - "The average number of vertices in the graph per worker"); -DEFINE_int32(edge_count, 0, - "The average number of edges in the graph per worker"); -DEFINE_int32(prop_count, 5, "The max number of properties on a node"); -DEFINE_uint64(max_queries, 1U << 30U, "Maximum number of queries to execute"); -DEFINE_int32(max_time, 1, "Maximum execution time in minutes"); -DEFINE_int32(verify, 0, "Interval (seconds) between checking local info"); -DEFINE_int32(worker_count, 1, - "The number of workers that operate on the graph independently"); -DEFINE_bool(global_queries, true, - "If queries that modifiy globally should be executed sometimes"); - -DEFINE_string(stats_file, "", "File into which to write 
statistics."); - -std::vector<EndpointT> GetEndpoints() { - std::vector<io::network::Endpoint> ret; - for (const auto &endpoint : utils::Split(FLAGS_endpoints, ",")) { - auto split = utils::Split(utils::Trim(endpoint), ":"); - MG_ASSERT(split.size() == 2, "Invalid endpoint!"); - ret.emplace_back( - io::network::ResolveHostname(std::string(utils::Trim(split[0]))), - static_cast<uint16_t>(std::stoi(std::string(utils::Trim(split[1]))))); - } - return ret; -} - -/** - * Encapsulates a Graph and a Bolt session and provides CRUD op functions. - * Also defines a run-loop for a generic exectutor, and a graph state - * verification function. - */ -class GraphSession { - public: - explicit GraphSession(int id) - : id_(id), - indexed_label_(fmt::format("indexed_label{}", id)), - generator_{std::random_device{}()} { - for (int i = 0; i < FLAGS_prop_count; ++i) { - auto label = fmt::format("label{}", i); - labels_.insert(label); - labels_vertices_.insert({label, {}}); - } - - std::vector<EndpointT> endpoints = GetEndpoints(); - - uint64_t retries = 15; - std::chrono::milliseconds retry_delay(1000); - ClientContextT context(FLAGS_use_ssl); - client_ = std::make_unique<ClientT>(endpoints, &context_, FLAGS_username, - FLAGS_password, retries, retry_delay); - } - - private: - uint64_t id_; - ClientContextT context_{FLAGS_use_ssl}; - std::unique_ptr<ClientT> client_; - - uint64_t vertex_id_{0}; - uint64_t edge_id_{0}; - - std::set<uint64_t> vertices_; - std::set<uint64_t> edges_; - - std::string indexed_label_; - std::set<std::string> labels_; - - std::map<std::string, std::set<uint64_t>> labels_vertices_; - - uint64_t executed_queries_{0}; - std::map<std::string, uint64_t> query_failures_; - - std::mt19937 generator_; - - utils::Timer timer_; - - private: - double GetRandom() { return std::generate_canonical<double, 10>(generator_); } - - bool Bernoulli(double p) { return GetRandom() < p; } - - template <typename T> - T RandomElement(const std::set<T> &data) { - uint32_t pos = std::floor(GetRandom() * data.size()); - auto it = data.begin(); - std::advance(it, pos); - return *it; - } - - void AddQueryFailure(std::string what) { - auto it = query_failures_.find(what); - if (it != query_failures_.end()) { - ++it->second; - } else { - query_failures_.insert(std::make_pair(what, 1)); - } - } - - QueryDataT Execute(std::string query) { - try { - SPDLOG_INFO("Runner {} executing query: {}", id_, query); - executed_queries_ += 1; - return client_->Execute(query, {}); - } catch (const ExceptionT &e) { - AddQueryFailure(std::string{e.what()}); - return QueryDataT(); - } - } - - void CreateVertices(uint64_t vertices_count) { - if (vertices_count == 0) return; - auto ret = Execute(fmt::format( - "UNWIND RANGE({}, {}) AS r CREATE (n:{} {{id: r}}) RETURN count(n)", - vertex_id_, vertex_id_ + vertices_count - 1, indexed_label_)); - MG_ASSERT(ret.records.size() == 1, "Vertices creation failed!"); - MG_ASSERT(ret.records[0][0].ValueInt() == vertices_count, - "Created {} vertices instead of {}!", - ret.records[0][0].ValueInt(), vertices_count); - for (uint64_t i = 0; i < vertices_count; ++i) { - vertices_.insert(vertex_id_ + i); - } - vertex_id_ += vertices_count; - } - - void RemoveVertex() { - auto vertex_id = RandomElement(vertices_); - auto ret = - Execute(fmt::format("MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() " - "DETACH DELETE n RETURN n.id, labels(n), r.id", - indexed_label_, vertex_id)); - if (!ret.records.empty()) { - std::set<uint64_t> processed_vertices; - for (auto &record : ret.records) { - // remove 
vertex but note there could be duplicates - auto n_id = record[0].ValueInt(); - if (processed_vertices.insert(n_id).second) { - vertices_.erase(n_id); - for (auto &label : record[1].ValueList()) { - if (label.ValueString() == indexed_label_) { - continue; - } - labels_vertices_[label.ValueString()].erase(n_id); - } - } - // remove edge - auto &edge = record[2]; - if (edge.type() == ValueT::Type::Int) { - edges_.erase(edge.ValueInt()); - } - } - } - } - - void CreateEdges(uint64_t edges_count) { - if (edges_count == 0) return; - auto edges_per_node = (double)edges_count / vertices_.size(); - MG_ASSERT(std::abs(edges_per_node - (int64_t)edges_per_node) < 0.0001, - "Edges per node not a whole number"); - - auto ret = Execute(fmt::format( - "MATCH (a:{0}) WITH a " - "UNWIND range(0, {1}) AS i WITH a, tointeger(rand() * {2}) AS id " - "MATCH (b:{0} {{id: id}}) WITH a, b " - "CREATE (a)-[e:EdgeType {{id: counter(\"edge\", {3})}}]->(b) " - "RETURN count(e)", - indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(), - edge_id_)); - - MG_ASSERT(ret.records.size() == 1, "Failed to create edges"); - uint64_t count = ret.records[0][0].ValueInt(); - for (uint64_t i = 0; i < count; ++i) { - edges_.insert(edge_id_ + i); - } - edge_id_ += count; - } - - void CreateEdge() { - auto ret = Execute( - fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) " - "CREATE (from)-[e:EdgeType {{id: " - "counter(\"edge\", {})}}]->(to) RETURN e.id", - indexed_label_, RandomElement(vertices_), indexed_label_, - RandomElement(vertices_), edge_id_)); - if (!ret.records.empty()) { - edges_.insert(ret.records[0][0].ValueInt()); - edge_id_ += 1; - } - } - - void AddLabel() { - auto vertex_id = RandomElement(vertices_); - auto label = RandomElement(labels_); - // add a label on a vertex that didn't have that label - // yet (we need that for book-keeping) - auto ret = Execute(fmt::format( - "MATCH (v:{} {{id: {}}}) WHERE not v:{} SET v:{} RETURN v.id", - indexed_label_, vertex_id, label, label)); - if (!ret.records.empty()) { - labels_vertices_[label].insert(vertex_id); - } - } - - void UpdateGlobalVertices() { - uint64_t vertex_id = *vertices_.rbegin(); - uint64_t lo = std::floor(GetRandom() * vertex_id); - uint64_t hi = std::floor(lo + vertex_id * 0.01); - uint64_t num = std::floor(GetRandom() * (1U << 30U)); - Execute( - fmt::format("MATCH (n) WHERE n.id > {} AND n.id < {} SET n.value = {}", - lo, hi, num)); - } - - void UpdateGlobalEdges() { - uint64_t vertex_id = *vertices_.rbegin(); - uint64_t lo = std::floor(GetRandom() * vertex_id); - uint64_t hi = std::floor(lo + vertex_id * 0.01); - uint64_t num = std::floor(GetRandom() * (1U << 30U)); - Execute(fmt::format( - "MATCH ()-[e]->() WHERE e.id > {} AND e.id < {} SET e.value = {}", lo, - hi, num)); - } - - /** Checks if the local info corresponds to DB state */ - void VerifyGraph() { - // helper lambda for count verification - auto test_count = [this](std::string query, int64_t count, - std::string message) { - auto ret = Execute(query); - if (ret.records.empty()) { - throw utils::BasicException("Couldn't execute count!"); - } - if (ret.records[0][0].ValueInt() != count) { - throw utils::BasicException( - fmt::format(message, id_, count, ret.records[0][0].ValueInt())); - } - }; - - test_count(fmt::format("MATCH (n:{}) RETURN count(n)", indexed_label_), - vertices_.size(), "Runner {} expected {} vertices, found {}!"); - test_count( - fmt::format("MATCH (:{0})-[r]->(:{0}) RETURN count(r)", indexed_label_), - edges_.size(), "Runner {} expected {} edges, found 
{}!"); - - for (auto &item : labels_vertices_) { - test_count( - fmt::format("MATCH (n:{}:{}) RETURN count(n)", indexed_label_, - item.first), - item.second.size(), - fmt::format( - "Runner {{}} expected {{}} vertices with label '{}', found {{}}!", - item.first)); - } - - // generate report - std::ostringstream report; - report << fmt::format("Runner {} graph verification success:", id_) - << std::endl - << fmt::format("\tExecuted {} queries in {:.2f} seconds", - executed_queries_, timer_.Elapsed().count()) - << std::endl - << fmt::format("\tGraph has {} vertices and {} edges", - vertices_.size(), edges_.size()) - << std::endl; - for (auto &label : labels_) { - report << fmt::format("\tVertices with label '{}': {}", label, - labels_vertices_[label].size()) - << std::endl; - } - if (!query_failures_.empty()) { - report << "\tQuery failed (reason: count)" << std::endl; - for (auto &item : query_failures_) { - report << fmt::format("\t\t'{}': {}", item.first, item.second) - << std::endl; - } - } - spdlog::info(report.str()); - } - - public: - void Run() { - // initial vertex creation - CreateVertices(FLAGS_vertex_count); - - // initial edge creation - CreateEdges(FLAGS_edge_count); - - if (FLAGS_verify > 0) VerifyGraph(); - double last_verify = timer_.Elapsed().count(); - - // run rest - while (executed_queries_ < FLAGS_max_queries && - timer_.Elapsed().count() / 60.0 < FLAGS_max_time) { - if (FLAGS_verify > 0 && - timer_.Elapsed().count() - last_verify > FLAGS_verify) { - VerifyGraph(); - last_verify = timer_.Elapsed().count(); - } - - double ratio_e = (double)edges_.size() / (double)FLAGS_edge_count; - double ratio_v = (double)vertices_.size() / (double)FLAGS_vertex_count; - - // try to edit vertices globally - if (FLAGS_global_queries) { - if (Bernoulli(0.01)) { - UpdateGlobalVertices(); - } - - // try to edit edges globally - if (Bernoulli(0.01)) { - UpdateGlobalEdges(); - } - } - - // if we're missing edges (due to vertex detach delete), add some! 
- if (Bernoulli(ratio_e < 0.9)) { - CreateEdge(); - continue; - } - - // if we are near vertex balance, we can also do updates - // instad of update / deletes - if (std::fabs(1.0 - ratio_v) < 0.5 && Bernoulli(0.5)) { - AddLabel(); - continue; - } - - if (Bernoulli(ratio_v / 2.0)) { - RemoveVertex(); - } else { - CreateVertices(1); - } - } - } - - uint64_t GetExecutedQueries() { return executed_queries_; } - - uint64_t GetFailedQueries() { - uint64_t failed = 0; - for (const auto &item : query_failures_) { - failed += item.second; - } - return failed; - } -}; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - - MG_ASSERT(FLAGS_vertex_count > 0, "Vertex count must be greater than 0!"); - MG_ASSERT(FLAGS_edge_count > 0, "Edge count must be greater than 0!"); - - communication::SSLInit sslInit; - - spdlog::info("Starting Memgraph HA normal operation long running test"); - - try { - std::vector<EndpointT> endpoints = GetEndpoints(); - - // create client - uint64_t retries = 15; - std::chrono::milliseconds retry_delay(1000); - ClientContextT context(FLAGS_use_ssl); - ClientT client(endpoints, &context, FLAGS_username, FLAGS_password, retries, - retry_delay); - - // cleanup and create indexes - client.Execute("MATCH (n) DETACH DELETE n", {}); - for (int i = 0; i < FLAGS_worker_count; ++i) { - client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), - {}); - } - } catch (const communication::bolt::ClientFatalException &e) { - spdlog::warn("Unable to find cluster leader"); - return 1; - } catch (const communication::bolt::ClientQueryException &e) { - spdlog::warn( - "Transient error while executing query. (eg. mistyped query, etc.)\n{}", - e.what()); - return 1; - } catch (const utils::BasicException &e) { - spdlog::warn("Error while executing query\n{}", e.what()); - return 1; - } - - // sessions - std::vector<GraphSession> sessions; - sessions.reserve(FLAGS_worker_count); - for (int i = 0; i < FLAGS_worker_count; ++i) sessions.emplace_back(i); - - // workers - std::vector<std::thread> threads; - threads.reserve(FLAGS_worker_count); - for (int i = 0; i < FLAGS_worker_count; ++i) - threads.emplace_back([&, i]() { sessions[i].Run(); }); - - for (int i = 0; i < FLAGS_worker_count; ++i) threads[i].join(); - - if (!FLAGS_stats_file.empty()) { - uint64_t executed = 0; - uint64_t failed = 0; - for (int i = 0; i < FLAGS_worker_count; ++i) { - executed += sessions[i].GetExecutedQueries(); - failed += sessions[i].GetFailedQueries(); - } - std::ofstream stream(FLAGS_stats_file); - stream << executed << std::endl << failed << std::endl; - spdlog::info("Written statistics to file: {}", FLAGS_stats_file); - } - - spdlog::info("All query runners done"); - - return 0; -} diff --git a/tests/stress/raft.json b/tests/stress/raft.json deleted file mode 100644 index 1bc222c9a..000000000 --- a/tests/stress/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 1200, - "election_timeout_max": 1500, - "heartbeat_interval": 200, - "replication_timeout": 10000, - "log_size_snapshot_threshold": 2500 -}