From 0eb0b913a84c1124bc1deb9196d0ec35c87ec7d5 Mon Sep 17 00:00:00 2001
From: Matija Santl <matija.santl@memgraph.com>
Date: Fri, 7 Sep 2018 15:59:10 +0200
Subject: [PATCH] Support dynamic worker addition

Summary:
With this diff we should be able to support dynamic worker addition.
This is ofcourse a minimal effort maximal impact approach.

This diff introduces new RPC calls when a worker registers.

The `DynamicWorkerAddition` doesn't use `GraphDbAccessor` to get indices because
they create WAL entries.

Reviewers: vkasljevic, ipaljak, buda

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1594
---
 src/CMakeLists.txt                            |   3 +
 src/database/distributed_graph_db.cpp         |  30 ++-
 src/database/graph_db.cpp                     |   6 +-
 src/distributed/dynamic_worker.cpp            |  46 +++++
 src/distributed/dynamic_worker.hpp            |  47 +++++
 .../dynamic_worker_rpc_messages.capnp         |  18 ++
 .../dynamic_worker_rpc_messages.hpp           |  48 +++++
 .../dynamic_worker_rpc_messages.lcp           |  42 ++++
 src/durability/recovery.cpp                   | 119 ++++++-----
 src/durability/recovery.hpp                   |  10 +-
 tests/unit/CMakeLists.txt                     |   3 +
 tests/unit/distributed_common.hpp             |   2 +-
 tests/unit/distributed_dynamic_worker.cpp     | 189 ++++++++++++++++++
 tools/tests/mg_recovery_check.cpp             |   5 +-
 14 files changed, 489 insertions(+), 79 deletions(-)
 create mode 100644 src/distributed/dynamic_worker.cpp
 create mode 100644 src/distributed/dynamic_worker.hpp
 create mode 100644 src/distributed/dynamic_worker_rpc_messages.capnp
 create mode 100644 src/distributed/dynamic_worker_rpc_messages.hpp
 create mode 100644 src/distributed/dynamic_worker_rpc_messages.lcp
 create mode 100644 tests/unit/distributed_dynamic_worker.cpp

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 2d8dcfdf9..9c9a60637 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -33,6 +33,7 @@ set(memgraph_src_files
     distributed/dgp/vertex_migrator.cpp
     distributed/durability_rpc_master.cpp
     distributed/durability_rpc_worker.cpp
+    distributed/dynamic_worker.cpp
     distributed/index_rpc_server.cpp
     distributed/plan_consumer.cpp
     distributed/plan_dispatcher.cpp
@@ -110,6 +111,8 @@ add_lcp(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4c
 add_capnp(distributed/token_sharing_rpc_messages.capnp)
 add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
 add_capnp(distributed/updates_rpc_messages.capnp)
+add_lcp(distributed/dynamic_worker_rpc_messages.lcp CAPNP_SCHEMA @0x8c53f6c9a0c71b05)
+add_capnp(distributed/dynamic_worker_rpc_messages.capnp)
 
 # distributed_ops.lcp is leading the capnp code generation, so we only want
 # function declarations in generated operator.hpp
diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp
index 008355d93..3c3b8283a 100644
--- a/src/database/distributed_graph_db.cpp
+++ b/src/database/distributed_graph_db.cpp
@@ -14,6 +14,7 @@
 #include "distributed/data_rpc_server.hpp"
 #include "distributed/durability_rpc_master.hpp"
 #include "distributed/durability_rpc_worker.hpp"
+#include "distributed/dynamic_worker.hpp"
 #include "distributed/index_rpc_server.hpp"
 #include "distributed/plan_consumer.hpp"
 #include "distributed/plan_dispatcher.hpp"
@@ -623,6 +624,7 @@ class Master {
   distributed::TokenSharingRpcServer token_sharing_server_{
       self_, config_.worker_id, &coordination_, &server_,
       &token_sharing_clients_};
+  distributed::DynamicWorkerAddition dynamic_worker_addition_{self_, &server_};
 };
 
 }  // namespace impl
@@ -679,13 +681,15 @@ Master::Master(Config config)
       recovery_data.wal_tx_to_recover =
           impl_->coordination_.CommonWalTransactions(*recovery_info);
       MasterRecoveryTransactions recovery_transactions(this);
-      durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
-                                       this, &recovery_data,
-                                       &recovery_transactions);
+      durability::RecoverWal(impl_->config_.durability_directory, this,
+                             &recovery_data, &recovery_transactions);
+      durability::RecoverIndexes(this, recovery_data.indexes);
       auto workers_recovered_wal =
           impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
       workers_recovered_wal.get();
     }
