From cbf826e0c36fdde9f9f20a7364596c36bb67fb2c Mon Sep 17 00:00:00 2001
From: antonio2368 <antonio2368@users.noreply.github.com>
Date: Wed, 16 Jun 2021 13:22:48 +0200
Subject: [PATCH] Load WAL on replica using transactions (#95)

---
 .../v2/replication/replication_server.cpp     | 415 +++++++++---------
 .../v2/replication/replication_server.hpp     |   4 +-
 tests/jepsen/src/jepsen/memgraph/bank.clj     |   4 +-
 .../jepsen/src/jepsen/memgraph/sequential.clj |  39 +-
 4 files changed, 226 insertions(+), 236 deletions(-)

diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp
index 495b03d04..503898ef1 100644
--- a/src/storage/v2/replication/replication_server.cpp
+++ b/src/storage/v2/replication/replication_server.cpp
@@ -1,12 +1,33 @@
 #include "storage/v2/replication/replication_server.hpp"
+#include <atomic>
+#include <filesystem>
 
 #include "storage/v2/durability/durability.hpp"
+#include "storage/v2/durability/paths.hpp"
+#include "storage/v2/durability/serialization.hpp"
 #include "storage/v2/durability/snapshot.hpp"
+#include "storage/v2/durability/version.hpp"
+#include "storage/v2/durability/wal.hpp"
 #include "storage/v2/replication/config.hpp"
 #include "storage/v2/transaction.hpp"
 #include "utils/exceptions.hpp"
 
 namespace storage {
+namespace {
+std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder *decoder) {
+  try {
+    auto timestamp = ReadWalDeltaHeader(decoder);
+    SPDLOG_INFO("       Timestamp {}", timestamp);
+    auto delta = ReadWalDeltaData(decoder);
+    return {timestamp, delta};
+  } catch (const slk::SlkReaderException &) {
+    throw utils::BasicException("Missing data!");
+  } catch (const durability::RecoveryFailure &) {
+    throw utils::BasicException("Invalid data!");
+  }
+};
+}  // namespace
+
 Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::Endpoint endpoint,
                                               const replication::ReplicationServerConfig &config)
     : storage_(storage) {
@@ -68,33 +89,6 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
     storage_->epoch_id_ = std::move(*maybe_epoch_id);
   }
 
