From 4790d6458e3bd40c7752649cd3e495a304cf9554 Mon Sep 17 00:00:00 2001
From: Matija Santl <matija.santl@memgraph.com>
Date: Mon, 25 Feb 2019 14:55:26 +0100
Subject: [PATCH] Add index support in HA

Summary:
Added index creation and deletion handling in StateDelta.
Also included an integration test that creates an index and makes sure that it
gets replicated by killing each peer, eventually causing a leader re-election.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1886
---
 .../single_node_ha/state_delta_applier.cpp    |  4 -
 src/durability/single_node_ha/state_delta.cpp |  8 +-
 src/raft/raft_server.cpp                      |  2 +-
 tests/integration/CMakeLists.txt              |  3 +
 tests/integration/apollo_runs.yaml            | 14 ++-
 tests/integration/ha/index/CMakeLists.txt     |  6 ++
 tests/integration/ha/index/raft.json          |  6 ++
 tests/integration/ha/index/runner.py          | 93 +++++++++++++++++++
 tests/integration/ha/index/tester.cpp         | 88 ++++++++++++++++++
 9 files changed, 215 insertions(+), 9 deletions(-)
 create mode 100644 tests/integration/ha/index/CMakeLists.txt
 create mode 100644 tests/integration/ha/index/raft.json
 create mode 100755 tests/integration/ha/index/runner.py
 create mode 100644 tests/integration/ha/index/tester.cpp

diff --git a/src/database/single_node_ha/state_delta_applier.cpp b/src/database/single_node_ha/state_delta_applier.cpp
index 83f54854a..846d85931 100644
--- a/src/database/single_node_ha/state_delta_applier.cpp
+++ b/src/database/single_node_ha/state_delta_applier.cpp
@@ -20,10 +20,6 @@ void StateDeltaApplier::Apply(const std::vector<StateDelta> &deltas) {
         LOG(FATAL) << "StateDeltaApplier shouldn't know about aborted "
                       "transactions";
         break;
-      case StateDelta::Type::BUILD_INDEX:
-      case StateDelta::Type::DROP_INDEX:
-        throw utils::NotYetImplemented(
-            "High availability doesn't support index at the moment!");
       default:
         delta.Apply(*GetAccessor(delta.transaction_id));
     }
diff --git a/src/durability/single_node_ha/state_delta.cpp b/src/durability/single_node_ha/state_delta.cpp
index 3570af079..1344badd2 100644
--- a/src/durability/single_node_ha/state_delta.cpp
+++ b/src/durability/single_node_ha/state_delta.cpp
@@ -336,9 +336,13 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
       dba.RemoveEdge(edge);
       break;
     }
-    case Type::BUILD_INDEX:
+    case Type::BUILD_INDEX: {
+      dba.BuildIndex(dba.Label(label_name), dba.Property(property_name),
+                     unique);
+      break;
+    }
     case Type::DROP_INDEX: {
-      LOG(FATAL) << "Index handling not handled in Apply";
+      dba.DeleteIndex(dba.Label(label_name), dba.Property(property_name));
       break;
     }
     case Type::NO_OP:
diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp
index 9a0546c3c..dcbdd93d7 100644
--- a/src/raft/raft_server.cpp
+++ b/src/raft/raft_server.cpp
@@ -130,7 +130,7 @@ void RaftServer::Start() {
     // [Raft paper 5.1]
     // "If a server receives a request with a stale term, it rejects the
     // request"
-    if (exiting_ || req.term < current_term_) {
+    if (req.term < current_term_) {
       AppendEntriesRes res(false, current_term_);
       Save(res, res_builder);
       return;
diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt
index 0bb188d74..7b2368758 100644
--- a/tests/integration/CMakeLists.txt
+++ b/tests/integration/CMakeLists.txt
@@ -19,5 +19,8 @@ add_subdirectory(distributed)
 # distributed ha/basic binaries
 add_subdirectory(ha/basic)
 
+# distributed ha/index binaries
+add_subdirectory(ha/index)
+
 # audit test binaries
 add_subdirectory(audit)
diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml
index eb28191b3..3e595d0cc 100644
--- a/tests/integration/apollo_runs.yaml
+++ b/tests/integration/apollo_runs.yaml
@@ -66,9 +66,19 @@
     - runner.py # runner script
     - raft.json # raft configuration
     - ../ha_test.py # raft test base module
-    - ../../../../build_debug/memgraph_ha # memgraph distributed binary
+    - ../../../../build_debug/memgraph_ha # memgraph ha binary
     - ../../../../build_debug/tests/integration/ha/basic/tester # tester binary
 
+- name: integration__ha_index
+  cd: ha/index
+  commands: ./runner.py
+  infiles:
+    - runner.py # runner script
+    - raft.json # raft configuration
+    - ../ha_test.py # raft test base module
+    - ../../../../build_debug/memgraph_ha # memgraph ha binary
+    - ../../../../build_debug/tests/integration/ha/index/tester # tester binary
+
 - name: integration__ha_log_compaction
   cd: ha/log_compaction
   commands: ./runner.py
@@ -76,5 +86,5 @@
     - runner.py # runner script
     - raft.json # raft configuration
     - ../ha_test.py # raft test base module
-    - ../../../../build_debug/memgraph_ha # memgraph distributed binary
+    - ../../../../build_debug/memgraph_ha # memgraph ha binary
     - ../../../../build_debug/tests/manual/ha_client # tester binary
diff --git a/tests/integration/ha/index/CMakeLists.txt b/tests/integration/ha/index/CMakeLists.txt
new file mode 100644
index 000000000..3269a1ee1
--- /dev/null
+++ b/tests/integration/ha/index/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(target_name memgraph__integration__ha_index)
+set(tester_target_name ${target_name}__tester)
+
+add_executable(${tester_target_name} tester.cpp)
+set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
+target_link_libraries(${tester_target_name} mg-utils mg-communication)
diff --git a/tests/integration/ha/index/raft.json b/tests/integration/ha/index/raft.json
new file mode 100644
index 000000000..5983863d8
--- /dev/null
+++ b/tests/integration/ha/index/raft.json
@@ -0,0 +1,6 @@
+{
+  "election_timeout_min": 200,
+  "election_timeout_max": 500,
+  "heartbeat_interval": 100,
+  "log_size_snapshot_threshold": -1
+}
diff --git a/tests/integration/ha/index/runner.py b/tests/integration/ha/index/runner.py
new file mode 100755
index 000000000..404aee717
--- /dev/null
+++ b/tests/integration/ha/index/runner.py
@@ -0,0 +1,93 @@
+#!/usr/bin/python3
+
+import argparse
+import os
+import time
+import random
+import subprocess
+import sys
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
+
+# append parent directory
+sys.path.append(os.path.join(SCRIPT_DIR, ".."))
+
+from ha_test import HaTestBase
+
+
+class HaIndexTest(HaTestBase):
+    def execute_step(self, step):
+        if step == "create":
+            print("Executing create query")
+            client = subprocess.Popen([self.tester_binary, "--step", "create",
+                "--cluster_size", str(self.cluster_size)])
+
+        elif step == "check":
+            print("Executing check query")
+            client = subprocess.Popen([self.tester_binary, "--step", "check",
+                "--cluster_size", str(self.cluster_size)])
+        else:
+            return 0
+
+        # Check what happened with query execution.
+        try:
+            code = client.wait(timeout=30)
+        except subprocess.TimeoutExpired as e:
+            print("HA client timed out!")
+            client.kill()
+            return 1
+
+        return code
+
+
+    def execute(self):
+        self.start_cluster()
+
+        assert self.execute_step("create") == 0, \
+                "Error while executing create query"
+
+        for i in range(self.cluster_size):
+            # Kill worker.
+            print("Killing worker {}".format(i))
+            self.kill_worker(i)
+
+            time.sleep(5) # allow some time for possible leader re-election
+
+            assert self.execute_step("check") == 0, \
+                    "Error while executing check query"
+
+            # Bring worker back to life.
+            print("Starting worker {}".format(i))
+            self.start_worker(i)
+
+            time.sleep(5) # allow some time for possible leader re-election
+
+
+def find_correct_path(path):
+    f = os.path.join(PROJECT_DIR, "build", path)
+    if not os.path.exists(f):
+        f = os.path.join(PROJECT_DIR, "build_debug", path)
+    return f
+
+
+if __name__ == "__main__":
+    memgraph_binary = find_correct_path("memgraph_ha")
+    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
+        "index", "tester"))
+
+    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
+        "index", "raft.json")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--memgraph", default=memgraph_binary)
+    parser.add_argument("--raft_config_file", default=raft_config_file)
+    args = parser.parse_args()
+
+    for cluster_size in [3, 5]:
+        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
+        HaIndexTest(
+            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
+        print("\033[1;32m~~ The test finished successfully ~~\033[0m")
+
+    sys.exit(0)
diff --git a/tests/integration/ha/index/tester.cpp b/tests/integration/ha/index/tester.cpp
new file mode 100644
index 000000000..0acf3621c
--- /dev/null
+++ b/tests/integration/ha/index/tester.cpp
@@ -0,0 +1,88 @@
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include <fmt/format.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "communication/bolt/client.hpp"
+#include "io/network/endpoint.hpp"
+#include "io/network/utils.hpp"
+#include "utils/timer.hpp"
+
+DEFINE_string(address, "127.0.0.1", "Server address");
+DEFINE_int32(port, 7687, "Server port");
+DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
+DEFINE_string(username, "", "Username for the database");
+DEFINE_string(password, "", "Password for the database");
+DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
+DEFINE_string(step, "", "The step to execute (available: create, check)");
+
+using namespace std::chrono_literals;
+using communication::bolt::Value;
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  const std::string index = ":Node(id)";
+
+  communication::Init();
+
+  bool successful = false;
+  for (int retry = 0; !successful && retry < 10; ++retry) {
+    for (int i = 0; !successful && i < FLAGS_cluster_size; ++i) {
+      try {
+        communication::ClientContext context(FLAGS_use_ssl);
+        communication::bolt::Client client(&context);
+
+        uint16_t port = FLAGS_port + i;
+        io::network::Endpoint endpoint{FLAGS_address, port};
+        client.Connect(endpoint, FLAGS_username, FLAGS_password);
+
+        if (FLAGS_step == "create") {
+          client.Execute(fmt::format("create index on {}", index), {});
+          successful = true;
+
+        } else if (FLAGS_step == "check") {
+          auto result = client.Execute("show index info", {});
+
+          auto checker = [&index](const std::vector<Value> &record) {
+            if (record.size() != 1) return false;
+            return record[0].ValueString() == index;
+          };
+
+          // Check that index ":Node(id)" exists
+          if (!std::any_of(result.records.begin(), result.records.end(),
+                           checker)) {
+            LOG(WARNING) << "Missing index!";
+            return 2;
+          }
+
+          successful = true;
+
+        } else {
+          LOG(FATAL) << "Unexpected client step!";
+        }
+      } catch (const communication::bolt::ClientQueryException &) {
+        // This one is not the leader, continue.
+        continue;
+      } catch (const communication::bolt::ClientFatalException &) {
+        // This one seems to be down, continue.
+        continue;
+      }
+      LOG(INFO) << "Current Raft cluster leader is " << i;
+    }
+    if (!successful) {
+      LOG(INFO) << "Couldn't find Raft cluster leader, retrying.";
+      std::this_thread::sleep_for(1s);
+    }
+  }
+
+  if (!successful) {
+    LOG(WARNING) << "Couldn't find Raft cluster leader.";
+    return 1;
+  }
+  return 0;
+}