+
+    impl_->dynamic_worker_addition_.Enable();
   }
 
   // Start the dynamic graph partitioner inside token sharing server
@@ -962,6 +966,8 @@ class Worker {
   distributed::TokenSharingRpcServer token_sharing_server_{
       self_, config_.worker_id, &coordination_, &server_,
       &token_sharing_clients_};
+  distributed::DynamicWorkerRegistration dynamic_worker_registration_{
+      &rpc_worker_clients_.GetClientPool(0)};
 };
 
 }  // namespace impl
@@ -981,8 +987,10 @@ Worker::Worker(Config config)
   if (impl_->config_.durability_enabled)
     utils::CheckDir(impl_->config_.durability_directory);
 
-  // Durability recovery.
-  {
+  // Durability recovery. We need to check this flag for workers that are added
+  // after the "main" cluster recovery.
+  if (impl_->config_.db_recover_on_startup) {
+    // What we should recover.
     // What we should recover (version, transaction_id) pair.
     auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover();
 
@@ -1013,6 +1021,13 @@ Worker::Worker(Config config)
       LOG(FATAL) << "Memgraph worker failed to recover the database state "
                     "recovered on the master";
     impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info);
+  } else {
+    // Check with master if we're a dynamically added worker and need to update
+    // our indices.
+    auto indexes = impl_->dynamic_worker_registration_.GetIndicesToCreate();
+    if (!indexes.empty()) {
+      durability::RecoverIndexes(this, indexes);
+    }
   }
 
   if (impl_->config_.durability_enabled) {
@@ -1123,8 +1138,9 @@ void Worker::ReinitializeStorage() {
 
 void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) {
   WorkerRecoveryTransactions recovery_transactions(this);
-  durability::RecoverWalAndIndexes(impl_->config_.durability_directory, this,
-                                   recovery_data, &recovery_transactions);
+  durability::RecoverWal(impl_->config_.durability_directory, this,
+                         recovery_data, &recovery_transactions);
+  durability::RecoverIndexes(this, recovery_data->indexes);
 }
 
 io::network::Endpoint Worker::endpoint() const {
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index fe3cea9ac..164a092c0 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -233,9 +233,9 @@ SingleNode::SingleNode(Config config)
     if (recovery_info) {
       recovery_data.wal_tx_to_recover = recovery_info->wal_recovered;
       SingleNodeRecoveryTransanctions recovery_transactions(this);
-      durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
-                                       this, &recovery_data,
-                                       &recovery_transactions);
+      durability::RecoverWal(impl_->config_.durability_directory, this,
+                             &recovery_data, &recovery_transactions);
+      durability::RecoverIndexes(this, recovery_data.indexes);
     }
   }
 
