From b7bbd026de11212fe61765446612f2c6623597f9 Mon Sep 17 00:00:00 2001
From: antonio2368 <antonio2368@users.noreply.github.com>
Date: Mon, 21 Dec 2020 10:59:50 +0100
Subject: [PATCH] Add large Jepsen test (#67)

* Fix WAL recovery step
---
 src/storage/v2/durability/durability.cpp      |   2 +-
 .../v2/replication/replication_client.cpp     |   5 +-
 tests/jepsen/src/jepsen/memgraph/core.clj     |   6 +-
 tests/jepsen/src/jepsen/memgraph/large.clj    | 106 ++++++++++++++++++
 4 files changed, 114 insertions(+), 5 deletions(-)
 create mode 100644 tests/jepsen/src/jepsen/memgraph/large.clj

diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp
index 9dbb051a7..97835223d 100644
--- a/src/storage/v2/durability/durability.cpp
+++ b/src/storage/v2/durability/durability.cpp
@@ -89,7 +89,7 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(
     try {
       auto info = ReadWalInfo(item.path());
       if ((uuid.empty() || info.uuid == uuid) &&
-          (!current_seq_num || info.seq_num < current_seq_num))
+          (!current_seq_num || info.seq_num < *current_seq_num))
         wal_files.emplace_back(info.seq_num, info.from_timestamp,
                                info.to_timestamp, std::move(info.uuid),
                                std::move(info.epoch_id), item.path());
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index 48c5fe298..c53b687ea 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -443,8 +443,9 @@ Storage::ReplicationClient::GetRecoverySteps(
 
     // Find first WAL that contains up to replica commit, i.e. WAL
     // that is before the replica commit or conatins the replica commit
-    // as the last committed transaction.
-    if (replica_commit >= rwal_it->from_timestamp) {
+    // as the last committed transaction OR we managed to find the first WAL
+    // file.
+    if (replica_commit >= rwal_it->from_timestamp || rwal_it->seq_num == 0) {
       if (replica_commit >= rwal_it->to_timestamp) {
         // We want the WAL after because the replica already contains all the
         // commits from this WAL
diff --git a/tests/jepsen/src/jepsen/memgraph/core.clj b/tests/jepsen/src/jepsen/memgraph/core.clj
index 9a6675a68..fcf456a71 100644
--- a/tests/jepsen/src/jepsen/memgraph/core.clj
+++ b/tests/jepsen/src/jepsen/memgraph/core.clj
@@ -12,6 +12,7 @@
             [slingshot.slingshot :refer [try+ throw+]]
             [jepsen.memgraph [basic :as basic]
                              [bank :as bank]
+                             [large :as large]
                              [sequential :as sequential]
                              [support :as s]
                              [nemesis :as nemesis]
@@ -21,7 +22,8 @@
   "A map of workload names to functions that can take opts and construct
    workloads."
    {:bank       bank/workload
-    :sequential sequential/workload})
+    :sequential sequential/workload
+    :large      large/workload})
 
 (def nemesis-configuration
   "Nemesis configuration"
@@ -43,7 +45,7 @@
                                (gen/log "Healing cluster.")
                                (gen/nemesis (:final-generator nemesis))
                                (gen/log "Waiting for recovery")
-                               (gen/sleep 10)
+                               (gen/sleep 20)
                                (gen/clients final-generator))
                    gen)]
     (merge tests/noop-test
diff --git a/tests/jepsen/src/jepsen/memgraph/large.clj b/tests/jepsen/src/jepsen/memgraph/large.clj
new file mode 100644
index 000000000..d58248df7
--- /dev/null
+++ b/tests/jepsen/src/jepsen/memgraph/large.clj
@@ -0,0 +1,106 @@
+(ns jepsen.memgraph.large
+  "Large write test"
+  (:require [neo4j-clj.core :as dbclient]
+            [clojure.tools.logging :refer [info]]
+            [jepsen [client :as client]
+                    [checker :as checker]
+                    [generator :as gen]]
+            [jepsen.checker.timeline :as timeline]
+            [jepsen.memgraph.client :as c]))
+
+(def node-num 100000)
+
+(dbclient/defquery get-node-count
+  "MATCH (n:Node) RETURN count(n) as c;")
+
+(defn create-nodes-builder
+  []
+  (dbclient/create-query
+    (str "UNWIND range(1, " node-num ") AS i "
+         "CREATE (n:Node:Additional {id: i, property1: 0, property2: 1, property3: 2});")))
+
+(def create-nodes (create-nodes-builder))
+
+(c/replication-client Client []
+  (open! [this test node]
+    (c/replication-open-connection this node node-config))
+  (setup! [this test]
+    (when (= replication-role :main)
+      (c/with-session conn session
+        (c/detach-delete-all session)
+        (create-nodes session))))
+  (invoke! [this test op]
+    (c/replication-invoke-case (:f op)
+      :read   (c/with-session conn session
+                (assoc op
+                       :type :ok
+                       :value {:count (->> (get-node-count session)
+                                           first
+                                           :c)
+                               :node node}))
+      :add    (if (= replication-role :main)
+                (c/with-session conn session
+                  (create-nodes session)
+                  (assoc op :type :ok))
+                (assoc op :type :fail))))
+  (teardown! [this test]
+    (when (= replication-role :main)
+      (c/with-session conn session
+        (c/detach-delete-all session))))
+  (close! [_ est]
+    (dbclient/disconnect conn)))
+
+(defn add-nodes
+  "Add nodes"
+  [test process]
+  {:type :invoke :f :add :value nil})
+
+(defn read-nodes
+  "Read node count"
+  [test process]
+  {:type :invoke :f :read :value nil})
+
+(defn large-checker
+  "Check if every read has a count divisible with node-num."
+  []
+  (reify checker/Checker
+    (check [this test history opts]
+      (let [ok-reads (->> history
+                          (filter #(= :ok (:type %)))
+                          (filter #(= :read (:f %))))
+            bad-reads (->> ok-reads
+                           (map (fn [op]
+                                  (let [count (-> op :value :count)]
+                                    (when (not= 0 (mod count node-num))
+                                      {:type :invalid-count
+                                       :op op}))))
+                           (filter identity)
+                           (into []))
+            empty-nodes (let [all-nodes (->> ok-reads
+                                             (map #(-> % :value :node))
+                                             (reduce conj #{}))]
+                          (->> all-nodes
+                               (filter (fn [node]
+                                        (every?
+                                          #(= 0 %)
+                                          (->> ok-reads
+                                               (map :value)
+                                               (filter #(= node (:node %)))
+                                               (map :count)))))
+                               (filter identity)
+                               (into [])))]
+        {:valid? (and
+                   (empty? bad-reads)
+                   (empty? empty-nodes))
+         :empty-nodes empty-nodes
+         :bad-reads bad-reads}))))
+
+(defn workload
+  [opts]
+  {:client (Client. nil nil nil (:node-config opts))
+   :checker (checker/compose
+              {:large    (large-checker)
+               :timeline (timeline/html)})
+   :generator (c/replication-gen
+                (gen/mix [read-nodes add-nodes]))
+   :final-generator (gen/once read-nodes)})