From 10c8256ec92b1da85e4c7fa1eaff3c0a56ca645f Mon Sep 17 00:00:00 2001
From: antonio2368 <antonio2368@users.noreply.github.com>
Date: Tue, 12 Jan 2021 19:08:08 +0100
Subject: [PATCH] Fix epoch id handling (#73)

---
 src/storage/v2/durability/snapshot.cpp        |  4 +-
 .../v2/replication/replication_client.cpp     | 42 +++++++++++-----
 .../v2/replication/replication_server.cpp     | 50 +++----------------
 src/storage/v2/replication/rpc.lcp            |  8 ++-
 src/storage/v2/storage.cpp                    | 29 -----------
 5 files changed, 44 insertions(+), 89 deletions(-)

diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp
index 5aa4208c7..243f5ff2d 100644
--- a/src/storage/v2/durability/snapshot.cpp
+++ b/src/storage/v2/durability/snapshot.cpp
@@ -969,9 +969,7 @@ void CreateSnapshot(
       for (uint64_t i = 0; i < *pos; ++i) {
         const auto &[seq_num, from_timestamp, to_timestamp, wal_path] =
             wal_files[i];
-        if (!utils::DeleteFile(wal_path)) {
-          LOG(WARNING) << "Couldn't delete WAL file " << wal_path << "!";
-        }
+        file_retainer->DeleteFile(wal_path);
       }
     }
   }
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index c53b687ea..ac93bfbc4 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -6,6 +6,7 @@
 #include "storage/v2/durability/durability.hpp"
 #include "storage/v2/replication/config.hpp"
 #include "storage/v2/replication/enums.hpp"