diff --git a/src/distributed/dynamic_worker.cpp b/src/distributed/dynamic_worker.cpp
new file mode 100644
index 000000000..580236a96
--- /dev/null
+++ b/src/distributed/dynamic_worker.cpp
@@ -0,0 +1,46 @@
+#include "distributed/dynamic_worker.hpp"
+
+#include "database/distributed_graph_db.hpp"
+#include "distributed/dynamic_worker_rpc_messages.hpp"
+
+namespace distributed {
+using Server = communication::rpc::Server;
+using ClientPool = communication::rpc::ClientPool;
+
+DynamicWorkerAddition::DynamicWorkerAddition(database::DistributedGraphDb *db,
+                                             Server *server)
+    : db_(db), server_(server) {
+  server_->Register<DynamicWorkerRpc>(
+      [this](const auto &req_reader, auto *res_builder) {
+        DynamicWorkerReq req;
+        req.Load(req_reader);
+        DynamicWorkerRes res(this->GetIndicesToCreate());
+        res.Save(res_builder);
+      });
+}
+
+std::vector<std::pair<std::string, std::string>>
+DynamicWorkerAddition::GetIndicesToCreate() {
+  std::vector<std::pair<std::string, std::string>> indices;
+  if (!enabled_.load()) return indices;
+  for (const auto &key : db_->storage().label_property_index().Keys()) {
+    auto label = db_->label_mapper().id_to_value(key.label_);
+    auto property = db_->property_mapper().id_to_value(key.property_);
+    indices.emplace_back(label, property);
+  }
+  return indices;
+}
+
+void DynamicWorkerAddition::Enable() { enabled_.store(true); }
+
+DynamicWorkerRegistration::DynamicWorkerRegistration(ClientPool *client_pool)
+    : client_pool_(client_pool) {}
+
+std::vector<std::pair<std::string, std::string>>
+DynamicWorkerRegistration::GetIndicesToCreate() {
+  auto result = client_pool_->Call<DynamicWorkerRpc>();
+  CHECK(result) << "DynamicWorkerRpc failed";
+  return result->recover_indices;
+}
+
+}  // namespace distributed
diff --git a/src/distributed/dynamic_worker.hpp b/src/distributed/dynamic_worker.hpp
new file mode 100644
index 000000000..68b388b52
--- /dev/null
+++ b/src/distributed/dynamic_worker.hpp
@@ -0,0 +1,47 @@
+/// @file
+#pragma once
+
+#include <atomic>
+#include <string>
+#include <vector>
+
+#include "communication/rpc/client_pool.hpp"
+#include "communication/rpc/server.hpp"
+
+namespace database {
+class DistributedGraphDb;
+}  // namespace database
+
+namespace distributed {
+using Server = communication::rpc::Server;
+using ClientPool = communication::rpc::ClientPool;
+
+class DynamicWorkerAddition final {
+ public:
+  DynamicWorkerAddition(database::DistributedGraphDb *db, Server *server);
+
+  /// Enable dynamic worker addition.
+  void Enable();
+
+ private:
+  database::DistributedGraphDb *db_{nullptr};
+  Server *server_;
+
+  std::atomic<bool> enabled_{false};
+
+  /// Return the indices a dynamically added worker needs to create.
+  std::vector<std::pair<std::string, std::string>> GetIndicesToCreate();
+};
+
+class DynamicWorkerRegistration final {
+ public:
+  explicit DynamicWorkerRegistration(ClientPool *client_pool);
+
+  /// Make a RPC call to master to get indices to create.
+  std::vector<std::pair<std::string, std::string>> GetIndicesToCreate();
+
+ private:
+  ClientPool *client_pool_;
+};
+
+}  // namespace distributed
diff --git a/src/distributed/dynamic_worker_rpc_messages.capnp b/src/distributed/dynamic_worker_rpc_messages.capnp
new file mode 100644
index 000000000..7a76c5be4
--- /dev/null
+++ b/src/distributed/dynamic_worker_rpc_messages.capnp
@@ -0,0 +1,18 @@
+# -*- buffer-read-only: t; -*-
+# vim: readonly
+# DO NOT EDIT! Generated using LCP from 'dynamic_worker_rpc_messages.lcp'
+
+@0x8c53f6c9a0c71b05;
+
+using Cxx = import "/capnp/c++.capnp";
+$Cxx.namespace("distributed::capnp");
+
+using Utils = import "/utils/serialization.capnp";
+
+struct DynamicWorkerRes {
+  recoverIndices @0 :List(Utils.Pair(Text, Text));
+}
+
+struct DynamicWorkerReq {
+}
+
diff --git a/src/distributed/dynamic_worker_rpc_messages.hpp b/src/distributed/dynamic_worker_rpc_messages.hpp
new file mode 100644
index 000000000..824b70b84
--- /dev/null
+++ b/src/distributed/dynamic_worker_rpc_messages.hpp
@@ -0,0 +1,48 @@
+// -*- buffer-read-only: t; -*-
+// vim: readonly
+// DO NOT EDIT! Generated using LCP from 'dynamic_worker_rpc_messages.lcp'
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "communication/rpc/messages.hpp"
+#include "distributed/dynamic_worker_rpc_messages.capnp.h"
+
+namespace distributed {
+
+struct DynamicWorkerReq {
+  using Capnp = capnp::DynamicWorkerReq;
+  static const communication::rpc::MessageType TypeInfo;
+  DynamicWorkerReq() {}
+
+  void Save(capnp::DynamicWorkerReq::Builder *builder) const;
+
+  static std::unique_ptr<DynamicWorkerReq> Construct(
+      const capnp::DynamicWorkerReq::Reader &reader);
+
+  void Load(const capnp::DynamicWorkerReq::Reader &reader);
+};
+
+struct DynamicWorkerRes {
+  using Capnp = capnp::DynamicWorkerRes;
+  static const communication::rpc::MessageType TypeInfo;
+  DynamicWorkerRes() {}
+  explicit DynamicWorkerRes(
+      std::vector<std::pair<std::string, std::string>> recover_indices)
+      : recover_indices(recover_indices) {}
+
+  std::vector<std::pair<std::string, std::string>> recover_indices;
+
+  void Save(capnp::DynamicWorkerRes::Builder *builder) const;
+
+  static std::unique_ptr<DynamicWorkerRes> Construct(
+      const capnp::DynamicWorkerRes::Reader &reader);
+
+  void Load(const capnp::DynamicWorkerRes::Reader &reader);
+};
+
+using DynamicWorkerRpc =
+    communication::rpc::RequestResponse<DynamicWorkerReq, DynamicWorkerRes>;
+}
diff --git a/src/distributed/dynamic_worker_rpc_messages.lcp b/src/distributed/dynamic_worker_rpc_messages.lcp
new file mode 100644
index 000000000..58a68ff96
--- /dev/null
+++ b/src/distributed/dynamic_worker_rpc_messages.lcp
@@ -0,0 +1,42 @@
+#>cpp
+#pragma once
+
+#include <vector>
+#include <string>
+
+#include "communication/rpc/messages.hpp"
+#include "distributed/dynamic_worker_rpc_messages.capnp.h"
+cpp<#
+
+(lcp:namespace distributed)
+
+(lcp:capnp-namespace "distributed")
+
+(lcp:capnp-import 'utils "/utils/serialization.capnp")
+
+(lcp:define-rpc dynamic-worker
+    (:request ())
+  (:response
+     ((recover-indices "std::vector<std::pair<std::string, std::string>>"
+                      :capnp-type "List(Utils.Pair(Text, Text))"
+                      :capnp-save
+                      (lambda (builder member)
+                        #>cpp
+                        utils::SaveVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
+                                          std::pair<std::string, std::string>>(
+                            ${member}, &${builder}, [](auto *builder, const auto value) {
+                              builder->setFirst(value.first);
+                              builder->setSecond(value.second);
+                            });
+                        cpp<#)
+                      :capnp-load
+                      (lambda (reader member)
+                        #>cpp
+                        utils::LoadVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
+                                          std::pair<std::string, std::string>>(
+                            &${member}, ${reader}, [](const auto &reader) {
+                              return std::make_pair(reader.getFirst(), reader.getSecond());
+                            });
+                        cpp<#)))))
+
+(lcp:pop-namespace) ;; distributed
diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp
index ee59e007a..58fbd5b09 100644
--- a/src/durability/recovery.cpp
+++ b/src/durability/recovery.cpp
@@ -368,10 +368,61 @@ std::vector<tx::TransactionId> ReadWalRecoverableTransactions(
   return committed_tx_ids;
 }
 
+}  // anonymous namespace
+
+RecoveryInfo RecoverOnlySnapshot(
+    const fs::path &durability_dir, database::GraphDb *db,
+    RecoveryData *recovery_data,
+    std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
+    int worker_id) {
+  // Attempt to recover from snapshot files in reverse order (from newest
+  // backwards).
+  const auto snapshot_dir = durability_dir / kSnapshotDir;
+  std::vector<fs::path> snapshot_files;
+  if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
+    for (auto &file : fs::directory_iterator(snapshot_dir))
+      snapshot_files.emplace_back(file);
+  std::sort(snapshot_files.rbegin(), snapshot_files.rend());
+  for (auto &snapshot_file : snapshot_files) {
+    if (required_snapshot_tx_id) {
+      auto snapshot_file_tx_id =
+          TransactionIdFromSnapshotFilename(snapshot_file);
+      if (!snapshot_file_tx_id ||
+          snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
+        LOG(INFO) << "Skipping snapshot file '" << snapshot_file
+                  << "' because it does not match the required snapshot tx id: "
+                  << *required_snapshot_tx_id;
+        continue;
+      }
+    }
+    LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
+    if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) {
+      db->ReinitializeStorage();
+      recovery_data->Clear();
+      LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
+      continue;
+    } else {
+      LOG(INFO) << "Snapshot recovery successful.";
+      break;
+    }
+  }
+
+  // If snapshot recovery is required, and we failed, don't even deal with
+  // the WAL recovery.
+  if (required_snapshot_tx_id &&
+      recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
+    return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
+
+  return {durability::kVersion, recovery_data->snapshooter_tx_id,
+          ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
+                                         *recovery_data)};
+}
+
 // TODO - finer-grained recovery feedback could be useful here.
-bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
+void RecoverWal(const fs::path &durability_dir, database::GraphDb *db,
                 RecoveryData *recovery_data,
                 RecoveryTransactions *transactions) {
+  auto wal_dir = durability_dir / kWalDir;
   auto wal_files = GetWalFiles(wal_dir);
   // Track which transaction should be recovered first, and define logic for
   // which transactions should be skipped in recovery.
@@ -431,71 +482,13 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
   // - WAL recovery error
 
   db->tx_engine().EnsureNextIdGreater(max_observed_tx_id);
-  return true;
 }
 
-}  // anonymous namespace
-
-RecoveryInfo RecoverOnlySnapshot(
-    const fs::path &durability_dir, database::GraphDb *db,
-    RecoveryData *recovery_data,
-    std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
-    int worker_id) {
-  // Attempt to recover from snapshot files in reverse order (from newest
-  // backwards).
-  const auto snapshot_dir = durability_dir / kSnapshotDir;
-  std::vector<fs::path> snapshot_files;
-  if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
-    for (auto &file : fs::directory_iterator(snapshot_dir))
-      snapshot_files.emplace_back(file);
-  std::sort(snapshot_files.rbegin(), snapshot_files.rend());
-  for (auto &snapshot_file : snapshot_files) {
-    if (required_snapshot_tx_id) {
-      auto snapshot_file_tx_id =
-          TransactionIdFromSnapshotFilename(snapshot_file);
-      if (!snapshot_file_tx_id ||
-          snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
-        LOG(INFO) << "Skipping snapshot file '" << snapshot_file
-                  << "' because it does not match the required snapshot tx id: "
-                  << *required_snapshot_tx_id;
-        continue;
-      }
-    }
-    LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
-    if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) {
-      db->ReinitializeStorage();
-      recovery_data->Clear();
-      LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
-      continue;
-    } else {
-      LOG(INFO) << "Snapshot recovery successful.";
-      break;
-    }
-  }
-
-  // If snapshot recovery is required, and we failed, don't even deal with
-  // the WAL recovery.
-  if (required_snapshot_tx_id &&
-      recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
-    return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
-
-  return {durability::kVersion, recovery_data->snapshooter_tx_id,
-          ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
-                                         *recovery_data)};
-}
-
-void RecoverWalAndIndexes(const fs::path &durability_dir, database::GraphDb *db,
-                          RecoveryData *recovery_data,
-                          RecoveryTransactions *transactions) {
-  // Write-ahead-log recovery.
-  // WAL recovery does not have to be complete for the recovery to be
-  // considered successful. For the time being ignore the return value,
-  // consider a better system.
-  RecoverWal(durability_dir / kWalDir, db, recovery_data, transactions);
-
-  // Index recovery.
+void RecoverIndexes(
+    database::GraphDb *db,
+    const std::vector<std::pair<std::string, std::string>> &indexes) {
   auto db_accessor_indices = db->Access();
-  for (const auto &label_prop : recovery_data->indexes) {
+  for (const auto &label_prop : indexes) {
     const database::LabelPropertyIndex::Key key{
         db_accessor_indices->Label(label_prop.first),
         db_accessor_indices->Property(label_prop.second)};
diff --git a/src/durability/recovery.hpp b/src/durability/recovery.hpp
index 663bd4976..8d4c8747a 100644
--- a/src/durability/recovery.hpp
+++ b/src/durability/recovery.hpp
@@ -180,8 +180,12 @@ class RecoveryTransactions {
   virtual void Apply(const database::StateDelta &) = 0;
 };
 
-void RecoverWalAndIndexes(const std::experimental::filesystem::path &dir,
-                          database::GraphDb *db, RecoveryData *recovery_data,
-                          RecoveryTransactions *transactions);
+void RecoverWal(const std::experimental::filesystem::path &durability_dir,
+                    database::GraphDb *db, RecoveryData *recovery_data,
+                    RecoveryTransactions *transactions);
+
+void RecoverIndexes(
+    database::GraphDb *db,
+    const std::vector<std::pair<std::string, std::string>> &indexes);
 
 }  // namespace durability
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index b7e944ab9..89f7d05f9 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -73,6 +73,9 @@ target_link_libraries(${test_prefix}distributed_dgp_vertex_migrator memgraph_lib
 add_unit_test(distributed_durability.cpp)
 target_link_libraries(${test_prefix}distributed_durability memgraph_lib kvstore_dummy_lib)
 
+add_unit_test(distributed_dynamic_worker.cpp)
+target_link_libraries(${test_prefix}distributed_dynamic_worker memgraph_lib kvstore_dummy_lib)
+
 add_unit_test(distributed_gc.cpp)
 target_link_libraries(${test_prefix}distributed_gc memgraph_lib kvstore_dummy_lib)
 
diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp
index c43b0b95d..2b1b2b6a3 100644
--- a/tests/unit/distributed_common.hpp
+++ b/tests/unit/distributed_common.hpp
@@ -156,7 +156,7 @@ class DistributedGraphDbTest : public ::testing::Test {
   // Each test has to specify its own durability suffix to avoid conflicts
   DistributedGraphDbTest() = delete;
 
-  DistributedGraphDbTest(const std::string &dir_suffix)
+  explicit DistributedGraphDbTest(const std::string &dir_suffix)
       : dir_suffix_(dir_suffix) {
     tmp_dir_ =
         fs::temp_directory_path() / ("MG_test_unit_durability_" + dir_suffix_);
diff --git a/tests/unit/distributed_dynamic_worker.cpp b/tests/unit/distributed_dynamic_worker.cpp
new file mode 100644
index 000000000..f0e23faea
--- /dev/null
+++ b/tests/unit/distributed_dynamic_worker.cpp
@@ -0,0 +1,189 @@
+#include <memory>
+#include <thread>
+
+#include "gtest/gtest.h"
+
+#include "database/distributed_graph_db.hpp"
+#include "database/graph_db.hpp"
+#include "distributed_common.hpp"
+#include "io/network/endpoint.hpp"
+#include "query_plan_common.hpp"
+
+namespace fs = std::experimental::filesystem;
+using namespace std::literals::chrono_literals;
+
+class DistributedDynamicWorker : public ::testing::Test {
+ public:
+  const std::string kLocal = "127.0.0.1";
+
+  std::unique_ptr<database::Master> CreateMaster(
+      std::function<database::Config(database::Config config)> modify_config) {
+    database::Config master_config;
+    master_config.master_endpoint = {kLocal, 0};
+    master_config.durability_directory = GetDurabilityDirectory(0);
+    // Flag needs to be updated due to props on disk storage.
+    FLAGS_durability_directory = GetDurabilityDirectory(0);
+    auto master =
+        std::make_unique<database::Master>(modify_config(master_config));
+    std::this_thread::sleep_for(200ms);
+    return master;
+  }
+
+  std::unique_ptr<WorkerInThread> CreateWorker(
+      io::network::Endpoint master_endpoint, int worker_id,
+      std::function<database::Config(database::Config config)> modify_config) {
+    database::Config config;
+    config.durability_directory = GetDurabilityDirectory(worker_id);
+    // Flag needs to be updated due to props on disk storage.
+    FLAGS_durability_directory = GetDurabilityDirectory(worker_id);
+
+    config.worker_id = worker_id;
+    config.master_endpoint = master_endpoint;
+    config.worker_endpoint = {kLocal, 0};
+
+    auto worker = std::make_unique<WorkerInThread>(modify_config(config));
+    std::this_thread::sleep_for(200ms);
+    return worker;
+  }
+
+  fs::path GetDurabilityDirectory(int worker_id) {
+    if (worker_id == 0) return tmp_dir_ / "master";
+    return tmp_dir_ / fmt::format("worker{}", worker_id);
+  }
+
+  void CleanDurability() {
+    if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
+  }
+
+ private:
+  fs::path tmp_dir_ =
+      fs::temp_directory_path() / ("MG_test_unit_distributed_worker_addition");
+};
+
+TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorker) {
+  auto modify_config = [](database::Config config) { return config; };
+  auto master = CreateMaster(modify_config);
+
+  // Lets insert some data and build label property index
+  {
+    auto dba = master->Access();
+    storage::Label label;
+    storage::Property property;
+    label = dba->Label("label");
+    property = dba->Property("property");
+
+    for (int i = 0; i < 100; ++i) {
+      auto vertex = dba->InsertVertex();
+      vertex.add_label(label);
+      vertex.PropsSet(property, 1);
+    }
+    dba->Commit();
+  }
+
+  {
+    auto dba = master->Access();
+    storage::Label label;
+    storage::Property property;
+    label = dba->Label("label");
+    property = dba->Property("property");
+
+    dba->BuildIndex(label, property);
+    EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
+    EXPECT_EQ(CountIterable(dba->Vertices(label, property, false)), 100);
+  }
+
+  auto worker1 = CreateWorker(master->endpoint(), 1, modify_config);
+
+  // Check that the new worker has that index
+  {
+    auto dba = worker1->db()->Access();
+    storage::Label label;
+    storage::Property property;
+    label = dba->Label("label");
+    property = dba->Property("property");
+
+    EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
+  }
+
+  auto t = std::thread([&]() { master = nullptr; });
+  worker1 = nullptr;
+  if (t.joinable()) t.join();
+}
+
+TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorkerAfterRecovery) {
+  auto modify_config = [](database::Config config) { return config; };
+  auto durability_config = [](database::Config config) {
+    config.durability_enabled = true;
+    config.snapshot_on_exit = true;
+    return config;
+  };
+  auto recovery_config = [](database::Config config) {
+    config.recovering_cluster_size = 1;
+    config.db_recover_on_startup = true;
+    return config;
+  };
+
+  {
+    auto master = CreateMaster(durability_config);
+
+    // Lets insert some data and build label property index
+    {
+      auto dba = master->Access();
+      storage::Label label;
+      storage::Property property;
+      label = dba->Label("label");
+      property = dba->Property("property");
+
+      for (int i = 0; i < 100; ++i) {
+        auto vertex = dba->InsertVertex();
+        vertex.add_label(label);
+        vertex.PropsSet(property, 1);
+      }
+      dba->Commit();
+    }
+
+    {
+      auto dba = master->Access();
+      storage::Label label;
+      storage::Property property;
+      label = dba->Label("label");
+      property = dba->Property("property");
+
+      dba->BuildIndex(label, property);
+      EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
+    }
+    master = nullptr;
+  }
+
+  {
+    auto master = CreateMaster(recovery_config);
+
+    // Make sure the index is recovered on master.
+    {
+      auto dba = master->Access();
+      storage::Label label;
+      storage::Property property;
+      label = dba->Label("label");
+      property = dba->Property("property");
+
+      EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
+    }
+
+    auto worker1 = CreateWorker(master->endpoint(), 1, modify_config);
+
+    // Check that the new worker has that index.
+    {
+      auto dba = worker1->db()->Access();
+      storage::Label label;
+      storage::Property property;
+      label = dba->Label("label");
+      property = dba->Property("property");
+
+      EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
+    }
+
+    auto t = std::thread([&]() { master = nullptr; });
+    worker1 = nullptr;
+    if (t.joinable()) t.join();
+  }
+}
diff --git a/tools/tests/mg_recovery_check.cpp b/tools/tests/mg_recovery_check.cpp
index 7533a664c..b87276dad 100644
--- a/tools/tests/mg_recovery_check.cpp
+++ b/tools/tests/mg_recovery_check.cpp
@@ -25,8 +25,9 @@ class RecoveryTest : public ::testing::Test {
     durability::RecoverOnlySnapshot(durability_dir, &db_, &recovery_data,
                                     std::experimental::nullopt, 0);
     database::SingleNodeRecoveryTransanctions recovery_transactions(&db_);
-    durability::RecoverWalAndIndexes(durability_dir, &db_, &recovery_data,
-                                     &recovery_transactions);
+    durability::RecoverWal(durability_dir, &db_, &recovery_data,
+                               &recovery_transactions);
+    durability::RecoverIndexes(&db_, recovery_data.indexes);
   }
 
   database::SingleNode db_;