diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp
index 84044b04a..f4d2da838 100644
--- a/src/coordination/coordinator_client.cpp
+++ b/src/coordination/coordinator_client.cpp
@@ -16,6 +16,7 @@
 
 #include "coordination/coordinator_config.hpp"
 #include "coordination/coordinator_rpc.hpp"
+#include "replication_coordination_glue/common.hpp"
 #include "replication_coordination_glue/messages.hpp"
 #include "utils/result.hpp"
 
@@ -30,7 +31,7 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
 }  // namespace
 
 CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
-                                     HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
+                                     HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
     : rpc_context_{CreateClientContext(config)},
       rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
                   &rpc_context_},
@@ -68,6 +69,10 @@ void CoordinatorClient::StartFrequentCheck() {
             auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
             stream.AwaitResponse();
           }
+          // Subtle race condition:
+          // acquiring of lock needs to happen before function call, as function callback can be changed
+          // for instance after lock is already acquired
+          // (failover case when instance is promoted to MAIN)
           succ_cb_(coord_instance_, instance_name);
         } catch (rpc::RpcFailedException const &) {
           fail_cb_(coord_instance_, instance_name);
@@ -79,11 +84,6 @@ void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
 void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
 void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
 
-auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
-  succ_cb_ = std::move(succ_cb);
-  fail_cb_ = std::move(fail_cb);
-}
-
 auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; }
 
 auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
@@ -171,5 +171,19 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
   return false;
 }
 
+auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
+    -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
+  try {
+    auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
+    auto res = stream.AwaitResponse();
+
+    return res.database_histories;
+
+  } catch (const rpc::RpcFailedException &) {
+    spdlog::error("RPC error occured while sending GetInstance UUID RPC");
+    return GetInstanceUUIDError::RPC_EXCEPTION;
+  }
+}
+
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp
index f605069fe..637360267 100644
--- a/src/coordination/coordinator_handlers.cpp
+++ b/src/coordination/coordinator_handlers.cpp
@@ -57,6 +57,17 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
         spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
         CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
       });
+
+  server.Register<coordination::GetDatabaseHistoriesRpc>(
+      [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
+        spdlog::info("Received GetDatabasesHistoryRpc on coordinator server");
+        CoordinatorHandlers::GetDatabaseHistoriesHandler(replication_handler, req_reader, res_builder);
+      });
+}
+
+void CoordinatorHandlers::GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler,
+                                                      slk::Reader * /*req_reader*/, slk::Builder *res_builder) {
+  slk::Save(coordination::GetDatabaseHistoriesRes{replication_handler.GetDatabasesHistories()}, res_builder);
 }
 
 void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,
diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index 90674cf3c..ba94d9d5f 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -15,10 +15,12 @@
 
 #include "coordination/coordinator_exceptions.hpp"
 #include "coordination/fmt.hpp"
+#include "dbms/constants.hpp"
 #include "nuraft/coordinator_state_machine.hpp"
 #include "nuraft/coordinator_state_manager.hpp"
 #include "utils/counter.hpp"
 #include "utils/functional.hpp"
+#include "utils/resource_lock.hpp"
 
 #include <range/v3/view.hpp>
 #include <shared_mutex>
@@ -32,96 +34,28 @@ CoordinatorInstance::CoordinatorInstance()
     : raft_state_(RaftState::MakeRaftState(
           [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
           [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
-  auto find_repl_instance = [](CoordinatorInstance *self,
-                               std::string_view repl_instance_name) -> ReplicationInstance & {
-    auto repl_instance =
-        std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
-          return instance.InstanceName() == repl_instance_name;
-        });
-
-    MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
-              repl_instance_name);
-    return *repl_instance;
+  client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::unique_lock{self->coord_instance_lock_};
+    auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
+    std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
   };
 
-  replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::lock_guard{self->coord_instance_lock_};
-    spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
-    auto &repl_instance = find_repl_instance(self, repl_instance_name);
-
-    // We need to get replicas UUID from time to time to ensure replica is listening to correct main
-    // and that it didn't go down for less time than we could notice
-    // We need to get id of main replica is listening to
-    // and swap if necessary
-    if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
-      spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
-      return;
-    }
-
-    repl_instance.OnSuccessPing();
+  client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::unique_lock{self->coord_instance_lock_};
+    auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
+    std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
   };
+}
 
-  replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::lock_guard{self->coord_instance_lock_};
-    spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
-    auto &repl_instance = find_repl_instance(self, repl_instance_name);
-    repl_instance.OnFailPing();
-  };
+auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
+  auto repl_instance =
+      std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
+        return instance.InstanceName() == replication_instance_name;
+      });
 
