From 90519f0bebf6ed0359b1c80a42422a7577ced626 Mon Sep 17 00:00:00 2001
From: Dominik Gleich <dominik.gleich@memgraph.io>
Date: Wed, 11 Jul 2018 14:34:19 +0200
Subject: [PATCH] Fix distributed index creation

Summary:
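While an index is being created, a vertex that has the indexed label/property could end up missing from the index after
index building completes.
This happens if vertices are being inserted while the index is being built.
The fix splits the worker-side build into two RPCs: the index is first created on every worker and only afterwards populated.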

Reviewers: buda, msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1484
---
 src/database/graph_db_accessor.cpp     | 32 +++++++++++++++++----
 src/distributed/index_rpc_messages.lcp |  8 +++++-
 src/distributed/index_rpc_server.cpp   | 28 ++++++++----------
 src/distributed/rpc_worker_clients.hpp | 20 ++++++++++---
 tests/manual/distributed_common.hpp    |  3 +-
 tests/unit/distributed_graph_db.cpp    | 39 ++++++++++++++++++++++++++
 6 files changed, 102 insertions(+), 28 deletions(-)

diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp
index fd75c220d..640a29bc6 100644
--- a/src/database/graph_db_accessor.cpp
+++ b/src/database/graph_db_accessor.cpp
@@ -154,6 +154,28 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
         "exists.");
   }
 
+  std::experimental::optional<std::vector<utils::Future<bool>>>
+      index_rpc_completions;
+
+  // Notify all workers to create the index
+  if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
+    index_rpc_completions.emplace(db_.index_rpc_clients().GetCreateIndexFutures(
+        label, property, this->db_.WorkerId()));
+  }
+
+  if (index_rpc_completions) {
+    // Wait for every RPC first and check the results afterwards, so that no
+    // call is still in flight when we throw. This could be optimized: on an
+    // early failure we could notify the other workers to stop building indexes.
+    for (auto &index_built : *index_rpc_completions) index_built.wait();
+    for (auto &index_built : *index_rpc_completions) {
+      if (!index_built.get()) {
+        db_.storage().label_property_index_.DeleteIndex(key);
+        throw IndexCreationOnWorkerException("Index exists on a worker");
+      }
+    }
+  }
+
   // Everything that happens after the line above ended will be added to the
   // index automatically, but we still have to add to index everything that
   // happened earlier. We have to first wait for every transaction that
@@ -179,14 +201,12 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
   // CreateIndex.
   GraphDbAccessor dba(db_);
 
-  std::experimental::optional<std::vector<utils::Future<bool>>>
-      index_rpc_completions;
-
-  // Notify all workers to start building an index if we are the master since
+  // Notify all workers to start populating an index if we are the master since
   // they don't have to wait anymore
   if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
-    index_rpc_completions.emplace(db_.index_rpc_clients().GetBuildIndexFutures(
-        label, property, transaction_id(), this->db_.WorkerId()));
+    index_rpc_completions.emplace(
+        db_.index_rpc_clients().GetPopulateIndexFutures(
+            label, property, dba.transaction_id(), this->db_.WorkerId()));
   }
 
   // Add transaction to the build_tx_in_progress as this transaction doesn't
diff --git a/src/distributed/index_rpc_messages.lcp b/src/distributed/index_rpc_messages.lcp
index d1573b53a..817498cd4 100644
--- a/src/distributed/index_rpc_messages.lcp
+++ b/src/distributed/index_rpc_messages.lcp
@@ -15,11 +15,17 @@ cpp<#
 
 (lcp:capnp-import 'storage "/storage/serialization.capnp")
 
-(lcp:define-rpc build-index
+(lcp:define-rpc populate-index ;; request to fill an index for (label, property); tx-id names the transaction to use
     (:request
      ((label "storage::Label" :capnp-type "Storage.Common")
       (property "storage::Property" :capnp-type "Storage.Common")
       (tx-id "tx::TransactionId" :capnp-type "UInt64")))
   (:response ()))
 
+(lcp:define-rpc create-index ;; request to create the index for (label, property); carries no transaction id
+    (:request
+     ((label "storage::Label" :capnp-type "Storage.Common")
+      (property "storage::Property" :capnp-type "Storage.Common")))
+  (:response ())) ;; response declares no members, so no status field is sent back
+
 (lcp:pop-namespace) ;; distributed
diff --git a/src/distributed/index_rpc_server.cpp b/src/distributed/index_rpc_server.cpp
index a88b0595d..ea47b6373 100644
--- a/src/distributed/index_rpc_server.cpp
+++ b/src/distributed/index_rpc_server.cpp
@@ -7,26 +7,22 @@ namespace distributed {
 IndexRpcServer::IndexRpcServer(database::GraphDb &db,
                                communication::rpc::Server &server)
     : db_(db), rpc_server_(server) {
-  rpc_server_.Register<BuildIndexRpc>(
+  rpc_server_.Register<CreateIndexRpc>(  // step 1: only create the index for the key; filling it is the PopulateIndexRpc handler's job
       [this](const auto &req_reader, auto *res_builder) {
-        BuildIndexReq req;
+        CreateIndexReq req;
+        req.Load(req_reader);
+        database::LabelPropertyIndex::Key key{req.label, req.property};
+        db_.storage().label_property_index_.CreateIndex(key);  // NOTE(review): result is discarded and res_builder is unused, so "index already exists" is not reported from here
+      });
+
+  rpc_server_.Register<PopulateIndexRpc>(  // step 2: populate and enable the index for the key
+      [this](const auto &req_reader, auto *res_builder) {
+        PopulateIndexReq req;
         req.Load(req_reader);
         database::LabelPropertyIndex::Key key{req.label, req.property};
         database::GraphDbAccessor dba(db_, req.tx_id);
-
-        if (db_.storage().label_property_index_.CreateIndex(key) == false) {
-          // If we are a distributed worker we just have to wait till the index
-          // (which should be in progress of being created) is created so that
-          // our return guarantess that the index has been built - this assumes
-          // that no worker thread that is creating an index will fail
-          while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) {
-            // TODO reconsider this constant, currently rule-of-thumb chosen
-            std::this_thread::sleep_for(std::chrono::microseconds(100));
-          }
-        } else {
-          dba.PopulateIndex(key);
-          dba.EnableIndex(key);
-        }
+        dba.PopulateIndex(key);  // dba was constructed with the tx id carried in the request
+        dba.EnableIndex(key);  // presumably marks the populated index as usable - TODO confirm
       });
 }
 
diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp
index a55b14ca6..52ee3c8ee 100644
--- a/src/distributed/rpc_worker_clients.hpp
+++ b/src/distributed/rpc_worker_clients.hpp
@@ -84,15 +84,27 @@ class IndexRpcClients {
  public:
   explicit IndexRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
 
-  auto GetBuildIndexFutures(const storage::Label &label,
-                            const storage::Property &property,
-                            tx::TransactionId transaction_id, int worker_id) {
+  auto GetPopulateIndexFutures(const storage::Label &label,
+                               const storage::Property &property,
+                               tx::TransactionId transaction_id,
+                               int worker_id) {  // issues PopulateIndexRpc through ExecuteOnWorkers and returns what it returns
     return clients_.ExecuteOnWorkers<bool>(
         worker_id,
         [label, property, transaction_id](
             int worker_id, communication::rpc::ClientPool &client_pool) {
+          return static_cast<bool>(client_pool.Call<PopulateIndexRpc>(
+              label, property, transaction_id));  // bool is just the truthiness of Call's result
+        });
+  }
+
+  auto GetCreateIndexFutures(const storage::Label &label,
+                             const storage::Property &property, int worker_id) {  // same shape as above, but issues CreateIndexRpc (no transaction id)
+    return clients_.ExecuteOnWorkers<bool>(
+        worker_id,  // NOTE(review): presumably the id of the worker to skip (callers pass their own id) - confirm
+        [label, property](int worker_id,
+                          communication::rpc::ClientPool &client_pool) {
           return static_cast<bool>(
-              client_pool.Call<BuildIndexRpc>(label, property, transaction_id));
+              client_pool.Call<CreateIndexRpc>(label, property));  // bool is just the truthiness of Call's result
         });
   }
 
diff --git a/tests/manual/distributed_common.hpp b/tests/manual/distributed_common.hpp
index 676d90d90..6db64e6cb 100644
--- a/tests/manual/distributed_common.hpp
+++ b/tests/manual/distributed_common.hpp
@@ -52,8 +52,9 @@ class Cluster {
 
   void Stop() {
     interpreter_ = nullptr;
-    master_ = nullptr;
+    auto t = std::thread([this]() { master_ = nullptr; });  // reset master_ on a helper thread so it overlaps with workers_.clear() below
     workers_.clear();
+    if (t.joinable()) t.join();  // do not return until the master reset has finished
   }
 
   ~Cluster() {
diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp
index cbd102671..fe25d725f 100644
--- a/tests/unit/distributed_graph_db.cpp
+++ b/tests/unit/distributed_graph_db.cpp
@@ -174,6 +174,45 @@ TEST_F(DistributedGraphDb, BuildIndexDistributed) {
   }
 }
 
+TEST_F(DistributedGraphDb, BuildIndexConcurrentInsert) {  // index built on master while worker 1 keeps inserting must contain every inserted vertex
+  storage::Label label;
+  storage::Property property;
+
+  GraphDbAccessor dba0{master()};
+  label = dba0.Label("label");
+  property = dba0.Property("property");
+
+  int cnt = 0;  // source of property values; add_vertex is only called from the worker_insert thread
+  auto add_vertex = [label, property, &cnt](GraphDbAccessor &dba) {
+    auto vertex = dba.InsertVertex();
+    vertex.add_label(label);
+    vertex.PropsSet(property, ++cnt);
+  };
+  dba0.Commit();
+
+  auto worker_insert = std::thread([this, &add_vertex]() {  // 10000 one-vertex transactions on worker 1, running alongside BuildIndex below
+    for (int i = 0; i < 10000; ++i) {
+      GraphDbAccessor dba1{worker(1)};
+      add_vertex(dba1);
+      dba1.Commit();
+    }
+  });
+
+  std::this_thread::sleep_for(0.5s);  // give the inserter a head start so the build begins mid-stream
+  {
+    GraphDbAccessor dba{master()};
+    dba.BuildIndex(label, property);
+    EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
+  }
+
+  worker_insert.join();  // all 10000 insert transactions have committed past this point
+  {
+    GraphDbAccessor dba{worker(1)};
+    EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
+    EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 10000);  // every vertex inserted on worker 1 must be reachable through the index
+  }
+}
+
 TEST_F(DistributedGraphDb, WorkerOwnedDbAccessors) {
   GraphDbAccessor dba_w1(worker(1));
   auto v = dba_w1.InsertVertex();