Jepsen test infrastracture improvements and bank test (#62)

* Define replication config for tests

* Add support for final generator

* Add bank test

* Add host name resolution and basic replication setup

* Add timeout support

* Define helper macros for replication tests

* Add nemesis configuration
This commit is contained in:
antonio2368 2020-12-16 09:02:47 +01:00 committed by Antonio Andelic
parent a0fb3fc463
commit 60d742a2dc
10 changed files with 477 additions and 43 deletions

@ -324,6 +324,7 @@ jobs:
name: "Release Jepsen Test"
runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl]
continue-on-error: true

@ -0,0 +1,6 @@
[{"n1" {:replication-role :main}
"n2" {:replication-role :replica :replication-mode :async :port 10000}
"n3" {:replication-role :replica :replication-mode :async :port 10000}}
{"n1" {:replication-role :main}
"n2" {:replication-role :replica :replication-mode :async :port 10000}
"n3" {:replication-role :replica :replication-mode :sync :port 10000 :timeout 5}}]

@ -49,6 +49,7 @@ case $1 in
docker exec jepsen-control mkdir -p /jepsen/memgraph
docker cp "$script_dir/src/." jepsen-control:/jepsen/memgraph/src/
docker cp "$script_dir/test/." jepsen-control:/jepsen/memgraph/test/
docker cp "$script_dir/resources/." jepsen-control:/jepsen/memgraph/resources/
docker cp "$script_dir/project.clj" jepsen-control:/jepsen/memgraph/project.clj