-  main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::lock_guard{self->coord_instance_lock_};
-    spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
-
-    auto &repl_instance = find_repl_instance(self, repl_instance_name);
-
-    if (repl_instance.IsAlive()) {
-      repl_instance.OnSuccessPing();
-      return;
-    }
-
-    const auto &repl_instance_uuid = repl_instance.GetMainUUID();
-    MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
-
-    auto const curr_main_uuid = self->GetMainUUID();
-    if (curr_main_uuid == repl_instance_uuid.value()) {
-      if (!repl_instance.EnableWritingOnMain()) {
-        spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
-        return;
-      }
-
-      repl_instance.OnSuccessPing();
-      return;
-    }
-
-    // TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
-    // swapUUID can fail
-    if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
-      repl_instance.OnSuccessPing();
-      spdlog::info("Instance {} demoted to replica", repl_instance_name);
-    } else {
-      spdlog::error("Instance {} failed to become replica", repl_instance_name);
-      return;
-    }
-
-    if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
-      spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
-      return;
-    }
-  };
-
-  main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::lock_guard{self->coord_instance_lock_};
-    spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
-    auto &repl_instance = find_repl_instance(self, repl_instance_name);
-    repl_instance.OnFailPing();
-    const auto &repl_instance_uuid = repl_instance.GetMainUUID();
-    MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
-
-    if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) {
-      spdlog::info("Cluster without main instance, trying automatic failover");
-      self->TryFailover();  // TODO: (andi) Initiate failover
-    }
-  };
+  MG_ASSERT(repl_instance != repl_instances_.end(), "Instance {} not found during callback!",
+            replication_instance_name);
+  return *repl_instance;
 }
 
 auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
@@ -166,8 +100,36 @@ auto CoordinatorInstance::TryFailover() -> void {
     return;
   }
 
-  // TODO: Smarter choice
-  auto new_main = ranges::begin(alive_replicas);
+  // for each DB in instance we get one DatabaseHistory
+  using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
+  std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
+
+  bool success{true};
+  std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
+    if (!success) {
+      return;
+    }
+    auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
+    if (res.HasError()) {
+      spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
+      success = false;
+      return;
+    }
+    instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
+  });
+
+  if (!success) {
+    spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
+    return;
+  }
+
+  auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
+      ChooseMostUpToDateInstance(instance_database_histories);
+
+  spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
+                most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
+
+  auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
 
   new_main->PauseFrequentCheck();
   utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
@@ -191,7 +153,8 @@ auto CoordinatorInstance::TryFailover() -> void {
                            ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
                            ranges::to<ReplicationClientsInfo>();
 
-  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
+  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
+                               &CoordinatorInstance::MainFailCallback)) {
     spdlog::warn("Failover failed since promoting replica to main failed!");
     return;
   }
@@ -242,7 +205,8 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
   std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
                          std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
 
-  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
+  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
+                               &CoordinatorInstance::MainFailCallback)) {
     return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
   }
 
@@ -290,7 +254,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
 
   spdlog::info("Request for registering instance {} accepted", instance_name);
   try {
-    repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
+    repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_,
+                                 &CoordinatorInstance::ReplicaSuccessCallback,
+                                 &CoordinatorInstance::ReplicaFailCallback);
   } catch (CoordinatorRegisterInstanceException const &) {
     return RegisterInstanceCoordinatorStatus::RPC_FAILED;
   }
@@ -304,6 +270,85 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
   return RegisterInstanceCoordinatorStatus::SUCCESS;
 }
 