-  const auto read_delta = [&]() -> std::pair<uint64_t, durability::WalDeltaData> {
-    try {
-      auto timestamp = ReadWalDeltaHeader(&decoder);
-      SPDLOG_INFO("       Timestamp {}", timestamp);
-      auto delta = ReadWalDeltaData(&decoder);
-      return {timestamp, delta};
-    } catch (const slk::SlkReaderException &) {
-      throw utils::BasicException("Missing data!");
-    } catch (const durability::RecoveryFailure &) {
-      throw utils::BasicException("Invalid data!");
-    }
-  };
-
-  if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) {
-    // Empty the stream
-    bool transaction_complete = false;
-    while (!transaction_complete) {
-      SPDLOG_INFO("Skipping delta");
-      const auto [timestamp, delta] = read_delta();
-      transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
-    }
-
-    AppendDeltasRes res{false, storage_->last_commit_timestamp_.load()};
-    slk::Save(res, res_builder);
-    return;
-  }
-
   if (storage_->wal_file_) {
     if (req.seq_num > storage_->wal_file_->SequenceNumber() || *maybe_epoch_id != storage_->epoch_id_) {
       storage_->wal_file_->FinalizeWal();
@@ -108,6 +102,173 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
     storage_->wal_seq_num_ = req.seq_num;
   }
 
+  if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) {
+    // Empty the stream
+    bool transaction_complete = false;
+    while (!transaction_complete) {
+      SPDLOG_INFO("Skipping delta");
+      const auto [timestamp, delta] = ReadDelta(&decoder);
+      transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
+    }
+
+    AppendDeltasRes res{false, storage_->last_commit_timestamp_.load()};
+    slk::Save(res, res_builder);
+    return;
+  }
+
+  ReadAndApplyDelta(&decoder);
+
+  AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()};
+  slk::Save(res, res_builder);
+}
+
+void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
+  SnapshotReq req;
+  slk::Load(&req, req_reader);
+
+  replication::Decoder decoder(req_reader);
+
+  utils::EnsureDirOrDie(storage_->snapshot_directory_);
+
+  const auto maybe_snapshot_path = decoder.ReadFile(storage_->snapshot_directory_);
+  MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
+  spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
+
+  std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
+  // Clear the database
+  storage_->vertices_.clear();
+  storage_->edges_.clear();
+
+  storage_->constraints_ = Constraints();
+  storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
+  storage_->indices_.label_property_index =
+      LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
+  try {
+    spdlog::debug("Loading snapshot");
+    auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
+                                                       &storage_->epoch_history_, &storage_->name_id_mapper_,
+                                                       &storage_->edge_count_, storage_->config_.items);
+    spdlog::debug("Snapshot loaded successfully");
+    // If this step is present it should always be the first step of
+    // the recovery so we use the UUID we read from snasphost
+    storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
+    storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
+    const auto &recovery_info = recovered_snapshot.recovery_info;
+    storage_->vertex_id_ = recovery_info.next_vertex_id;
+    storage_->edge_id_ = recovery_info.next_edge_id;
+    storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
+
+    durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
+                                             &storage_->constraints_, &storage_->vertices_);
+  } catch (const durability::RecoveryFailure &e) {
+    LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
+  }
+  storage_guard.unlock();
+
+  SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
+  slk::Save(res, res_builder);
+
+  // Delete other durability files
+  auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
+  for (const auto &[path, uuid, _] : snapshot_files) {
+    if (path != *maybe_snapshot_path) {
+      storage_->file_retainer_.DeleteFile(path);
+    }
+  }
+
+  auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_);
+  if (wal_files) {
+    for (const auto &wal_file : *wal_files) {
+      storage_->file_retainer_.DeleteFile(wal_file.path);
+    }
+
+    storage_->wal_file_.reset();
+  }
+}
+
+void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
+  WalFilesReq req;
+  slk::Load(&req, req_reader);
+
+  const auto wal_file_number = req.file_number;
+  spdlog::debug("Received WAL files: {}", wal_file_number);
+
+  replication::Decoder decoder(req_reader);
+
+  utils::EnsureDirOrDie(storage_->wal_directory_);
+
+  for (auto i = 0; i < wal_file_number; ++i) {
+    LoadWal(&decoder);
+  }
+
+  WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
+  slk::Save(res, res_builder);
+}
+
+void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
+  CurrentWalReq req;
+  slk::Load(&req, req_reader);
+
+  replication::Decoder decoder(req_reader);
+
+  utils::EnsureDirOrDie(storage_->wal_directory_);
+
+  LoadWal(&decoder);
+
+  CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
+  slk::Save(res, res_builder);
+}
+
+void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
+  const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory;
+  utils::EnsureDir(temp_wal_directory);
+  auto maybe_wal_path = decoder->ReadFile(temp_wal_directory);
+  MG_ASSERT(maybe_wal_path, "Failed to load WAL!");
+  spdlog::trace("Received WAL saved to {}", *maybe_wal_path);
+  try {
+    auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
+    if (wal_info.seq_num == 0) {
+      storage_->uuid_ = wal_info.uuid;
+    }
+
+    if (wal_info.epoch_id != storage_->epoch_id_) {
+      storage_->epoch_history_.emplace_back(wal_info.epoch_id, storage_->last_commit_timestamp_);
+      storage_->epoch_id_ = std::move(wal_info.epoch_id);
+    }
+
+    if (storage_->wal_file_) {
+      if (storage_->wal_file_->SequenceNumber() != wal_info.seq_num) {
+        storage_->wal_file_->FinalizeWal();
+        storage_->wal_seq_num_ = wal_info.seq_num;
+        storage_->wal_file_.reset();
+      }
+    } else {
+      storage_->wal_seq_num_ = wal_info.seq_num;
+    }
+
+    durability::Decoder wal;
+    const auto version = wal.Initialize(*maybe_wal_path, durability::kWalMagic);
+    if (!version) throw durability::RecoveryFailure("Couldn't read WAL magic and/or version!");
+    if (!durability::IsVersionSupported(*version)) throw durability::RecoveryFailure("Invalid WAL version!");
+    wal.SetPosition(wal_info.offset_deltas);
+
+    for (size_t i = 0; i < wal_info.num_deltas;) {
+      i += ReadAndApplyDelta(&wal);
+    }
+
+    spdlog::debug("{} loaded successfully", *maybe_wal_path);
+  } catch (const durability::RecoveryFailure &e) {
+    LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", *maybe_wal_path, e.what());
+  }
+}
+
+Storage::ReplicationServer::~ReplicationServer() {
+  if (rpc_server_) {
+    rpc_server_->Shutdown();
+    rpc_server_->AwaitShutdown();
+  }
+}
+uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
   auto edge_acc = storage_->edges_.access();
   auto vertex_acc = storage_->vertices_.access();
 
