From 2936312612e2fa193547dd5560093864672dc844 Mon Sep 17 00:00:00 2001
From: Ivan Paljak <ivan.paljak@memgraph.io>
Date: Tue, 9 Jul 2019 13:01:06 +0200
Subject: [PATCH] Add memgraph ha normal operation long running test

Summary:
The functionality of the test is the same as for single node Memgraph.
Locally, it seems to work fine. I'll update the apollo related files when I feel
a bit more certain that everything works locally.

Reviewers: mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2135
---
 tests/stress/CMakeLists.txt                   |   3 +
 tests/stress/apollo_runs.yaml                 |   7 +
 tests/stress/continuous_integration_ha        | 197 ++++++++
 .../ha_normal_operation_long_running.cpp      | 446 ++++++++++++++++++
 tests/stress/raft.json                        |   7 +
 5 files changed, 660 insertions(+)
 create mode 100755 tests/stress/continuous_integration_ha
 create mode 100644 tests/stress/ha_normal_operation_long_running.cpp
 create mode 100644 tests/stress/raft.json

diff --git a/tests/stress/CMakeLists.txt b/tests/stress/CMakeLists.txt
index f4ca745fa..c0a25a770 100644
--- a/tests/stress/CMakeLists.txt
+++ b/tests/stress/CMakeLists.txt
@@ -17,3 +17,6 @@ endfunction(add_stress_test)
 
 add_stress_test(long_running.cpp)
 target_link_libraries(${test_prefix}long_running mg-communication mg-io mg-utils)
+
+add_stress_test(ha_normal_operation_long_running)
+target_link_libraries(${test_prefix}ha_normal_operation_long_running mg-communication mg-io mg-utils)
diff --git a/tests/stress/apollo_runs.yaml b/tests/stress/apollo_runs.yaml
index 2f31f9f8e..f5792fefc 100644
--- a/tests/stress/apollo_runs.yaml
+++ b/tests/stress/apollo_runs.yaml
@@ -19,6 +19,13 @@
   commands: TIMEOUT=43200 ./continuous_integration --large-dataset
   infiles: *STRESS_INFILES
 
+- name: stress_ha
+  commands: TIMEOUT=200 ./continuous_integration_ha
+  infiles: &STRESS_HA_INFILES
+    - . # current directory
+    - ../../build_release/memgraph_ha # memgraph release binary
+    - ../../build_release/tests/stress/ # stress client binaries
+
 - name: durability
   commands: TIMEOUT=300 ./ve3/bin/python3 durability --num-steps 5
   infiles: &DURABILITY_INFILES