+void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+  repl_instance.OnFailPing();
+  const auto &repl_instance_uuid = repl_instance.GetMainUUID();
+  MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
+
+  if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
+    spdlog::info("Cluster without main instance, trying automatic failover");
+    TryFailover();
+  }
+}
+
+void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+  spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
+
+  if (repl_instance.IsAlive()) {
+    repl_instance.OnSuccessPing();
+    return;
+  }
+
+  const auto &repl_instance_uuid = repl_instance.GetMainUUID();
+  MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
+
+  auto const curr_main_uuid = GetMainUUID();
+  if (curr_main_uuid == repl_instance_uuid.value()) {
+    if (!repl_instance.EnableWritingOnMain()) {
+      spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
+      return;
+    }
+
+    repl_instance.OnSuccessPing();
+    return;
+  }
+
+  if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
+                                    &CoordinatorInstance::ReplicaFailCallback)) {
+    repl_instance.OnSuccessPing();
+    spdlog::info("Instance {} demoted to replica", repl_instance_name);
+  } else {
+    spdlog::error("Instance {} failed to become replica", repl_instance_name);
+    return;
+  }
+
+  if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
+    spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
+    return;
+  }
+}
+
+void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+  if (!repl_instance.IsReplica()) {
+    spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
+    return;
+  }
+  spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
+  // We need to get replicas UUID from time to time to ensure replica is listening to correct main
+  // and that it didn't go down for less time than we could notice
+  // We need to get id of main replica is listening to
+  // and swap if necessary
+  if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
+    spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
+    return;
+  }
+
+  repl_instance.OnSuccessPing();
+}
+
+void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+  if (!repl_instance.IsReplica()) {
+    spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
+    return;
+  }
+  spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
+  repl_instance.OnFailPing();
+}
+
 auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
     -> UnregisterInstanceCoordinatorStatus {
   auto lock = std::lock_guard{coord_instance_lock_};
@@ -343,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_
 // TODO: (andi) Add to the RAFT log.
 auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
 
+auto CoordinatorInstance::ChooseMostUpToDateInstance(
+    const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
+        &instance_database_histories) -> NewMainRes {
+  NewMainRes new_main_res;
+  std::for_each(
+      instance_database_histories.begin(), instance_database_histories.end(),
+      [&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
+        const auto &[instance_name, instance_db_histories] = instance_res_pair;
+
+        // Find default db for instance and its history
+        auto default_db_history_data = std::ranges::find_if(
+            instance_db_histories, [default_db = memgraph::dbms::kDefaultDB](
+                                       const replication_coordination_glue::DatabaseHistory &db_timestamps) {
+              return db_timestamps.name == default_db;
+            });
+
+        std::ranges::for_each(
+            instance_db_histories,
+            [&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
+              spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
+                            memgraph::dbms::kDefaultDB);
+            });
+
+        MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
+
+        const auto &instance_default_db_history = default_db_history_data->history;
+
+        std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
+                              [&instance_name = instance_name](const auto &epoch_history_it) {
+                                spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
+                                              std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
+                              });
+
+        // get latest epoch
+        // get latest timestamp
+
+        if (!new_main_res.latest_epoch) {
+          const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
+          new_main_res = NewMainRes{
+              .most_up_to_date_instance = instance_name,
+              .latest_epoch = epoch,
+              .latest_commit_timestamp = timestamp,
+          };
+          spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
+                        instance_name, epoch, timestamp);
+          return;
+        }
+
+        bool found_same_point{false};
+        std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
+        for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
+          if (*new_main_res.latest_commit_timestamp < timestamp) {
+            new_main_res = NewMainRes{
+                .most_up_to_date_instance = instance_name,
+                .latest_epoch = epoch,
+                .latest_commit_timestamp = timestamp,
+            };
+
+            spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
+                          instance_name, epoch, timestamp);
+          }
+
+          // we found point at which they were same
+          if (epoch == last_most_up_to_date_epoch) {
+            found_same_point = true;
+            break;
+          }
+        }
+
+        if (!found_same_point) {
+          spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
+                        new_main_res.most_up_to_date_instance, instance_name);
+        }
+      });
+
+  return new_main_res;
+}
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp
index 4115f1979..815693824 100644
--- a/src/coordination/coordinator_rpc.cpp
+++ b/src/coordination/coordinator_rpc.cpp
@@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R
   memgraph::slk::Load(self, reader);
 }
 
-void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {}
+void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {}
 
-void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}
+void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {}
 
 // GetInstanceUUID
 void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
@@ -97,6 +97,24 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r
   memgraph::slk::Load(self, reader);
 }
 
+// GetDatabaseHistoriesRpc
+
+void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
+  /* nothing to serialize */
+}
+
+void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
+  /* nothing to serialize */
+}
+
+void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+
+void GetDatabaseHistoriesRes::Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
+
 }  // namespace coordination
 
 constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@@ -130,6 +148,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId:
 constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
                                                                   nullptr};
 
+constexpr utils::TypeInfo coordination::GetDatabaseHistoriesReq::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_REQ,
+                                                                       "GetInstanceDatabasesReq", nullptr};
+
+constexpr utils::TypeInfo coordination::GetDatabaseHistoriesRes::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_RES,
+                                                                       "GetInstanceDatabasesRes", nullptr};
+
 namespace slk {
 
 // PromoteReplicaToMainRpc
@@ -213,6 +237,16 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade
   memgraph::slk::Load(&self->uuid, reader);
 }
 
+// GetInstanceTimestampsReq
+
+void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self.database_histories, builder);
+}
+
+void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(&self->database_histories, reader);
+}
+
 }  // namespace slk
 
 }  // namespace memgraph
diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp
index 5e10af89d..994c78d18 100644
--- a/src/coordination/include/coordination/coordinator_client.hpp
+++ b/src/coordination/include/coordination/coordinator_client.hpp
@@ -14,6 +14,7 @@
 #ifdef MG_ENTERPRISE
 
 #include "coordination/coordinator_config.hpp"
+#include "replication_coordination_glue/common.hpp"
 #include "rpc/client.hpp"
 #include "rpc_errors.hpp"
 #include "utils/result.hpp"
@@ -23,13 +24,13 @@
 namespace memgraph::coordination {
 
 class CoordinatorInstance;
-using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
+using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
 using ReplicationClientsInfo = std::vector<ReplClientInfo>;
 
 class CoordinatorClient {
  public:
   explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
-                             HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
+                             HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
 
   ~CoordinatorClient() = default;
 
@@ -62,7 +63,8 @@ class CoordinatorClient {
 
   auto ReplicationClientInfo() const -> ReplClientInfo;
 
-  auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
+  auto SendGetInstanceTimestampsRpc() const
+      -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>;
 
   auto RpcClient() -> rpc::Client & { return rpc_client_; }
 
@@ -82,8 +84,8 @@ class CoordinatorClient {
 
   CoordinatorClientConfig config_;
   CoordinatorInstance *coord_instance_;
-  HealthCheckCallback succ_cb_;
-  HealthCheckCallback fail_cb_;
+  HealthCheckClientCallback succ_cb_;
+  HealthCheckClientCallback fail_cb_;
 };
 
 }  // namespace memgraph::coordination
diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp
index b9ed4b519..18aecc9cf 100644
--- a/src/coordination/include/coordination/coordinator_handlers.hpp
+++ b/src/coordination/include/coordination/coordinator_handlers.hpp
@@ -41,6 +41,9 @@ class CoordinatorHandlers {
 
   static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
                                      slk::Builder *res_builder);
+
+  static void GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
+                                          slk::Builder *res_builder);
 };
 
 }  // namespace memgraph::dbms
diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp
index 15b377ed9..bed202744 100644
--- a/src/coordination/include/coordination/coordinator_instance.hpp
+++ b/src/coordination/include/coordination/coordinator_instance.hpp
@@ -18,6 +18,7 @@
 #include "coordination/raft_state.hpp"
 #include "coordination/register_main_replica_coordinator_status.hpp"
 #include "coordination/replication_instance.hpp"
+#include "utils/resource_lock.hpp"
 #include "utils/rw_lock.hpp"
 #include "utils/thread_pool.hpp"
 
@@ -25,6 +26,13 @@
 
 namespace memgraph::coordination {
 
+struct NewMainRes {
+  std::string most_up_to_date_instance;
+  std::optional<std::string> latest_epoch;
+  std::optional<uint64_t> latest_commit_timestamp;
+};
+using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;
+
 class CoordinatorInstance {
  public:
   CoordinatorInstance();
@@ -44,12 +52,24 @@ class CoordinatorInstance {
 
   auto SetMainUUID(utils::UUID new_uuid) -> void;
 
+  auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
+
+  void MainFailCallback(std::string_view);
+
+  void MainSuccessCallback(std::string_view);
+
+  void ReplicaSuccessCallback(std::string_view);
+
+  void ReplicaFailCallback(std::string_view);
+
+  static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
+
  private:
-  HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
+  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
 
   // NOTE: Must be std::list because we rely on pointer stability
   std::list<ReplicationInstance> repl_instances_;
-  mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
+  mutable utils::ResourceLock coord_instance_lock_{};
 
   utils::UUID main_uuid_;
 
diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp
index 1578b4577..2bf88fe46 100644
--- a/src/coordination/include/coordination/coordinator_rpc.hpp
+++ b/src/coordination/include/coordination/coordinator_rpc.hpp
@@ -15,6 +15,7 @@
 #ifdef MG_ENTERPRISE
 
 #include "coordination/coordinator_config.hpp"
+#include "replication_coordination_glue/common.hpp"
 #include "rpc/messages.hpp"
 #include "slk/serialization.hpp"
 
@@ -161,6 +162,32 @@ struct GetInstanceUUIDRes {
 
 using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
 
+struct GetDatabaseHistoriesReq {
+  static const utils::TypeInfo kType;
+  static const utils::TypeInfo &GetTypeInfo() { return kType; }
+
+  static void Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader);
+  static void Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder);
+
+  GetDatabaseHistoriesReq() = default;
+};
+
+struct GetDatabaseHistoriesRes {
+  static const utils::TypeInfo kType;
+  static const utils::TypeInfo &GetTypeInfo() { return kType; }
+
+  static void Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
+  static void Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
+
+  explicit GetDatabaseHistoriesRes(const replication_coordination_glue::DatabaseHistories &database_histories)
+      : database_histories(database_histories) {}
+  GetDatabaseHistoriesRes() = default;
+
+  replication_coordination_glue::DatabaseHistories database_histories;
+};
+
+using GetDatabaseHistoriesRpc = rpc::RequestResponse<GetDatabaseHistoriesReq, GetDatabaseHistoriesRes>;
+
 }  // namespace memgraph::coordination
 
 // SLK serialization declarations
@@ -183,15 +210,21 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk:
 void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
 void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
 void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
+
 // UnregisterReplicaRpc
 void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder);
 void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
 void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
 void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
 
+// EnableWritingOnMainRpc
 void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);
 void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);
 
+// GetDatabaseHistoriesRpc
+void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
+void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
+
 }  // namespace memgraph::slk
 
 #endif
diff --git a/src/coordination/include/coordination/coordinator_slk.hpp b/src/coordination/include/coordination/coordinator_slk.hpp
index 49834be41..ee393b7b6 100644
--- a/src/coordination/include/coordination/coordinator_slk.hpp
+++ b/src/coordination/include/coordination/coordinator_slk.hpp
@@ -14,6 +14,7 @@
 #ifdef MG_ENTERPRISE
 
 #include "coordination/coordinator_config.hpp"
+#include "replication_coordination_glue/common.hpp"
 #include "slk/serialization.hpp"
 #include "slk/streams.hpp"
 
@@ -34,5 +35,18 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) {
   Load(&obj->replication_ip_address, reader);
   Load(&obj->replication_port, reader);
 }
+
+inline void Save(const replication_coordination_glue::DatabaseHistory &obj, Builder *builder) {
+  Save(obj.db_uuid, builder);
+  Save(obj.history, builder);
+  Save(obj.name, builder);
+}
+
+inline void Load(replication_coordination_glue::DatabaseHistory *obj, Reader *reader) {
+  Load(&obj->db_uuid, reader);
+  Load(&obj->history, reader);
+  Load(&obj->name, reader);
+}
+
 }  // namespace memgraph::slk
 #endif
diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp
index 8001d0905..e8e00a0a8 100644
--- a/src/coordination/include/coordination/replication_instance.hpp
+++ b/src/coordination/include/coordination/replication_instance.hpp
@@ -18,17 +18,22 @@
 #include "replication_coordination_glue/role.hpp"
 
 #include <libnuraft/nuraft.hxx>
+#include "utils/resource_lock.hpp"
 #include "utils/result.hpp"
 #include "utils/uuid.hpp"
 
 namespace memgraph::coordination {
 
 class CoordinatorInstance;
+class ReplicationInstance;
+
+using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
 
 class ReplicationInstance {
  public:
-  ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
-                      HealthCheckCallback fail_cb);
+  ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckClientCallback succ_cb,
+                      HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
+                      HealthCheckInstanceCallback fail_instance_cb);
 
   ReplicationInstance(ReplicationInstance const &other) = delete;
   ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
@@ -50,9 +55,10 @@ class ReplicationInstance {
   auto IsReplica() const -> bool;
   auto IsMain() const -> bool;
 
-  auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
-                     HealthCheckCallback main_fail_cb) -> bool;
-  auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
+  auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
+                     HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
+  auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
+      -> bool;
 
   auto StartFrequentCheck() -> void;
   auto StopFrequentCheck() -> void;
@@ -66,16 +72,17 @@ class ReplicationInstance {
   auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
   auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool;
 
-
   auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
   auto GetClient() -> CoordinatorClient &;
 
   auto EnableWritingOnMain() -> bool;
 
   auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
-  auto ResetMainUUID() -> void;
   auto GetMainUUID() const -> const std::optional<utils::UUID> &;
 
+  auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
+  auto GetFailCallback() -> HealthCheckInstanceCallback &;
+
  private:
   CoordinatorClient client_;
   replication_coordination_glue::ReplicationRole replication_role_;
@@ -90,6 +97,9 @@ class ReplicationInstance {
   // so we need to send swap uuid again
   std::optional<utils::UUID> main_uuid_;
 
+  HealthCheckInstanceCallback succ_cb_;
+  HealthCheckInstanceCallback fail_cb_;
+
   friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
     return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
   }
diff --git a/src/coordination/include/coordination/rpc_errors.hpp b/src/coordination/include/coordination/rpc_errors.hpp
index f6bfbf3e0..3829d430a 100644
--- a/src/coordination/include/coordination/rpc_errors.hpp
+++ b/src/coordination/include/coordination/rpc_errors.hpp
@@ -11,4 +11,5 @@
 
 namespace memgraph::coordination {
 enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
+enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION };
 }  // namespace memgraph::coordination
diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp
index 0d16db648..50f1be468 100644
--- a/src/coordination/replication_instance.cpp
+++ b/src/coordination/replication_instance.cpp
@@ -13,15 +13,21 @@
 
 #include "coordination/replication_instance.hpp"
 
+#include <utility>
+
 #include "replication_coordination_glue/handler.hpp"
 #include "utils/result.hpp"
 
 namespace memgraph::coordination {
 
 ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
-                                         HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
+                                         HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
+                                         HealthCheckInstanceCallback succ_instance_cb,
+                                         HealthCheckInstanceCallback fail_instance_cb)
     : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
-      replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) {
+      replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
+      succ_cb_(succ_instance_cb),
+      fail_cb_(fail_instance_cb) {
   if (!client_.DemoteToReplica()) {
     throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
   }
@@ -57,26 +63,29 @@ auto ReplicationInstance::IsMain() const -> bool {
 }
 
 auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
-                                        HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
+                                        HealthCheckInstanceCallback main_succ_cb,
+                                        HealthCheckInstanceCallback main_fail_cb) -> bool {
   if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
     return false;
   }
 
   replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
   main_uuid_ = new_uuid;
-  client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
+  succ_cb_ = main_succ_cb;
+  fail_cb_ = main_fail_cb;
 
   return true;
 }
 
-auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-    -> bool {
+auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
+                                          HealthCheckInstanceCallback replica_fail_cb) -> bool {
   if (!client_.DemoteToReplica()) {
     return false;
   }
 
   replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
-  client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
+  succ_cb_ = replica_succ_cb;
+  fail_cb_ = replica_fail_cb;
 
   return true;
 }
@@ -90,10 +99,12 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf
   return client_.ReplicationClientInfo();
 }
 
+auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
+auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
+
 auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
 
 auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
-auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
 auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
 
 auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp
index 7b1d45335..87d1257a6 100644
--- a/src/dbms/dbms_handler.hpp
+++ b/src/dbms/dbms_handler.hpp
@@ -266,10 +266,6 @@ class DbmsHandler {
   bool IsMain() const { return repl_state_.IsMain(); }
   bool IsReplica() const { return repl_state_.IsReplica(); }
 
-#ifdef MG_ENTERPRISE
-  // coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
-#endif
-
   /**
    * @brief Return all active databases.
    *
diff --git a/src/replication_coordination_glue/CMakeLists.txt b/src/replication_coordination_glue/CMakeLists.txt
index f81aed4ba..f452e1c1f 100644
--- a/src/replication_coordination_glue/CMakeLists.txt
+++ b/src/replication_coordination_glue/CMakeLists.txt
@@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue
         mode.hpp
         role.hpp
         handler.hpp
+        common.hpp
 
         PRIVATE
         messages.cpp
diff --git a/src/replication_coordination_glue/common.hpp b/src/replication_coordination_glue/common.hpp
new file mode 100644
index 000000000..439e5cae8
--- /dev/null
+++ b/src/replication_coordination_glue/common.hpp
@@ -0,0 +1,32 @@
+// Copyright 2024 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include "rpc/client.hpp"
+#include "utils/uuid.hpp"
+
+#include <deque>
+#include "messages.hpp"
+#include "rpc/messages.hpp"
+#include "utils/uuid.hpp"
+
+namespace memgraph::replication_coordination_glue {
+
+struct DatabaseHistory {
+  memgraph::utils::UUID db_uuid;
+  std::vector<std::pair<std::string, uint64_t>> history;
+  std::string name;
+};
+
+using DatabaseHistories = std::vector<DatabaseHistory>;
+
+}  // namespace memgraph::replication_coordination_glue
diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp
index b110e6015..d5c2bfa71 100644
--- a/src/replication_handler/include/replication_handler/replication_handler.hpp
+++ b/src/replication_handler/include/replication_handler/replication_handler.hpp
@@ -14,6 +14,7 @@
 #include "dbms/dbms_handler.hpp"
 #include "flags/experimental.hpp"
 #include "replication/include/replication/state.hpp"
+#include "replication_coordination_glue/common.hpp"
 #include "replication_handler/system_replication.hpp"
 #include "replication_handler/system_rpc.hpp"
 #include "utils/result.hpp"
@@ -149,6 +150,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
 
   auto GetReplicaUUID() -> std::optional<utils::UUID>;
 
+  auto GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories;
+
  private:
   template <bool SendSwapUUID>
   auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config)
diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp
index 5f807779d..ea567eed0 100644
--- a/src/replication_handler/replication_handler.cpp
+++ b/src/replication_handler/replication_handler.cpp
@@ -14,6 +14,7 @@
 #include "dbms/dbms_handler.hpp"
 #include "replication/replication_client.hpp"
 #include "replication_handler/system_replication.hpp"
+#include "utils/functional.hpp"
 
 namespace memgraph::replication {
 
@@ -265,8 +266,26 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli
   return repl_state_.GetRole();
 }
 
+auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories {
+  replication_coordination_glue::DatabaseHistories results;
+  dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
+    auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
+
+    std::vector<std::pair<std::string, uint64_t>> history =
+        utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
+                    repl_storage_state.history);
+
+    history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
+    replication_coordination_glue::DatabaseHistory repl{
+        .db_uuid = utils::UUID{db_acc->storage()->uuid()}, .history = history, .name = std::string(db_acc->name())};
+    results.emplace_back(repl);
+  });
+
+  return results;
+}
+
 auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
-  MG_ASSERT(repl_state_.IsReplica());
+  MG_ASSERT(repl_state_.IsReplica(), "Instance is not replica");
   return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;
 }
 
diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp
index 921c1f5c0..fe752bfd1 100644
--- a/src/storage/v2/inmemory/replication/recovery.cpp
+++ b/src/storage/v2/inmemory/replication/recovery.cpp
@@ -106,8 +106,8 @@ uint64_t ReplicateCurrentWal(const utils::UUID &main_uuid, const InMemoryStorage
   return response.current_commit_timestamp;
 }
 
-/// This method tries to find the optimal path for recoverying a single replica.
-/// Based on the last commit transfered to replica it tries to update the
+/// This method tries to find the optimal path for recovering a single replica.
+/// Based on the last commit transferred to replica it tries to update the
 /// replica using durability files - WALs and Snapshots. WAL files are much
 /// smaller in size as they contain only the Deltas (changes) made during the
 /// transactions while Snapshots contain all the data. For that reason we prefer
@@ -175,7 +175,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
   auto add_snapshot = [&]() {
     if (!latest_snapshot) return;
     const auto lock_success = locker_acc.AddPath(latest_snapshot->path);
-    MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant snapshot path.");
+    MG_ASSERT(!lock_success.HasError(), "Tried to lock a non-existent snapshot path.");
     recovery_steps.emplace_back(std::in_place_type_t<RecoverySnapshot>{}, std::move(latest_snapshot->path));
   };
 
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index 16429d11f..1eb06bf10 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -53,11 +53,13 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
 #endif
 
   std::optional<uint64_t> branching_point;
+  // different epoch id, replica was main
   if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
     auto const &history = replStorageState.history;
     const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
       return main_epoch_info.first == replica.epoch_id;
     });
+    // main didn't have that epoch, but why is here branching point
     if (epoch_info_iter == history.crend()) {
       branching_point = 0;
     } else if (epoch_info_iter->second != replica.current_commit_timestamp) {
diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp
index e0714de2a..f5242944a 100644
--- a/src/utils/functional.hpp
+++ b/src/utils/functional.hpp
@@ -18,8 +18,11 @@
 
 namespace memgraph::utils {
 
-template <class F, class T, class R = typename std::invoke_result<F, T>::type>
-auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> {
+template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>,
+          typename F, typename R = std::invoke_result_t<F, T>>
+requires ranges::range<Container<T, Allocator>> &&
+    (!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v)
+        -> std::vector<R> {
   return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
 }
 
diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp
index 742271a95..45b2c8b04 100644
--- a/src/utils/scheduler.hpp
+++ b/src/utils/scheduler.hpp
@@ -57,7 +57,7 @@ class Scheduler {
         // program and there is probably no work to do in scheduled function at
         // the start of the program. Since Server will log some messages on
         // the program start we let him log first and we make sure by first
-        // waiting that funcion f will not log before it.
+        // waiting that function f will not log before it.
         // Check for pause also.
         std::unique_lock<std::mutex> lk(mutex_);
         auto now = std::chrono::system_clock::now();
diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp
index 1ca08a3f7..aadc8a07b 100644
--- a/src/utils/typeinfo.hpp
+++ b/src/utils/typeinfo.hpp
@@ -114,6 +114,8 @@ enum class TypeId : uint64_t {
 
   COORD_GET_UUID_REQ,
   COORD_GET_UUID_RES,
+  COORD_GET_INSTANCE_DATABASES_REQ,
+  COORD_GET_INSTANCE_DATABASES_RES,
 
   // AST
   AST_LABELIX = 3000,
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index 6f7b3bbef..b92989f4e 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -430,3 +430,11 @@ target_include_directories(${test_prefix}distributed_lamport_clock PRIVATE ${CMA
 
 add_unit_test(query_hint_provider.cpp)
 target_link_libraries(${test_prefix}query_hint_provider mg-query mg-glue)
+
+
+# Test coordination
+if(MG_ENTERPRISE)
+add_unit_test(coordination_utils.cpp)
+target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue)
+target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include)
+endif()
diff --git a/tests/unit/coordination_utils.cpp b/tests/unit/coordination_utils.cpp
new file mode 100644
index 000000000..1346dce2c
--- /dev/null
+++ b/tests/unit/coordination_utils.cpp
@@ -0,0 +1,246 @@
+// Copyright 2024 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include "coordination/coordinator_instance.hpp"
+#include "dbms/constants.hpp"
+#include "replication_coordination_glue/common.hpp"
+#include "utils/functional.hpp"
+
+class CoordinationUtils : public ::testing::Test {
+ protected:
+  void SetUp() override {}
+
+  void TearDown() override {}
+
+  std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_coordination"};
+};
+
+TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
+  // Choose any if everything is same
+  // X = dead
+  // Main      : A(24)  B(36)  C(48) D(50) E(51) X
+  // replica  1: A(24)  B(36)  C(48) D(50) E(51)
+  // replica  2: A(24)  B(36)  C(48) D(50) E(51)
+  // replica  3: A(24)  B(36)  C(48) D(50) E(51)
+  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
+      instance_database_histories;
+
+  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
+  histories.emplace_back(memgraph::utils::UUID{}, 24);
+  histories.emplace_back(memgraph::utils::UUID{}, 36);
+  histories.emplace_back(memgraph::utils::UUID{}, 48);
+  histories.emplace_back(memgraph::utils::UUID{}, 50);
+  histories.emplace_back(memgraph::utils::UUID{}, 51);
+
+  memgraph::utils::UUID db_uuid;
+  std::string default_name = std::string(memgraph::dbms::kDefaultDB);
+
+  auto db_histories = memgraph::utils::fmap(
+      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
+        return std::make_pair(std::string(pair.first), pair.second);
+      },
+      histories);
+
+  memgraph::replication_coordination_glue::DatabaseHistory history{
+      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
+  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
+  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
+  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
+  memgraph::coordination::CoordinatorInstance instance;
+
+  auto [instance_name, latest_epoch, latest_commit_timestamp] =
+      instance.ChooseMostUpToDateInstance(instance_database_histories);
+  ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
+  ASSERT_TRUE(*latest_epoch == db_histories.back().first);
+  ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
+}
+
+TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
+  // Prioritize one with the biggest last commit timestamp on last epoch
+  // X = dead
+  // Main      : A(24)  B(36)  C(48) D(50) E(59) X
+  // replica  1: A(24)  B(12)  C(15) D(17) E(51)
+  // replica  2: A(24)  B(12)  C(15) D(17) E(57)
+  // replica  3: A(24)  B(12)  C(15) D(17) E(59)
+  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
+      instance_database_histories;
+
+  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
+  histories.emplace_back(memgraph::utils::UUID{}, 24);
+  histories.emplace_back(memgraph::utils::UUID{}, 36);
+  histories.emplace_back(memgraph::utils::UUID{}, 48);
+  histories.emplace_back(memgraph::utils::UUID{}, 50);
+  histories.emplace_back(memgraph::utils::UUID{}, 59);
+
+  memgraph::utils::UUID db_uuid;
+  std::string default_name = std::string(memgraph::dbms::kDefaultDB);
+
+  auto db_histories = memgraph::utils::fmap(
+      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
+        return std::make_pair(std::string(pair.first), pair.second);
+      },
+      histories);
+
+  db_histories.back().second = 51;
+  memgraph::replication_coordination_glue::DatabaseHistory history1{
+      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history1};
+  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
+
+  db_histories.back().second = 57;
+  memgraph::replication_coordination_glue::DatabaseHistory history2{
+      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
+  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history2};
+  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
+
+  db_histories.back().second = 59;
+  memgraph::replication_coordination_glue::DatabaseHistory history3{
+      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
+  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
+  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
+
+  memgraph::coordination::CoordinatorInstance instance;
+  auto [instance_name, latest_epoch, latest_commit_timestamp] =
+      instance.ChooseMostUpToDateInstance(instance_database_histories);
+
+  ASSERT_TRUE(instance_name == "instance_3");
+  ASSERT_TRUE(*latest_epoch == db_histories.back().first);
+  ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
+}
+
+TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
+  // Prioritize one biggest commit timestamp
+  // X = dead
+  // Main      : A(24)  B(36)  C(48) D(50) E(51)   X    X     X  X
+  // replica  1: A(24)  B(36)  C(48) D(50) E(51) F(60) G(65)  X  up
+  // replica  2: A(24)  B(36)  C(48) D(50) E(51)  X     X     X  up
+  // replica  3: A(24)  B(36)  C(48) D(50) E(51)  X     X     X  up
+  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
+      instance_database_histories;
+
+  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
+  histories.emplace_back(memgraph::utils::UUID{}, 24);
+  histories.emplace_back(memgraph::utils::UUID{}, 36);
+  histories.emplace_back(memgraph::utils::UUID{}, 48);
+  histories.emplace_back(memgraph::utils::UUID{}, 50);
+  histories.emplace_back(memgraph::utils::UUID{}, 51);
+
+  memgraph::utils::UUID db_uuid;
+  std::string default_name = std::string(memgraph::dbms::kDefaultDB);
+
+  auto db_histories = memgraph::utils::fmap(
+      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
+        return std::make_pair(std::string(pair.first), pair.second);
+      },
+      histories);
+
+  memgraph::replication_coordination_glue::DatabaseHistory history{
+      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
+  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
+  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
+
+  histories.emplace_back(memgraph::utils::UUID{}, 60);
+  histories.emplace_back(memgraph::utils::UUID{}, 65);
+  auto db_histories_longest = memgraph::utils::fmap(
+      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
+        return std::make_pair(std::string(pair.first), pair.second);
+      },
+      histories);
+
+  memgraph::replication_coordination_glue::DatabaseHistory history_longest{
+      .db_uuid = db_uuid, .history = db_histories_longest, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
+  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
+
+  memgraph::coordination::CoordinatorInstance instance;
+  auto [instance_name, latest_epoch, latest_commit_timestamp] =
+      instance.ChooseMostUpToDateInstance(instance_database_histories);
+
+  ASSERT_TRUE(instance_name == "instance_3");
+  ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
+  ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
+}
+
+TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
+  // When history diverged, also prioritize one with biggest last commit timestamp
+  // Main      : A(1)  B(2)   C(3)    X
+  // replica  1: A(1)  B(2)   C(3)    X     X up
+  // replica  2: A(1)  B(2)    X     D(5)   X up
+  // replica  3: A(1)  B(2)    X     D(4)   X up
+  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
+      instance_database_histories;
+
+  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
+  histories.emplace_back(memgraph::utils::UUID{}, 1);
+  histories.emplace_back(memgraph::utils::UUID{}, 2);
+  histories.emplace_back(memgraph::utils::UUID{}, 3);
+
+  memgraph::utils::UUID db_uuid;
+  std::string default_name = std::string(memgraph::dbms::kDefaultDB);
+
+  auto db_histories = memgraph::utils::fmap(
+      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
+        return std::make_pair(std::string(pair.first), pair.second);
+      },
+      histories);
+
+  memgraph::replication_coordination_glue::DatabaseHistory history{
+      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
+  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
+
+  db_histories.pop_back();
+
+  auto oldest_commit_timestamp{5};
+  auto newest_different_epoch = memgraph::utils::UUID{};
+  histories.emplace_back(newest_different_epoch, oldest_commit_timestamp);
+  auto db_histories_different = memgraph::utils::fmap(
+      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
+        return std::make_pair(std::string(pair.first), pair.second);
+      },
+      histories);
+
+  memgraph::replication_coordination_glue::DatabaseHistory history_3{
+      .db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_3};
+  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
+
+  db_histories_different.back().second = 4;
+  memgraph::replication_coordination_glue::DatabaseHistory history_2{
+      .db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
+
+  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
+  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
+
+  memgraph::coordination::CoordinatorInstance instance;
+  auto [instance_name, latest_epoch, latest_commit_timestamp] =
+      instance.ChooseMostUpToDateInstance(instance_database_histories);
+
+  ASSERT_TRUE(instance_name == "instance_3");
+  ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
+  ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
+}