@ -0,0 +1,167 @@
"Bank account test on Memgraph.
The test should do random transfers on
the main instance while randomly reading
the total sum of the all nodes which
should be consistent."
(:require [neo4j-clj.core :as dbclient]
[ :refer [info]]
[jepsen [client :as client]
[checker :as checker]
[generator :as gen]]
[jepsen.checker.timeline :as timeline]
[jepsen.memgraph.client :as c]))
(def account-num
"Number of accounts to be created"
(def starting-balance
"Starting balance of each account"
(def max-transfer-amount
(dbclient/defquery create-account
"CREATE (n:Account {id: $id, balance: $balance});")
(dbclient/defquery get-all-accounts
"MATCH (n:Account) RETURN n;")
(dbclient/defquery get-account
"MATCH (n:Account {id: $id}) RETURN n;")
(dbclient/defquery update-balance
"MATCH (n:Account {id: $id})
SET n.balance = n.balance + $amount
(defn transfer-money
"Transfer money from one account to another by some amount
if the account you're transfering money from has enough
[conn from to amount]
(dbclient/with-transaction conn tx
(when (-> (get-account tx {:id from}) first :n :balance (>= amount))
(update-balance tx {:id from :amount (- amount)})
(update-balance tx {:id to :amount amount})))))
(c/replication-client Client []
(open! [this test node]
(c/replication-open-connection this node node-config))
(setup! [this test]
(when (= replication-role :main)
(c/with-session conn session
(c/detach-delete-all session)
(dotimes [i account-num]
(info "Creating account:" i)
(create-account session {:id i :balance starting-balance}))))))
(invoke! [this test op]
(c/replication-invoke-case (:f op)
:read (c/with-session conn session
(assoc op
:type :ok
:value {:accounts (->> (get-all-accounts session) (map :n) (reduce conj []))
:node node}))
:transfer (if (= replication-role :main)
(let [value (:value op)]
(assoc op
:type (if
(:from value)
(:to value)
(:amount value))
(catch Exception e
; Transaction can fail on serialization errors
(assoc op :type :fail :info (str e))))
(assoc op :type :fail))))
(teardown! [this test]
(when (= replication-role :main)
(c/with-session conn session
(c/detach-delete-all session))))
(close! [_ est]
(dbclient/disconnect conn)))
(defn read-balances
"Read the current state of all accounts"
[test process]
{:type :invoke, :f :read, :value nil})
(defn transfer
"Transfer money"
[test process]
{:type :invoke :f :transfer :value {:from (rand-int account-num)
:to (rand-int account-num)
:amount (rand-int max-transfer-amount)}})
(def valid-transfer
"Filter only valid transfers (where :from and :to are different)"
(gen/filter (fn [op] (not= (-> op :value :from) (-> op :value :to))) transfer))
(defn bank-checker
"Balances must all be non-negative and sum to the model's total
Each node should have at least one read that returned all accounts.
We allow the reads to be empty because the replica can connect to
main at some later point, until that point the replica is empty."
(reify checker/Checker
(check [this test history opts]
(let [ok-reads (->> history
(filter #(= :ok (:type %)))
(filter #(= :read (:f %))))
bad-reads (->> ok-reads
(map (fn [op]
(let [balances (->> op :value :accounts (map :balance))
expected-total (* account-num starting-balance)]
(cond (and
(not-empty balances)
(reduce + balances)))
{:type :wrong-total
:expected expected-total
:found (reduce + balances)
:op op}
(some neg? balances)
{:type :negative-value
:found balances
:op op}))))
(filter identity)
(into []))
empty-nodes (let [all-nodes (->> ok-reads
(map #(-> % :value :node))
(reduce conj #{}))]
(->> all-nodes
(filter (fn [node]
(->> ok-reads
(map :value)
(filter #(= node (:node %)))
(map :accounts)))))
(filter identity)
(into [])))]
{:valid? (and
(empty? bad-reads)
(empty? empty-nodes))
:empty-nodes empty-nodes
:bad-reads bad-reads}))))
(defn workload
"Basic test workload"
{:client (Client. nil nil nil (:node-config opts))
:checker (checker/compose
{:bank (bank-checker)
:timeline (timeline/html)})
:generator (c/replication-gen (gen/mix [read-balances valid-transfer]))
:final-generator (gen/once read-balances)})

@ -71,5 +71,6 @@
{:model (model/cas-register 0)
:algorithm :linear})
:timeline (timeline/html)})
:generator (gen/mix [r w cas])})
:generator (gen/mix [r w cas])
:final-generator (gen/once r)})

@ -1,6 +1,8 @@
(ns jepsen.memgraph.client
"Neo4j Clojure driver helper functions/macros"
(:require [neo4j-clj.core :as dbclient])
(:require [neo4j-clj.core :as dbclient]
[ :refer [info]]
[jepsen [generator :as gen]])
(:import ( URI)))
;; Jepsen related utils.
@ -21,3 +23,92 @@
"Open client connection to the node"
(dbclient/connect (URI. (instance-url node 7687)) "" ""))
(dbclient/defquery detach-delete-all
(defn replication-mode-str
(case (:replication-mode node-config)
:async "ASYNC"
:sync (str "SYNC" (when-let [timeout (:timeout node-config)] (str " WITH TIMEOUT " timeout)))))
(defn create-register-replica-query
[name node-config]
" "
(replication-mode-str node-config)
" TO \""
(:ip node-config)
(:port node-config)
(defn create-set-replica-role-query
(defn register-replicas
"Register all replicas."
[test process]
{:type :invoke :f :register :value nil})
(defn replication-gen
"Generator which should be used for replication tests
as it adds register replica invoke."
(gen/each-thread(gen/phases (cycle [(gen/once register-replicas)
(gen/time-limit 5 generator)]))))
(defmacro replication-client
"Create Client for replication tests.
Every replication client contains connection, node, replication role and
the node config for all nodes.
Adding additional fields is also possible."
[name [& fields] & specs]
(concat `(defrecord ~name [~'conn ~'node ~'replication-role ~'node-config ~@fields]
(defn replication-open-connection
"Open a connection to a node using the client.
After the connection is opened set the correct
replication role of instance."
[client node node-config]
(let [connection (open node)
nc (get node-config node)
role (:replication-role nc)]
(when (= :replica role)
(with-session connection session
((create-set-replica-role-query (:port nc)) session)
(catch Exception e
(info "Already setup the role")))))
(assoc client :replication-role role
:conn connection
:node node)))
(defmacro replication-invoke-case
"Call the case method on the op using the defined cases
while a handler for :register case is added."
[f & cases]
(concat (list 'case f
:register '(if (= replication-role :main)
(doseq [n (filter #(= (:replication-role (val %))
(c/with-session conn session
(first n)
(second n)) session))
(catch Exception e)))
(assoc op :type :ok))
(assoc op :type :fail)))

@ -2,67 +2,168 @@
(:require [ :refer :all]
[clojure.string :as str]
[ :refer [sh]]
[jepsen [cli :as cli]
[core :as jepsen]
[checker :as checker]
[nemesis :as nemesis]
[control :as c]
[core :as jepsen]
[generator :as gen]
[tests :as tests]]
[slingshot.slingshot :refer [try+ throw+]]
[jepsen.memgraph [basic :as basic]
[support :as s]]))
[bank :as bank]
[support :as s]
[nemesis :as nemesis]
[edn :as e]]))
(def workloads
"A map of workload names to functions that can take opts and construct
{:basic basic/workload}
{:bank bank/workload})
(def nemesis-configuration
"Nemesis configuration"
{:interval 5
:kill-node true
:partition-halves true})
(defn memgraph-test
"Given an options map from the command line runner (e.g. :nodes, :ssh,
:concurrency, ...), constructs a test map."
(let [workload ((get workloads (:workload opts)) opts)]
(let [workload ((get workloads (:workload opts)) opts)
nemesis (nemesis/nemesis nemesis-configuration)
gen (->> (:generator workload)
(gen/nemesis (:generator nemesis))
(gen/time-limit (:time-limit opts)))
gen (if-let [final-generator (:final-generator workload)]
(gen/phases gen
(gen/log "Healing cluster.")
(gen/nemesis (:final-generator nemesis))
(gen/log "Waiting for recovery")
(gen/sleep 10)
(gen/clients final-generator))
(merge tests/noop-test
{:pure-generators true
:name (str "test-" (name (:workload opts)))
:db (s/db (:package-url opts) (:local-binary opts))
:db (s/db opts)
:client (:client workload)
:checker (checker/compose
;; Fails on a cluster of independent Memgraphs.
{;; :stats (checker/stats) CAS always fails
;; so enable this
;; if all test have
;; at least 1 ok op
{:stats (checker/stats)
:exceptions (checker/unhandled-exceptions)
:perf (checker/perf)
:workload (:checker workload)})
:nemesis (nemesis/partition-random-halves)
:generator (->> (:generator workload)
(cycle [(gen/sleep 5)
{:type :info, :f :start}
(gen/sleep 5)
{:type :info, :f :stop}]))
(gen/time-limit (:time-limit opts)))})))
:nemesis (:nemesis nemesis)
:generator gen})))
(defn default-node-configuration
"Creates default replication configuration for nodes.
All of them are replicas in sync mode."
(reduce (fn [cur n]
(conj cur {n
{:replication-role :replica
:replication-mode :sync
:port 10000
:timeout 10}}))
(defn resolve-hostname
"Resolve hostnames to ip address"
(:out (sh "getent" "hosts" host)))))
(defn resolve-all-node-hostnames
"Resolve all hostnames in config and assign it to the node"
(reduce (fn [curr node]
(let [k (first node)
v (second node)]
(assoc curr
k (assoc v
:ip (resolve-hostname k)))))
(defn throw-if-key-missing-in-any
[map-coll key error-msg]
(when-not (every? #(contains? % key) map-coll)
(throw (Exception. error-msg))))
(defn merge-node-configurations
"Merge user defined configuration with default configuration.
Check if the configuration is valid."
[nodes node-configs]
(when-not (every? (fn [config]
(= 1
#(= (:replication-role %) :main)
(vals config)))))
(throw (Exception. "Invalid node configuration. There can only be one :main.")))
(doseq [node-config node-configs]
(let [replica-nodes-configs (filter
#(= (:replication-role %) :replica)
(vals node-config))]
(str "Invalid node configuration. "
"Every replication node requires "
":port to be defined."))
(str "Invalid node configuration. "
"Every replication node requires "
":replication-mode to be defined."))
(filter #(= (:replication-mode %) :sync) replica-nodes-configs)
(str "Invalid node confiruation. "
"Every SYNC replication node requires "
":timeout to be defined."))))
(map (fn [node-config] (resolve-all-node-hostnames
(default-node-configuration nodes)
(def cli-opts
"CLI options for tests."
[[nil "--package-url URL" "What package of Memgraph should we test?"
:default nil]
:default nil
:validate [nil? "Memgraph package-url setup not yet implemented."]]
[nil "--local-binary PATH" "Ignore package; use this local binary instead."
:default "/opt/memgraph/memgraph"]
:default "/opt/memgraph/memgraph"
:validate [#(and (some? %) (not-empty %)) "local-binary should be defined."]]
["-w" "--workload NAME" "Test workload to run"
:parse-fn keyword
:validate [workloads (cli/one-of workloads)]]])
:validate [workloads (cli/one-of workloads)]]
[nil "--node-configs PATH" "Path to the node configuration file."
:parse-fn #(-> % e/load-configuration)]])
(defn all-tests
"Takes base CLI options and constructs a sequence of test options."
(let [counts (range (:test-count opts))
workloads (if-let [w (:workload opts)] [w] (keys workloads))
test-opts (for [i counts, w workloads]
node-configs (if (:node-configs opts)
(merge-node-configurations (:nodes opts) (:node-configs opts))
[(resolve-all-node-hostnames (default-node-configuration (:nodes opts)))])
test-opts (for [i counts c node-configs w workloads]
(assoc opts
:node-config c
:workload w))]
(map memgraph-test test-opts)))

@ -0,0 +1,7 @@
(ns jepsen.memgraph.edn
(:require [clojure.edn :as edn]))
(defn load-configuration
"Load a configuration file."
(-> path slurp edn/read-string))

@ -0,0 +1,49 @@
(ns jepsen.memgraph.nemesis
"Memgraph nemesis"
(:require [jepsen [nemesis :as nemesis]
[generator :as gen]]
[ :as s]))
(defn node-killer
"Responds to :start by killing a random node, and to :stop
by resuming them."
(nemesis/node-start-stopper identity
(defn full-nemesis
"Can kill and restart all processess and initiate network partitions."
{{:kill-node :start
:restart-node :stop} (node-killer)
{:start-partition-halves :start
:stop-partition-halves :stop} (nemesis/partition-random-halves)}))
(defn op
"Construct a nemesis op"
{:type :info :f f})
(defn full-generator
"Construct nemesis generator."
(->> [(when (:kill-node? opts)
[(cycle (map op [:kill-node :restart-node]))])
(when (:partition-halves? opts)
[(cycle (map op [:start-partition-halves :stop-partition-halves]))])]
(apply concat)
(gen/stagger (:interval opts))))
(defn nemesis
"Composite nemesis and generator"
{:nemesis (full-nemesis opts)
:generator (full-generator opts)
(->> [(when (:partition-halves? opts) :stop-partition-halves)
(when (:kill-node? opts) :restart-node)]
(remove nil?)
(map op))})

@ -12,30 +12,40 @@
(def mgdata (str mgdir "/mg_data"))
(def mglog (str mgdir "/memgraph.log"))
(def mgpid (str mgdir "/"))
(defn start-node!
[test node]
{:logfile mglog
:pidfile mgpid
:chdir mgdir}
(:local-binary test)
:--storage-snapshot-interval-sec 300
(defn stop-node!
[test node]
(cu/stop-daemon! (:local-binary test) mgpid))
(defn db
"Manage Memgraph DB on each node."
[package-url local-binary]
(reify db/DB
(setup! [_ test node]
(c/su (debian/install ['python3 'python3-dev]))
(c/su (meh (c/exec :killall :memgraph)))
(when-not (nil? package-url)
(throw (Exception. "Memgraph package-url setup not yet implemented.")))
(when (nil? local-binary)
(throw (Exception. "Memgraph local-binary has to be defined.")))
(try (c/exec :command :-v local-binary)
(catch Exception e
(throw (Exception. (str local-binary " is not there.")))))
(info node "Memgraph binary is there" local-binary)
{:logfile mglog
:pidfile mgpid
:chdir mgdir}
(Thread/sleep 2000))
(let [local-binary (:local-binary opts)]
(c/su (debian/install ['python3 'python3-dev]))
(c/su (meh (c/exec :killall :memgraph)))
(try (c/exec :command :-v local-binary)
(catch Exception e
(throw (Exception. (str local-binary " is not there.")))))
(info node "Memgraph binary is there" local-binary)
(start-node! test node)
(Thread/sleep 2000)))
(teardown! [_ test node]
(info node "Tearing down Memgraph")
(when (and local-binary mgpid) (cu/stop-daemon! local-binary mgpid))
(stop-node! test node)
(c/exec :rm :-rf mgdata)
(c/exec :rm :-rf mglog)))