From 4fa44c3edd04b0c1b5dfcba0e397c942602ac0ec Mon Sep 17 00:00:00 2001
From: Matija Santl <matija.santl@memgraph.com>
Date: Wed, 16 Jan 2019 17:06:08 +0100
Subject: [PATCH] Fix Raft's ReplicationLog

Summary:
`ReplicationLog` had a classic off-by-one bug. The `valid_prefix`
variable wasn't set properly.

This diff also includes a poor man's version of a HA client. This client
assumes that all the HA instances run on a single machine and that the
corresponding Bold endpoints have open ports ranging from `7687` to
`7687 + num_machines - 1`.

This should make it easeir to test certain things, ie. disk usage, P25.

This test revealed the bug with `ReplicationLog`

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1813
---
 src/raft/replication_log.hpp   |   4 +-
 tests/manual/CMakeLists.txt    |   3 +
 tests/manual/ha_client.cpp     | 106 +++++++++++++++++++++++++++++++++
 tests/unit/replication_log.cpp |   5 +-
 4 files changed, 115 insertions(+), 3 deletions(-)
 create mode 100644 tests/manual/ha_client.cpp

diff --git a/src/raft/replication_log.hpp b/src/raft/replication_log.hpp
index 2a7a517cf..c49b88bc5 100644
--- a/src/raft/replication_log.hpp
+++ b/src/raft/replication_log.hpp
@@ -38,8 +38,10 @@ class ReplicationLog final {
   // Clears the replication log from bits associated with transactions with an
   // id lower than `id`.
   void garbage_collect_older(tx::TransactionId id) {
+    // We keep track of the valid prefix in order to avoid the `CHECK` inside
+    // the `DynamicBitset`.
+    valid_prefix = 2 * id;
     log.delete_prefix(2 * id);
-    valid_prefix = 2 * (id + 1);
   }
 
   class Info final {
diff --git a/tests/manual/CMakeLists.txt b/tests/manual/CMakeLists.txt
index 4e805b93c..328f0ee2a 100644
--- a/tests/manual/CMakeLists.txt
+++ b/tests/manual/CMakeLists.txt
@@ -54,6 +54,9 @@ target_link_libraries(${test_prefix}generate_snapshot mg-distributed kvstore_dum
 add_manual_test(graph_500_generate_snapshot.cpp)
 target_link_libraries(${test_prefix}graph_500_generate_snapshot mg-distributed kvstore_dummy_lib)
 
+add_manual_test(ha_client.cpp)
+target_link_libraries(${test_prefix}ha_client mg-utils mg-communication)
+
 add_manual_test(kvstore_console.cpp)
 target_link_libraries(${test_prefix}kvstore_console kvstore_lib gflags glog)
 
diff --git a/tests/manual/ha_client.cpp b/tests/manual/ha_client.cpp
new file mode 100644
index 000000000..c996a4fd4
--- /dev/null
+++ b/tests/manual/ha_client.cpp
@@ -0,0 +1,106 @@
+#include <chrono>
+#include <iostream>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "communication/bolt/client.hpp"
+#include "io/network/endpoint.hpp"
+#include "io/network/utils.hpp"
+#include "utils/timer.hpp"
+
+DEFINE_string(address, "127.0.0.1", "Server address");
+DEFINE_int32(port, 7687, "Server port");
+DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
+DEFINE_string(username, "", "Username for the database");
+DEFINE_string(password, "", "Password for the database");
+DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
+
+using namespace std::chrono_literals;
+
+void Execute(const std::vector<std::string> &queries) {
+  communication::ClientContext context(FLAGS_use_ssl);
+  std::experimental::optional<communication::bolt::Client> client;
+  communication::bolt::QueryData result;
+
+  for (size_t k = 0; k < queries.size();) {
+    if (!client) {
+      // Find the leader by executing query on all machines until one responds
+      // with success.
+      for (int retry = 0; !client && retry < 10; ++retry) {
+        for (int i = 0; !client && i < FLAGS_cluster_size; ++i) {
+          try {
+            client.emplace(&context);
+
+            uint16_t port = FLAGS_port + i;
+            io::network::Endpoint endpoint{FLAGS_address, port};
+
+            client->Connect(endpoint, FLAGS_username, FLAGS_password);
+            result = client->Execute(queries[k], {});
+          } catch (const communication::bolt::ClientQueryException &) {
+            // This one is not the leader, continue.
+            client = std::experimental::nullopt;
+            continue;
+          } catch (const communication::bolt::ClientFatalException &) {
+            // This one seems to be down, continue.
+            client = std::experimental::nullopt;
+            continue;
+          }
+        }
+
+        if (!client) {
+          LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
+          std::this_thread::sleep_for(1s);
+        }
+      }
+
+      if (!client) {
+        LOG(ERROR) << "Couldn't find Raft cluster leader.";
+        return;
+      }
+
+    } else {
+      // Try reusing the previous client.
+      try {
+        result = client->Execute(queries[k], {});
+      } catch (const communication::bolt::ClientQueryException &) {
+        client = std::experimental::nullopt;
+        continue;
+      } catch (const communication::bolt::ClientFatalException &e) {
+        client = std::experimental::nullopt;
+        continue;
+      }
+    }
+
+    if (result.records.size() > 0) {
+      std::cout << "Results: " << std::endl;
+      for (auto &record : result.records) {
+        std::cout << record << std::endl;
+      }
+    }
+
+    k += 1;
+  }
+
+  return;
+}
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  communication::Init();
+
+  std::vector<std::string> queries;
+
+  std::string line;
+  while (std::getline(std::cin, line)) {
+    queries.emplace_back(std::move(line));
+  }
+
+  Execute(queries);
+
+  return 0;
+}
diff --git a/tests/unit/replication_log.cpp b/tests/unit/replication_log.cpp
index 7943a5fb5..befe0dfaa 100644
--- a/tests/unit/replication_log.cpp
+++ b/tests/unit/replication_log.cpp
@@ -49,17 +49,18 @@ TEST(ReplicationLog, GarbageCollect) {
 
   rlog.garbage_collect_older(n);
 
-  for (int i = 1; i <= n; ++i) {
+  for (int i = 1; i < n; ++i) {
     EXPECT_FALSE(rlog.is_active(i));
     EXPECT_FALSE(rlog.is_replicated(i));
   }
 
-  for (int i = n + 1; i < 2 * n; ++i) {
+  for (int i = n; i < 2 * n; ++i) {
     EXPECT_FALSE(rlog.is_active(i));
     EXPECT_TRUE(rlog.is_replicated(i));
   }
 
   for (int i = 2 * n; i < 3 * n; ++i) {
     EXPECT_TRUE(rlog.is_active(i));
+    EXPECT_FALSE(rlog.is_replicated(i));
   }
 }