@@ -121,11 +282,22 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
     return &commit_timestamp_and_accessor->second;
   };
 
-  bool transaction_complete = false;
-  for (uint64_t i = 0; !transaction_complete; ++i) {
-    SPDLOG_INFO("  Delta {}", i);
-    const auto [timestamp, delta] = read_delta();
+  uint64_t applied_deltas = 0;
+  auto max_commit_timestamp = storage_->last_commit_timestamp_.load();
 
+  for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) {
+    const auto [timestamp, delta] = ReadDelta(decoder);
+    if (timestamp > max_commit_timestamp) {
+      max_commit_timestamp = timestamp;
+    }
+
+    transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
+
+    if (timestamp < storage_->timestamp_) {
+      continue;
+    }
+
+    SPDLOG_INFO("  Delta {}", applied_deltas);
     switch (delta.type) {
       case durability::WalDeltaData::Type::VERTEX_CREATE: {
         spdlog::trace("       Create vertex {}", delta.vertex_create_delete.gid.AsUint());
@@ -368,189 +540,12 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
         break;
       }
     }
-    transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
   }
 
   if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid data!");
 
-  AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()};
-  slk::Save(res, res_builder);
-}
+  storage_->last_commit_timestamp_ = max_commit_timestamp;
 
-void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
-  SnapshotReq req;
-  slk::Load(&req, req_reader);
-
-  replication::Decoder decoder(req_reader);
-
-  utils::EnsureDirOrDie(storage_->snapshot_directory_);
-
-  const auto maybe_snapshot_path = decoder.ReadFile(storage_->snapshot_directory_);
-  MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
-  spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
-
-  {
-    std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
-    // Clear the database
-    storage_->vertices_.clear();
-    storage_->edges_.clear();
-
-    storage_->constraints_ = Constraints();
-    storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
-    storage_->indices_.label_property_index =
-        LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
-    try {
-      spdlog::debug("Loading snapshot");
-      auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
-                                                         &storage_->epoch_history_, &storage_->name_id_mapper_,
-                                                         &storage_->edge_count_, storage_->config_.items);
-      spdlog::debug("Snapshot loaded successfully");
-      // If this step is present it should always be the first step of
-      // the recovery so we use the UUID we read from snasphost
-      storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
-      storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
-      const auto &recovery_info = recovered_snapshot.recovery_info;
-      storage_->vertex_id_ = recovery_info.next_vertex_id;
-      storage_->edge_id_ = recovery_info.next_edge_id;
-      storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
-      storage_->commit_log_.emplace(storage_->timestamp_);
-
-      durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
-                                               &storage_->constraints_, &storage_->vertices_);
-    } catch (const durability::RecoveryFailure &e) {
-      LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
-    }
-  }
-
-  SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
-  slk::Save(res, res_builder);
-
-  // Delete other durability files
-  auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
-  for (const auto &[path, uuid, _] : snapshot_files) {
-    if (path != *maybe_snapshot_path) {
-      storage_->file_retainer_.DeleteFile(path);
-    }
-  }
-
-  auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_);
-  if (wal_files) {
-    for (const auto &wal_file : *wal_files) {
-      storage_->file_retainer_.DeleteFile(wal_file.path);
-    }
-
-    storage_->wal_file_.reset();
-  }
-}
-
-void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
-  WalFilesReq req;
-  slk::Load(&req, req_reader);
-
-  const auto wal_file_number = req.file_number;
-  spdlog::debug("Received WAL files: {}", wal_file_number);
-
-  replication::Decoder decoder(req_reader);
-
-  utils::EnsureDirOrDie(storage_->wal_directory_);
-
-  {
-    std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
-    durability::RecoveredIndicesAndConstraints indices_constraints;
-    auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
-    if (wal_info.seq_num == 0) {
-      storage_->uuid_ = wal_info.uuid;
-    }
-
-    // Check the seq number of the first wal file to see if it's the
-    // finalized form of the current wal on replica
-    if (storage_->wal_file_) {
-      if (storage_->wal_file_->SequenceNumber() == wal_info.seq_num && storage_->wal_file_->Path() != path) {
-        storage_->wal_file_->DeleteWal();
-      }
-      storage_->wal_file_.reset();
-    }
-
-    for (auto i = 1; i < wal_file_number; ++i) {
-      LoadWal(&decoder, &indices_constraints);
-    }
-
-    durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
-                                             &storage_->vertices_);
-  }
-
-  WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
-  slk::Save(res, res_builder);
-}
-
-void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
-  CurrentWalReq req;
-  slk::Load(&req, req_reader);
-
-  replication::Decoder decoder(req_reader);
-
-  utils::EnsureDirOrDie(storage_->wal_directory_);
-
-  {
-    std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
-    durability::RecoveredIndicesAndConstraints indices_constraints;
-    auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
-    if (wal_info.seq_num == 0) {
-      storage_->uuid_ = wal_info.uuid;
-    }
-
-    if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == wal_info.seq_num &&
-        storage_->wal_file_->Path() != path) {
-      // Delete the old wal file
-      storage_->file_retainer_.DeleteFile(storage_->wal_file_->Path());
-    }
-    MG_ASSERT(storage_->config_.durability.snapshot_wal_mode ==
-              Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL);
-    storage_->wal_file_.emplace(std::move(path), storage_->config_.items, &storage_->name_id_mapper_, wal_info.seq_num,
-                                wal_info.from_timestamp, wal_info.to_timestamp, wal_info.num_deltas,
-                                &storage_->file_retainer_);
-    durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
-                                             &storage_->vertices_);
-  }
-
-  CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
-  slk::Save(res, res_builder);
-}
-
-std::pair<durability::WalInfo, std::filesystem::path> Storage::ReplicationServer::LoadWal(
-    replication::Decoder *decoder, durability::RecoveredIndicesAndConstraints *indices_constraints) {
-  auto maybe_wal_path = decoder->ReadFile(storage_->wal_directory_, "_MAIN");
-  MG_ASSERT(maybe_wal_path, "Failed to load WAL!");
-  spdlog::trace("Received WAL saved to {}", *maybe_wal_path);
-  try {
-    auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
-    if (wal_info.epoch_id != storage_->epoch_id_) {
-      storage_->epoch_history_.emplace_back(wal_info.epoch_id, storage_->last_commit_timestamp_);
-      storage_->epoch_id_ = std::move(wal_info.epoch_id);
-    }
-    const auto last_loaded_timestamp =
-        storage_->timestamp_ == kTimestampInitialId ? std::nullopt : std::optional{storage_->timestamp_ - 1};
-    auto info = durability::LoadWal(*maybe_wal_path, indices_constraints, last_loaded_timestamp, &storage_->vertices_,
-                                    &storage_->edges_, &storage_->name_id_mapper_, &storage_->edge_count_,
-                                    storage_->config_.items);
-    storage_->vertex_id_ = std::max(storage_->vertex_id_.load(), info.next_vertex_id);
-    storage_->edge_id_ = std::max(storage_->edge_id_.load(), info.next_edge_id);
-    storage_->timestamp_ = std::max(storage_->timestamp_, info.next_timestamp);
-    storage_->commit_log_.emplace(storage_->timestamp_);
-    if (info.last_commit_timestamp) {
-      storage_->last_commit_timestamp_ = *info.last_commit_timestamp;
-    }
-    spdlog::debug("{} loaded successfully", *maybe_wal_path);
-    return {std::move(wal_info), std::move(*maybe_wal_path)};
-  } catch (const durability::RecoveryFailure &e) {
-    LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", *maybe_wal_path, e.what());
-  }
-}
-
-Storage::ReplicationServer::~ReplicationServer() {
-  if (rpc_server_) {
-    rpc_server_->Shutdown();
-    rpc_server_->AwaitShutdown();
-  }
+  return applied_deltas;
 }
 }  // namespace storage
diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp
index bff872f80..2d0846ab6 100644
--- a/src/storage/v2/replication/replication_server.hpp
+++ b/src/storage/v2/replication/replication_server.hpp
@@ -23,8 +23,8 @@ class Storage::ReplicationServer {
   void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);
   void CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder);
 
-  std::pair<durability::WalInfo, std::filesystem::path> LoadWal(
-      replication::Decoder *decoder, durability::RecoveredIndicesAndConstraints *indices_constraints);
+  void LoadWal(replication::Decoder *decoder);
+  uint64_t ReadAndApplyDelta(durability::BaseDecoder *decoder);
 
   std::optional<communication::ServerContext> rpc_server_context_;
   std::optional<rpc::Server> rpc_server_;
diff --git a/tests/jepsen/src/jepsen/memgraph/bank.clj b/tests/jepsen/src/jepsen/memgraph/bank.clj
index 3bfb79409..4b5955903 100644
--- a/tests/jepsen/src/jepsen/memgraph/bank.clj
+++ b/tests/jepsen/src/jepsen/memgraph/bank.clj
@@ -118,8 +118,10 @@
                            (filter #(= :ok (:type %)))
                            (filter #(= :read (:f %))))
             bad-reads (->> ok-reads
+                           (map #(->> % :value :accounts))
+                           (filter #(= (count %) 5))
                            (map (fn [op]
-                                  (let [balances       (->> op :value :accounts (map :balance))
+                                  (let [balances       (map :balance op)
                                         expected-total (* account-num starting-balance)]
                                     (cond (and
                                              (not-empty balances)
diff --git a/tests/jepsen/src/jepsen/memgraph/sequential.clj b/tests/jepsen/src/jepsen/memgraph/sequential.clj
index b8f772cf2..d50a241cb 100644
--- a/tests/jepsen/src/jepsen/memgraph/sequential.clj
+++ b/tests/jepsen/src/jepsen/memgraph/sequential.clj
@@ -9,19 +9,7 @@
             [jepsen.memgraph.client :as c]))
 
 (dbclient/defquery get-all-nodes
-  "MATCH (n:Node) RETURN n;")
-
-(dbclient/defquery get-max-id
-  "MATCH (n:Node)
-  RETURN n.id AS id
-  ORDER BY id DESC
-  LIMIT 1;")
-
-(dbclient/defquery get-min-id
-  "MATCH (n:Node)
-  RETURN n.id AS id
-  ORDER BY id
-  LIMIT 1;")
+  "MATCH (n:Node) RETURN n ORDER BY n.id;")
 
 (dbclient/defquery create-node
   "CREATE (n:Node {id: $id});")
@@ -29,19 +17,23 @@
 (dbclient/defquery delete-node-with-id
   "MATCH (n:Node {id: $id}) DELETE n;")
 
+(def next-node-for-add (atom 0))
+
 (defn add-next-node
   "Add a new node with its id set to the next highest"
   [conn]
-  (dbclient/with-transaction conn tx
-    (let [max-id (-> (get-max-id tx) first :id)]
-      (create-node tx {:id (inc max-id)}))))
+  (when (dbclient/with-transaction conn tx
+      (create-node tx {:id (swap! next-node-for-add identity)}))
+      (swap! next-node-for-add inc)))
+
+(def next-node-for-delete (atom 0))
 
 (defn delete-oldest-node
   "Delete a node with the lowest id"
   [conn]
-  (dbclient/with-transaction conn tx
-    (let [min-id (-> (get-min-id tx) first :id)]
-      (delete-node-with-id tx {:id min-id}))))
+  (when (dbclient/with-transaction conn tx
+      (delete-node-with-id tx {:id (swap! next-node-for-delete identity)}))
+      (swap! next-node-for-delete inc)))
 
 (c/replication-client Client []
   (open! [this test node]
@@ -123,11 +115,12 @@
                                     (when (not-empty ids)
                                       (cond ((complement strictly-increasing) ids)
                                             {:type :not-increasing-ids
-                                             :op op}
-
-                                            ((complement increased-by-1) ids)
-                                            {:type :ids-missing
                                              :op op})))))
+
+                                            ;; if there are multiple threads not sure how to guarante that the ids are created in order
+                                            ;;((complement increased-by-1) ids)
+                                            ;;{:type :ids-missing
+                                            ;; :op op})))))
                            (filter identity)
                            (into []))
             empty-nodes (let [all-nodes (->> ok-reads