+#include "storage/v2/transaction.hpp"
 #include "utils/file_locker.hpp"
 
 namespace storage {
@@ -39,23 +40,40 @@ Storage::ReplicationClient::ReplicationClient(
 /// @throws rpc::RpcFailedException
 void Storage::ReplicationClient::InitializeClient() {
   uint64_t current_commit_timestamp{kTimestampInitialId};
-  auto stream{
-      rpc_client_->Stream<HeartbeatRpc>(storage_->last_commit_timestamp_)};
-  replication::Encoder encoder{stream.GetBuilder()};
-  // Write epoch id
+
+  std::optional<std::string> epoch_id;
   {
-    // We need to lock so the epoch id isn't overwritten
-    std::unique_lock engine_guard{storage_->engine_lock_};
-    encoder.WriteString(storage_->epoch_id_);
+    // epoch_id_ can be changed if we don't take this lock
+    std::unique_lock engine_guard(storage_->engine_lock_);
+    epoch_id.emplace(storage_->epoch_id_);
   }
+
+  auto stream{rpc_client_->Stream<HeartbeatRpc>(
+      storage_->last_commit_timestamp_, std::move(*epoch_id))};
+
   const auto response = stream.AwaitResponse();
-  if (!response.success) {
-    LOG(ERROR)
-        << "Replica " << name_
-        << " is ahead of this instance. The branching point is on commit "
-        << response.current_commit_timestamp;
+  std::optional<uint64_t> branching_point;
+  if (response.epoch_id != storage_->epoch_id_ &&
+      response.current_commit_timestamp != kTimestampInitialId) {
+    const auto &epoch_history = storage_->epoch_history_;
+    const auto epoch_info_iter =
+        std::find_if(epoch_history.crbegin(), epoch_history.crend(),
+                     [&](const auto &epoch_info) {
+                       return epoch_info.first == response.epoch_id;
+                     });
+    if (epoch_info_iter == epoch_history.crend()) {
+      branching_point = 0;
+    } else if (epoch_info_iter->second != response.current_commit_timestamp) {
+      branching_point = epoch_info_iter->second;
+    }
+  }
+  if (branching_point) {
+    LOG(ERROR) << "Replica " << name_ << " cannot be used with this instance."
+               << " Please start a clean instance of Memgraph server"
+               << " on the specified endpoint.";
     return;
   }
+
   current_commit_timestamp = response.current_commit_timestamp;
   DLOG(INFO) << "Current timestamp on replica: " << current_commit_timestamp;
   DLOG(INFO) << "Current MAIN timestamp: "
diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp
index da46e65a3..2f4049d5b 100644
--- a/src/storage/v2/replication/replication_server.cpp
+++ b/src/storage/v2/replication/replication_server.cpp
@@ -62,43 +62,8 @@ void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader,
                                                   slk::Builder *res_builder) {
   HeartbeatReq req;
   slk::Load(&req, req_reader);
-  replication::Decoder decoder{req_reader};
-  auto maybe_epoch_id = decoder.ReadString();
-  CHECK(maybe_epoch_id) << "Invalid value read form HeartbeatRpc!";
-  if (storage_->last_commit_timestamp_ == kTimestampInitialId) {
-    // The replica has no commits
-    // use the main's epoch id
-    storage_->epoch_id_ = std::move(*maybe_epoch_id);
-  } else if (*maybe_epoch_id != storage_->epoch_id_) {
-    auto &epoch_history = storage_->epoch_history_;
-    const auto result =
-        std::find_if(epoch_history.rbegin(), epoch_history.rend(),
-                     [&](const auto &epoch_info) {
-                       return epoch_info.first == *maybe_epoch_id;
-                     });
-    auto branching_point = kTimestampInitialId;
-    if (result == epoch_history.rend()) {
-      // we couldn't find the epoch_id inside the history so if it has
-      // the same or larger commit timestamp, some old replica became a main
-      // This isn't always the case, there is one case where an old main
-      // becomes a replica then main again and it should have a commit timestamp
-      // larger than the one on replica.
-      if (req.main_commit_timestamp >= storage_->last_commit_timestamp_) {
-        epoch_history.emplace_back(std::move(storage_->epoch_id_),
-                                   storage_->last_commit_timestamp_);
-        storage_->epoch_id_ = std::move(*maybe_epoch_id);
-        HeartbeatRes res{true, storage_->last_commit_timestamp_.load()};
-        slk::Save(res, res_builder);
-        return;
-      }
-    } else {
-      branching_point = result->second;
-    }
-    HeartbeatRes res{false, branching_point};
-    slk::Save(res, res_builder);
-    return;
-  }
-  HeartbeatRes res{true, storage_->last_commit_timestamp_.load()};
+  HeartbeatRes res{true, storage_->last_commit_timestamp_.load(),
+                   storage_->epoch_id_};
   slk::Save(res, res_builder);
 }
 
@@ -112,12 +77,11 @@ void Storage::ReplicationServer::AppendDeltasHandler(
   auto maybe_epoch_id = decoder.ReadString();
   CHECK(maybe_epoch_id) << "Invalid replication message";
 
-  // Different epoch ids should not be possible in AppendDeltas
-  // because Recovery and Heartbeat handlers should resolve
-  // any issues with timestamp and epoch id
-  CHECK(*maybe_epoch_id == storage_->epoch_id_)
-      << "Received Deltas from transaction with incompatible"
-         " epoch id";
+  if (*maybe_epoch_id != storage_->epoch_id_) {
+    storage_->epoch_history_.emplace_back(std::move(storage_->epoch_id_),
+                                          storage_->last_commit_timestamp_);
+    storage_->epoch_id_ = std::move(*maybe_epoch_id);
+  }
 
   const auto read_delta =
       [&]() -> std::pair<uint64_t, durability::WalDeltaData> {
diff --git a/src/storage/v2/replication/rpc.lcp b/src/storage/v2/replication/rpc.lcp
index 8dec6807a..659b19091 100644
--- a/src/storage/v2/replication/rpc.lcp
+++ b/src/storage/v2/replication/rpc.lcp
@@ -3,6 +3,7 @@
 
 #include <cstdint>
 #include <cstring>
+#include <string>
 
 #include "rpc/messages.hpp"
 #include "slk/serialization.hpp"
@@ -23,10 +24,13 @@ cpp<#
      (current-commit-timestamp :uint64_t))))
 
 (lcp:define-rpc heartbeat
-  (:request ((main-commit-timestamp :uint64_t)))
+  (:request
+    ((main-commit-timestamp :uint64_t)
+     (epoch-id "std::string")))
   (:response
     ((success :bool)
-     (current-commit-timestamp :uint64_t))))
+     (current-commit-timestamp :uint64_t)
+     (epoch-id "std::string"))))
 
 (lcp:define-rpc snapshot
   (:request ())
diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp
index 603bb00b5..7056e6c99 100644
--- a/src/storage/v2/storage.cpp
+++ b/src/storage/v2/storage.cpp
@@ -30,12 +30,6 @@
 #include "storage/v2/replication/rpc.hpp"
 #endif
 
-#ifdef MG_ENTERPRISE
-DEFINE_bool(main, false, "Set to true to be the main");
-DEFINE_bool(replica, false, "Set to true to be the replica");
-DEFINE_bool(async_replica, false, "Set to true to be the replica");
-#endif
-
 namespace storage {
 
 namespace {
@@ -421,29 +415,6 @@ Storage::Storage(Config config)
     gc_runner_.Run("Storage GC", config_.gc.interval,
                    [this] { this->CollectGarbage(); });
   }
-
-#ifdef MG_ENTERPRISE
-  // For testing purposes until we can define the instance type from
-  // a query.
-  if (FLAGS_main) {
-    if (RegisterReplica("REPLICA_SYNC",
-                        io::network::Endpoint{"127.0.0.1", 10000},
-                        replication::ReplicationMode::SYNC)
-            .HasError()) {
-      LOG(WARNING) << "Couldn't connect to REPLICA_SYNC";
-    }
-    if (RegisterReplica("REPLICA_ASYNC",
-                        io::network::Endpoint{"127.0.0.1", 10002},
-                        replication::ReplicationMode::ASYNC)
-            .HasError()) {
-      LOG(WARNING) << "Couldn't connect to REPLICA_SYNC";
-    }
-  } else if (FLAGS_replica) {
-    SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
-  } else if (FLAGS_async_replica) {
-    SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10002});
-  }
-#endif
 }
 
 Storage::~Storage() {