diff --git a/tests/stress/continuous_integration_ha b/tests/stress/continuous_integration_ha
new file mode 100755
index 000000000..bcd204229
--- /dev/null
+++ b/tests/stress/continuous_integration_ha
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import argparse
+import atexit
+import json
+import multiprocessing
+import os
+import subprocess
+import sys
+import tempfile
+import time
+
+# dataset calibrated for running on Apollo (total 3min)
+# long_running runs for 1min
+# long_running runs for 2min
+SMALL_DATASET = [
+    {
+        "test": "ha_normal_operation_long_running.cpp",
+        "options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"],
+        "timeout": 5,
+    },
+    {
+        "test": "ha_normal_operation_long_running.cpp",
+        "options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "2", "--verify", "30"],
+        "timeout": 5,
+    },
+]
+
+# dataset calibrated for running on daily stress instance (total 2h)
+# long_running runs for 2h
+LARGE_DATASET = [
+    {
+        "test": "long_running.cpp",
+        "options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "120", "--verify", "300"],
+        "timeout": 140,
+    },
+]
+
+# paths
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
+BUILD_DIR = os.path.join(BASE_DIR, "build")
+CONFIG_DIR = os.path.join(BASE_DIR, "config")
+MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
+KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
+CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")
+
+# long running stats file
+STATS_FILE = os.path.join(SCRIPT_DIR, ".ha_normal_operation_long_running_stats")
+SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
+LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
+
+# HA related constants
+CLUSTER_SIZE = 3
+PORT = 7687
+RAFT_CONFIG_FILE = os.path.join(SCRIPT_DIR, "raft.json")
+
+# parse arguments
+parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
+parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
+        "memgraph_ha"))
+parser.add_argument("--config", default = os.path.join(CONFIG_DIR,
+        "stress.conf"))
+parser.add_argument("--log-file", default = "")
+parser.add_argument("--durability-directory", default = "")
+parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
+        "ve3", "bin", "python3"), type = str)
+parser.add_argument("--large-dataset", action = "store_const",
+        const = True, default = False)
+parser.add_argument("--verbose", action = "store_const",
+        const = True, default = False)
+parser.add_argument("--threads", type = int, default = 8)
+args = parser.parse_args()
+
+# run test helper function
+def run_test(args, test, options, timeout):
+    print("Running test '{}'".format(test))
+
+    # find binary
+    if test.endswith(".py"):
+        logging = "DEBUG" if args.verbose else "WARNING"
+        binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
+                "--logging", logging]
+    elif test.endswith(".cpp"):
+        exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4])
+        if not os.path.exists(exe):
+            exe = os.path.join(BASE_DIR, "build_release", "tests", "stress",
+                    test[:-4])
+        binary = [exe]
+    else:
+        raise Exception("Test '{}' binary not supported!".format(test))
+
+    # start test
+    cmd = binary + ["--worker-count", str(args.threads)] + options
+    start = time.time()
+    ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60)
+
+    if ret_test.returncode != 0:
+        raise Exception("Test '{}' binary returned non-zero ({})!".format(
+                test, ret_test.returncode))
+
+    runtime = time.time() - start
+    print("    Done after {:.3f} seconds".format(runtime))
+
+    return runtime
+
+
+# find HA memgraph binary
+if not os.path.exists(args.memgraph):
+    args.memgraph = os.path.join(BASE_DIR, "build_release", "memgraph_ha")
+
+# Generate database instance flags
+workers = [None for worker in range(CLUSTER_SIZE)]
+durability_directory = tempfile.TemporaryDirectory()
+coordination_config_file = tempfile.NamedTemporaryFile(mode="w")
+def generate_cmd_args(worker_id):
+    cmd = [args.memgraph]
+    cmd.extend(["--server-id", str(worker_id + 1)])
+    cmd.extend(["--port", str(PORT + worker_id)])
+    cmd.extend(["--raft-config-file", RAFT_CONFIG_FILE])
+    cmd.extend(["--coordination-config-file", coordination_config_file.name])
+
+    # generate durability directory
+    dur = os.path.join(durability_directory.name, "worker" + str(worker_id))
+    cmd.extend(["--durability-directory", dur])
+
+    return cmd
+
+
+def wait_for_server(port, delay=0.1):
+    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
+    while subprocess.call(cmd) != 0:
+        time.sleep(delay)
+    time.sleep(delay)
+
+
+def start_worker(worker_id):
+    global workers
+    assert worker_id >= 0 and worker_id < CLUSTER_SIZE, \
+            "Invalid worker ID {}".format(worker_id)
+    assert workers[worker_id] is None, \
+            "Worker with id {} already exists".format(worker_id)
+
+    workers[worker_id] = subprocess.Popen(generate_cmd_args(worker_id))
+
+    time.sleep(0.2)
+    assert workers[worker_id].poll() is None, \
+            "Worker {} process died prematurely!".format(worker_id)
+
+    wait_for_server(PORT + worker_id)
+
+# Generate coorindation config file
+data = []
+for i in range(CLUSTER_SIZE):
+    data.append([i + 1, "127.0.0.1", 10000 + i])
+coordination_config_file.write(json.dumps(data))
+coordination_config_file.flush()
+
+# Start Ha Memgraph cluster
+for worker_id in range(CLUSTER_SIZE):
+    start_worker(worker_id)
+
+# at exit cleanup
+@atexit.register
+def cleanup():
+    for proc_mg in workers:
+        if proc_mg.poll() is not None: continue
+        proc_mg.kill()
+        proc_mg.wait()
+
+# run tests
+runtimes = {}
+dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
+for test in dataset:
+    runtime = run_test(args, **test)
+    runtimes[os.path.splitext(test["test"])[0]] = runtime
+
+# stop memgraph
+for proc_mg in workers:
+    proc_mg.terminate()
+    ret_mg = proc_mg.wait()
+    if ret_mg != 0:
+        raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))
+
+# measurements
+measurements = ""
+for key, value in runtimes.items():
+    measurements += "{}.runtime {}\n".format(key, value)
+with open(STATS_FILE) as f:
+    stats = f.read().split("\n")
+measurements += "long_running.queries.executed {}\n".format(stats[0])
+measurements += "long_running.queries.failed {}\n".format(stats[1])
+with open(MEASUREMENTS_FILE, "w") as f:
+    f.write(measurements)
+
+print("Done!")
diff --git a/tests/stress/ha_normal_operation_long_running.cpp b/tests/stress/ha_normal_operation_long_running.cpp
new file mode 100644
index 000000000..25ce87252
--- /dev/null
+++ b/tests/stress/ha_normal_operation_long_running.cpp
@@ -0,0 +1,446 @@
+#include <fstream>
+#include <random>
+#include <set>
+#include <thread>
+
+#include <fmt/format.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "communication/bolt/ha_client.hpp"
+#include "io/network/endpoint.hpp"
+#include "io/network/utils.hpp"
+#include "utils/exceptions.hpp"
+#include "utils/string.hpp"
+#include "utils/timer.hpp"
+
+using EndpointT = io::network::Endpoint;
+using ClientContextT = communication::ClientContext;
+using ClientT = communication::bolt::HAClient;
+using ValueT = communication::bolt::Value;
+using QueryDataT = communication::bolt::QueryData;
+using ExceptionT = communication::bolt::ClientQueryException;
+
+DEFINE_string(endpoints, "127.0.0.1:7687,127.0.0.1:7688,127.0.0.1:7689",
+              "Cluster server endpoints (host:port, separated by comma).");
+DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
+DEFINE_string(username, "", "Username for the database");
+DEFINE_string(password, "", "Password for the database");
+DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
+
+DEFINE_int32(vertex_count, 0,
+             "The average number of vertices in the graph per worker");
+DEFINE_int32(edge_count, 0,
+             "The average number of edges in the graph per worker");
+DEFINE_int32(prop_count, 5, "The max number of properties on a node");
+DEFINE_uint64(max_queries, 1U << 30U, "Maximum number of queries to execute");
+DEFINE_int32(max_time, 1, "Maximum execution time in minutes");
+DEFINE_int32(verify, 0, "Interval (seconds) between checking local info");
+DEFINE_int32(worker_count, 1,
+             "The number of workers that operate on the graph independently");
+DEFINE_bool(global_queries, true,
+            "If queries that modifiy globally should be executed sometimes");
+
+DEFINE_string(stats_file, "", "File into which to write statistics.");
+
+std::vector<EndpointT> GetEndpoints() {
+  std::vector<io::network::Endpoint> ret;
+  for (const auto &endpoint : utils::Split(FLAGS_endpoints, ",")) {
+    auto split = utils::Split(utils::Trim(endpoint), ":");
+    CHECK(split.size() == 2) << "Invalid endpoint!";
+    ret.emplace_back(
+        io::network::ResolveHostname(std::string(utils::Trim(split[0]))),
+        static_cast<uint16_t>(std::stoi(std::string(utils::Trim(split[1])))));
+  }
+  return ret;
+}
+
+/**
+ * Encapsulates a Graph and a Bolt session and provides CRUD op functions.
+ * Also defines a run-loop for a generic exectutor, and a graph state
+ * verification function.
+ */
+class GraphSession {
+ public:
+  explicit GraphSession(int id)
+      : id_(id),
+        indexed_label_(fmt::format("indexed_label{}", id)),
+        generator_{std::random_device{}()} {
+    for (int i = 0; i < FLAGS_prop_count; ++i) {
+      auto label = fmt::format("label{}", i);
+      labels_.insert(label);
+      labels_vertices_.insert({label, {}});
+    }
+
+    std::vector<EndpointT> endpoints = GetEndpoints();
+
+    uint64_t retries = 15;
+    std::chrono::milliseconds retry_delay(1000);
+    ClientContextT context(FLAGS_use_ssl);
+    client_ = std::make_unique<ClientT>(endpoints, &context_, FLAGS_username,
+                                        FLAGS_password, retries, retry_delay);
+  }
+
+ private:
+  uint64_t id_;
+  ClientContextT context_{FLAGS_use_ssl};
+  std::unique_ptr<ClientT> client_;
+
+  uint64_t vertex_id_{0};
+  uint64_t edge_id_{0};
+
+  std::set<uint64_t> vertices_;
+  std::set<uint64_t> edges_;
+
+  std::string indexed_label_;
+  std::set<std::string> labels_;
+
+  std::map<std::string, std::set<uint64_t>> labels_vertices_;
+
+  uint64_t executed_queries_{0};
+  std::map<std::string, uint64_t> query_failures_;
+
+  std::mt19937 generator_;
+
+  utils::Timer timer_;
+
+ private:
+  double GetRandom() { return std::generate_canonical<double, 10>(generator_); }
+
+  bool Bernoulli(double p) { return GetRandom() < p; }
+
+  template<typename T>
+  T RandomElement(const std::set<T> &data) {
+    uint32_t pos = std::floor(GetRandom() * data.size());
+    auto it = data.begin();
+    std::advance(it, pos);
+    return *it;
+  }
+
+  void AddQueryFailure(std::string what) {
+    auto it = query_failures_.find(what);
+    if (it != query_failures_.end()) {
+      ++it->second;
+    } else {
+      query_failures_.insert(std::make_pair(what, 1));
+    }
+  }
+
+  QueryDataT Execute(std::string query) {
+    try {
+      DLOG(INFO) << "Runner " << id_ << " executing query: " << query;
+      executed_queries_ += 1;
+      return client_->Execute(query, {});
+    } catch (const ExceptionT &e) {
+      AddQueryFailure(std::string{e.what()});
+      return QueryDataT();
+    }
+  }
+
+  void CreateVertices(uint64_t vertices_count) {
+    if (vertices_count == 0) return;
+    auto ret = Execute(fmt::format(
+        "UNWIND RANGE({}, {}) AS r CREATE (n:{} {{id: r}}) RETURN count(n)",
+        vertex_id_, vertex_id_ + vertices_count - 1, indexed_label_));
+    CHECK(ret.records.size() == 1) << "Vertices creation failed!";
+    CHECK(ret.records[0][0].ValueInt() == vertices_count)
+        << "Created " << ret.records[0][0].ValueInt() << " vertices instead of "
+        << vertices_count << "!";
+    for (uint64_t i = 0; i < vertices_count; ++i) {
+      vertices_.insert(vertex_id_ + i);
+    }
+    vertex_id_ += vertices_count;
+  }
+
+  void RemoveVertex() {
+    auto vertex_id = RandomElement(vertices_);
+    auto ret =
+        Execute(fmt::format("MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() "
+                            "DETACH DELETE n RETURN n.id, labels(n), r.id",
+                            indexed_label_, vertex_id));
+    if (!ret.records.empty()) {
+      std::set<uint64_t> processed_vertices;
+      for (auto &record : ret.records) {
+        // remove vertex but note there could be duplicates
+        auto n_id = record[0].ValueInt();
+        if (processed_vertices.insert(n_id).second) {
+          vertices_.erase(n_id);
+          for (auto &label : record[1].ValueList()) {
+            if (label.ValueString() == indexed_label_) {
+              continue;
+            }
+            labels_vertices_[label.ValueString()].erase(n_id);
+          }
+        }
+        // remove edge
+        auto &edge = record[2];
+        if (edge.type() == ValueT::Type::Int) {
+          edges_.erase(edge.ValueInt());
+        }
+      }
+    }
+  }
+
+  void CreateEdges(uint64_t edges_count) {
+    if (edges_count == 0) return;
+    auto edges_per_node = (double)edges_count / vertices_.size();
+    CHECK(std::abs(edges_per_node - (int64_t)edges_per_node) < 0.0001)
+        << "Edges per node not a whole number";
+
+    auto ret = Execute(fmt::format(
+        "MATCH (a:{0}) WITH a "
+        "UNWIND range(0, {1}) AS i WITH a, tointeger(rand() * {2}) AS id "
+        "MATCH (b:{0} {{id: id}}) WITH a, b "
+        "CREATE (a)-[e:EdgeType {{id: counter(\"edge\", {3})}}]->(b) "
+        "RETURN count(e)",
+        indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(),
+        edge_id_));
+
+    CHECK(ret.records.size() == 1) << "Failed to create edges";
+    uint64_t count = ret.records[0][0].ValueInt();
+    for (uint64_t i = 0; i < count; ++i) {
+      edges_.insert(edge_id_ + i);
+    }
+    edge_id_ += count;
+  }
+
+  void CreateEdge() {
+    auto ret = Execute(
+        fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) "
+                    "CREATE (from)-[e:EdgeType {{id: "
+                    "counter(\"edge\", {})}}]->(to) RETURN e.id",
+                    indexed_label_, RandomElement(vertices_), indexed_label_,
+                    RandomElement(vertices_), edge_id_));
+    if (!ret.records.empty()) {
+      edges_.insert(ret.records[0][0].ValueInt());
+      edge_id_ += 1;
+    }
+  }
+
+  void AddLabel() {
+    auto vertex_id = RandomElement(vertices_);
+    auto label = RandomElement(labels_);
+    // add a label on a vertex that didn't have that label
+    // yet (we need that for book-keeping)
+    auto ret = Execute(fmt::format(
+        "MATCH (v:{} {{id: {}}}) WHERE not v:{} SET v:{} RETURN v.id",
+        indexed_label_, vertex_id, label, label));
+    if (!ret.records.empty()) {
+      labels_vertices_[label].insert(vertex_id);
+    }
+  }
+
+  void UpdateGlobalVertices() {
+    uint64_t vertex_id = *vertices_.rbegin();
+    uint64_t lo = std::floor(GetRandom() * vertex_id);
+    uint64_t hi = std::floor(lo + vertex_id * 0.01);
+    uint64_t num = std::floor(GetRandom() * (1U << 30U));
+    Execute(
+        fmt::format("MATCH (n) WHERE n.id > {} AND n.id < {} SET n.value = {}",
+                    lo, hi, num));
+  }
+
+  void UpdateGlobalEdges() {
+    uint64_t vertex_id = *vertices_.rbegin();
+    uint64_t lo = std::floor(GetRandom() * vertex_id);
+    uint64_t hi = std::floor(lo + vertex_id * 0.01);
+    uint64_t num = std::floor(GetRandom() * (1U << 30U));
+    Execute(fmt::format(
+        "MATCH ()-[e]->() WHERE e.id > {} AND e.id < {} SET e.value = {}", lo,
+        hi, num));
+  }
+
+  /** Checks if the local info corresponds to DB state */
+  void VerifyGraph() {
+    // helper lambda for count verification
+    auto test_count = [this](std::string query, int64_t count,
+                             std::string message) {
+      auto ret = Execute(query);
+      if (ret.records.empty()) {
+        throw utils::BasicException("Couldn't execute count!");
+      }
+      if (ret.records[0][0].ValueInt() != count) {
+        throw utils::BasicException(
+            fmt::format(message, id_, count, ret.records[0][0].ValueInt()));
+      }
+    };
+
+    test_count(fmt::format("MATCH (n:{}) RETURN count(n)", indexed_label_),
+               vertices_.size(), "Runner {} expected {} vertices, found {}!");
+    test_count(
+        fmt::format("MATCH (:{0})-[r]->(:{0}) RETURN count(r)", indexed_label_),
+        edges_.size(), "Runner {} expected {} edges, found {}!");
+
+    for (auto &item : labels_vertices_) {
+      test_count(
+          fmt::format("MATCH (n:{}:{}) RETURN count(n)", indexed_label_,
+                      item.first),
+          item.second.size(),
+          fmt::format(
+              "Runner {{}} expected {{}} vertices with label '{}', found {{}}!",
+              item.first));
+    }
+
+    // generate report
+    std::ostringstream report;
+    report << fmt::format("Runner {} graph verification success:", id_)
+           << std::endl
+           << fmt::format("\tExecuted {} queries in {:.2f} seconds",
+                          executed_queries_, timer_.Elapsed().count())
+           << std::endl
+           << fmt::format("\tGraph has {} vertices and {} edges",
+                          vertices_.size(), edges_.size())
+           << std::endl;
+    for (auto &label : labels_) {
+      report << fmt::format("\tVertices with label '{}': {}", label,
+                            labels_vertices_[label].size())
+             << std::endl;
+    }
+    if (!query_failures_.empty()) {
+      report << "\tQuery failed (reason: count)" << std::endl;
+      for (auto &item : query_failures_) {
+        report << fmt::format("\t\t'{}': {}", item.first, item.second)
+               << std::endl;
+      }
+    }
+    LOG(INFO) << report.str();
+  }
+
+ public:
+  void Run() {
+    // initial vertex creation
+    CreateVertices(FLAGS_vertex_count);
+
+    // initial edge creation
+    CreateEdges(FLAGS_edge_count);
+
+    if (FLAGS_verify > 0) VerifyGraph();
+    double last_verify = timer_.Elapsed().count();
+
+    // run rest
+    while (executed_queries_ < FLAGS_max_queries &&
+           timer_.Elapsed().count() / 60.0 < FLAGS_max_time) {
+      if (FLAGS_verify > 0 &&
+          timer_.Elapsed().count() - last_verify > FLAGS_verify) {
+        VerifyGraph();
+        last_verify = timer_.Elapsed().count();
+      }
+
+      double ratio_e = (double)edges_.size() / (double)FLAGS_edge_count;
+      double ratio_v = (double)vertices_.size() / (double)FLAGS_vertex_count;
+
+      // try to edit vertices globally
+      if (FLAGS_global_queries) {
+        if (Bernoulli(0.01)) {
+          UpdateGlobalVertices();
+        }
+
+        // try to edit edges globally
+        if (Bernoulli(0.01)) {
+          UpdateGlobalEdges();
+        }
+      }
+
+      // if we're missing edges (due to vertex detach delete), add some!
+      if (Bernoulli(ratio_e < 0.9)) {
+        CreateEdge();
+        continue;
+      }
+
+      // if we are near vertex balance, we can also do updates
+      // instad of update / deletes
+      if (std::fabs(1.0 - ratio_v) < 0.5 && Bernoulli(0.5)) {
+        AddLabel();
+        continue;
+      }
+
+      if (Bernoulli(ratio_v / 2.0)) {
+        RemoveVertex();
+      } else {
+        CreateVertices(1);
+      }
+    }
+  }
+
+  uint64_t GetExecutedQueries() { return executed_queries_; }
+
+  uint64_t GetFailedQueries() {
+    uint64_t failed = 0;
+    for (const auto &item : query_failures_) {
+      failed += item.second;
+    }
+    return failed;
+  }
+};
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  CHECK(FLAGS_vertex_count > 0) << "Vertex count must be greater than 0!";
+  CHECK(FLAGS_edge_count > 0) << "Edge count must be greater than 0!";
+
+  communication::Init();
+
+  LOG(INFO) << "Starting Memgraph HA normal operation long running test";
+
+  try {
+    std::vector<EndpointT> endpoints = GetEndpoints();
+
+    // create client
+    uint64_t retries = 15;
+    std::chrono::milliseconds retry_delay(1000);
+    ClientContextT context(FLAGS_use_ssl);
+    ClientT client(endpoints, &context, FLAGS_username, FLAGS_password, retries,
+                   retry_delay);
+
+    // cleanup and create indexes
+    client.Execute("MATCH (n) DETACH DELETE n", {});
+    for (int i = 0; i < FLAGS_worker_count; ++i) {
+      client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), {});
+    }
+  } catch (const communication::bolt::ClientFatalException &e) {
+    LOG(WARNING) << "Unable to find cluster leader";
+    return 1;
+  } catch (const communication::bolt::ClientQueryException &e) {
+    LOG(WARNING)
+        << "Transient error while executing query. (eg. mistyped query, etc.)\n"
+        << e.what();
+    return 1;
+  } catch (const utils::BasicException &e) {
+    LOG(WARNING) << "Error while executing query\n" << e.what();
+    return 1;
+  }
+
+  // sessions
+  std::vector<GraphSession> sessions;
+  sessions.reserve(FLAGS_worker_count);
+  for (int i = 0; i < FLAGS_worker_count; ++i)
+    sessions.emplace_back(i);
+
+  // workers
+  std::vector<std::thread> threads;
+  threads.reserve(FLAGS_worker_count);
+  for (int i = 0; i < FLAGS_worker_count; ++i)
+    threads.emplace_back([&, i]() { sessions[i].Run(); });
+
+  for (int i = 0; i < FLAGS_worker_count; ++i)
+    threads[i].join();
+
+  if (!FLAGS_stats_file.empty()) {
+    uint64_t executed = 0;
+    uint64_t failed = 0;
+    for (int i = 0; i < FLAGS_worker_count; ++i) {
+      executed += sessions[i].GetExecutedQueries();
+      failed += sessions[i].GetFailedQueries();
+    }
+    std::ofstream stream(FLAGS_stats_file);
+    stream << executed << std::endl << failed << std::endl;
+    LOG(INFO) << fmt::format("Written statistics to file: {}",
+                             FLAGS_stats_file);
+  }
+
+  LOG(INFO) << "All query runners done";
+
+  return 0;
+}
diff --git a/tests/stress/raft.json b/tests/stress/raft.json
new file mode 100644
index 000000000..ede2ddc62
--- /dev/null
+++ b/tests/stress/raft.json
@@ -0,0 +1,7 @@
+{
+  "election_timeout_min": 750,
+  "election_timeout_max": 1000,
+  "heartbeat_interval": 100,
+  "replication_timeout": 10000,
+  "log_size_snapshot_threshold": -1
+}