From 8a3f3b6c88cfef9be14de6bcdd15980e476fd0e8 Mon Sep 17 00:00:00 2001
From: Marko Budiselic <mbudiselicbuda@gmail.com>
Date: Fri, 7 Sep 2018 18:45:09 +0100
Subject: [PATCH] Add dgp integration test

Reviewers: msantl, mferencevic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1586
---
 docs/dev/diagram/dgp/logical.dot              |   8 +-
 src/CMakeLists.txt                            |   4 +-
 src/database/config.cpp                       |   3 +
 src/database/distributed_graph_db.cpp         |   2 +-
 .../dgp/partitioner.cpp}                      |  47 +++-
 src/distributed/dgp/partitioner.hpp           |  89 +++++++
 .../dgp}/vertex_migrator.cpp                  |   5 +-
 .../dgp}/vertex_migrator.hpp                  |   6 +
 src/distributed/pull_produce_rpc_messages.lcp |   2 +-
 src/distributed/token_sharing_rpc_server.hpp  |  41 ++-
 src/storage/dynamic_graph_partitioner/dgp.hpp |  54 ----
 tests/integration/dgp/.gitignore              |   1 +
 tests/integration/dgp/run.py                  | 222 +++++++++++++++
 tests/unit/CMakeLists.txt                     |   7 +
 tests/unit/distributed_dgp_partitioner.cpp    | 252 ++++++++++++++++++
 .../unit/distributed_dgp_vertex_migrator.cpp  |   6 +-
 tests/unit/distributed_token_sharing.cpp      |  33 ---
 17 files changed, 657 insertions(+), 125 deletions(-)
 rename src/{storage/dynamic_graph_partitioner/dgp.cpp => distributed/dgp/partitioner.cpp} (75%)
 create mode 100644 src/distributed/dgp/partitioner.hpp
 rename src/{storage/dynamic_graph_partitioner => distributed/dgp}/vertex_migrator.cpp (94%)
 rename src/{storage/dynamic_graph_partitioner => distributed/dgp}/vertex_migrator.hpp (92%)
 delete mode 100644 src/storage/dynamic_graph_partitioner/dgp.hpp
 create mode 100644 tests/integration/dgp/.gitignore
 create mode 100755 tests/integration/dgp/run.py
 create mode 100644 tests/unit/distributed_dgp_partitioner.cpp
 delete mode 100644 tests/unit/distributed_token_sharing.cpp

diff --git a/docs/dev/diagram/dgp/logical.dot b/docs/dev/diagram/dgp/logical.dot
index 79388f968..76c5d941c 100644
--- a/docs/dev/diagram/dgp/logical.dot
+++ b/docs/dev/diagram/dgp/logical.dot
@@ -13,10 +13,10 @@ digraph {
   "distributed::TokenSharingRpcServer" -> "communication::rpc::Server";
   "distributed::TokenSharingRpcServer" -> "distributed::Coordination";
   "distributed::TokenSharingRpcServer" -> "distributed::TokenSharingRpcClients";
-  "distributed::TokenSharingRpcServer" -> "storage::dgp::Partitioner";
+  "distributed::TokenSharingRpcServer" -> "distributed::dgp::Partitioner";
 
-  "storage::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed];
+  "distributed::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed];
 
-  "storage::dgp::Partitioner" -> "storage::dgp::VertexMigrator";
-  "storage::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed];
+  "distributed::dgp::Partitioner" -> "distributed::dgp::VertexMigrator";
+  "distributed::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed];
 }
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f5bba8848..b9e4300a4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -29,6 +29,8 @@ set(memgraph_src_files
     distributed/data_manager.cpp
     distributed/data_rpc_clients.cpp
     distributed/data_rpc_server.cpp
+    distributed/dgp/partitioner.cpp
+    distributed/dgp/vertex_migrator.cpp
     distributed/durability_rpc_master.cpp
     distributed/durability_rpc_worker.cpp
     distributed/index_rpc_server.cpp
@@ -65,8 +67,6 @@ set(memgraph_src_files
     query/typed_value.cpp
     storage/concurrent_id_mapper_master.cpp
     storage/concurrent_id_mapper_worker.cpp
-    storage/dynamic_graph_partitioner/dgp.cpp
-    storage/dynamic_graph_partitioner/vertex_migrator.cpp
     storage/edge_accessor.cpp
     storage/locking/record_lock.cpp
     storage/property_value.cpp
diff --git a/src/database/config.cpp b/src/database/config.cpp
index b5a982c7d..d179ef26b 100644
--- a/src/database/config.cpp
+++ b/src/database/config.cpp
@@ -68,6 +68,9 @@ DEFINE_VALIDATED_int32(recovering_cluster_size, 0,
                        "Number of workers (including master) in the "
                        "previously snapshooted/wal cluster.",
                        FLAG_IN_RANGE(0, INT32_MAX));
+// TODO (buda): Implement an openCypher query because it makes complete sense
+//              to be able to start and stop DGP on the fly.
+//              The implementation should be straightforward.
 DEFINE_bool(dynamic_graph_partitioner_enabled, false,
             "If the dynamic graph partitioner should be enabled.");
 #endif
diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp
index 299a4916a..3827aa087 100644
--- a/src/database/distributed_graph_db.cpp
+++ b/src/database/distributed_graph_db.cpp
@@ -687,7 +687,7 @@ Master::Master(Config config)
 
   // Start the dynamic graph partitioner inside token sharing server
   if (impl_->config_.dynamic_graph_partitioner_enabled) {
-    impl_->token_sharing_server_.StartTokenSharing();
+    impl_->token_sharing_server_.Start();
   }
 
   if (impl_->config_.durability_enabled) {
diff --git a/src/storage/dynamic_graph_partitioner/dgp.cpp b/src/distributed/dgp/partitioner.cpp
similarity index 75%
rename from src/storage/dynamic_graph_partitioner/dgp.cpp
rename to src/distributed/dgp/partitioner.cpp
index 479080215..748f78b20 100644
--- a/src/storage/dynamic_graph_partitioner/dgp.cpp
+++ b/src/distributed/dgp/partitioner.cpp
@@ -1,4 +1,4 @@
-#include "storage/dynamic_graph_partitioner/dgp.hpp"
+#include "distributed/dgp/partitioner.hpp"
 
 #include <algorithm>
 #include <unordered_map>
@@ -8,10 +8,11 @@
 #include "database/graph_db_accessor.hpp"
 #include "distributed/updates_rpc_clients.hpp"
 #include "query/exceptions.hpp"
-#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
+#include "distributed/dgp/vertex_migrator.hpp"
 #include "utils/flag_validation.hpp"
 #include "utils/thread/sync.hpp"
 
+// TODO (buda): Implement openCypher commands to control these parameters.
 DEFINE_VALIDATED_int32(
     dgp_improvement_threshold, 10,
     "How much better should specific node score be to consider "
@@ -19,25 +20,28 @@ DEFINE_VALIDATED_int32(
     "between new score that the vertex will have when migrated and the old one "
     "such that it's migrated.",
     FLAG_IN_RANGE(1, 100));
+// TODO (buda): The default here should be int_max because that will allow us
+// to partition large datasets faster. That default should be used for our
+// tests, where we can run the partitioning up front.
 DEFINE_VALIDATED_int32(dgp_max_batch_size, 2000,
                        "Maximal amount of vertices which should be migrated in "
                        "one dynamic graph partitioner step.",
                        FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
 
-DynamicGraphPartitioner::DynamicGraphPartitioner(
-    database::DistributedGraphDb *db)
-    : db_(db) {}
+namespace distributed::dgp {
 
-void DynamicGraphPartitioner::Run() {
+Partitioner::Partitioner(database::DistributedGraphDb *db) : db_(db) {}
+
+std::pair<double, bool> Partitioner::Partition() {
   auto dba = db_->Access();
   VLOG(21) << "Starting DynamicGraphPartitioner in tx: "
            << dba->transaction().id_;
 
-  auto migrations = FindMigrations(*dba);
+  auto data = FindMigrations(*dba);
 
   try {
     VertexMigrator migrator(dba.get());
-    for (auto &migration : migrations) {
+    for (auto &migration : data.migrations) {
       migrator.MigrateVertex(migration.first, migration.second);
     }
 
@@ -63,19 +67,25 @@ void DynamicGraphPartitioner::Run() {
     }
 
     dba->Commit();
-    VLOG(21) << "Sucesfully migrated " << migrations.size() << " vertices..";
+    VLOG(21) << "Successfully migrated " << data.migrations.size()
+             << " vertices.";
+    return std::make_pair(data.score, true);
   } catch (const utils::BasicException &e) {
     VLOG(21) << "Didn't succeed in relocating; " << e.what();
     dba->Abort();
+    // Returning VertexAccessors after Abort might not be a good idea. The
+    // returned migrations are entirely useless because the engine didn't
+    // succeed in migrating anything.
+    return std::make_pair(data.score, false);
   }
 }
 
-std::vector<std::pair<VertexAccessor, int>>
-DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
+MigrationsData Partitioner::FindMigrations(database::GraphDbAccessor &dba) {
   // Find workers vertex count
   std::unordered_map<int, int64_t> worker_vertex_count =
       db_->data_clients().VertexCounts(dba.transaction().id_);
 
+  // TODO (buda): Add total edge count as an option.
   int64_t total_vertex_count = 0;
   for (auto worker_vertex_count_pair : worker_vertex_count) {
     total_vertex_count += worker_vertex_count_pair.second;
@@ -83,6 +93,9 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
 
   double average_vertex_count =
       total_vertex_count * 1.0 / worker_vertex_count.size();
+  if (average_vertex_count == 0) return MigrationsData(0);
+
+  double local_graph_score = 0;
 
   // Considers all migrations which maximally improve single vertex score
   std::vector<std::pair<VertexAccessor, int>> migrations;
@@ -90,7 +103,7 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
     auto label_counts = CountLabels(vertex);
     std::unordered_map<int, double> per_label_score;
     size_t degree = vertex.in_degree() + vertex.out_degree();
-
+    if (degree == 0) continue;
     for (auto worker_vertex_count_pair : worker_vertex_count) {
       int worker = worker_vertex_count_pair.first;
       int64_t worker_vertex_count = worker_vertex_count_pair.second;
@@ -103,9 +116,12 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
                         const std::pair<int, double> &p2) {
       return p1.second < p2.second;
     };
+
     auto best_label = std::max_element(per_label_score.begin(),
                                        per_label_score.end(), label_cmp);
 
+    local_graph_score += best_label->second;
+
     // Consider as a migration only if the improvement is high enough
     if (best_label != per_label_score.end() &&
         best_label->first != db_->WorkerId() &&
@@ -118,10 +134,12 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
     if (migrations.size() >= FLAGS_dgp_max_batch_size) break;
   }
 
-  return migrations;
+  DLOG(INFO) << "Local graph score: " << local_graph_score;
+
+  return MigrationsData(local_graph_score, std::move(migrations));
 }
 
-std::unordered_map<int, int64_t> DynamicGraphPartitioner::CountLabels(
+std::unordered_map<int, int64_t> Partitioner::CountLabels(
     const VertexAccessor &vertex) const {
   std::unordered_map<int, int64_t> label_count;
   for (auto edge : vertex.in()) {
@@ -136,3 +154,4 @@ std::unordered_map<int, int64_t> DynamicGraphPartitioner::CountLabels(
   }
   return label_count;
 }
+}  // namespace distributed::dgp
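
For orientation, here is a small self-contained sketch (not the patch's code) of the selection idea that FindMigrations() implements above: count where a vertex's edges end up, score each candidate worker, and queue a migration only when the best score beats the current worker's score by an improvement threshold, stopping at a batch-size limit. The scoring below is a deliberately simplified stand-in (the real score also weighs worker saturation), and ToyVertex, improvement_threshold and max_batch_size are illustrative names rather than the patch's API.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy model of one partitioning step: each vertex knows how many of its edge
// endpoints live on each worker (cf. CountLabels) and which worker it
// currently lives on.
struct ToyVertex {
  int id;
  int current_worker;
  std::unordered_map<int, int64_t> label_counts;  // worker id -> endpoint count
};

int main() {
  const double improvement_threshold = 0.10;  // cf. --dgp_improvement_threshold
  const std::size_t max_batch_size = 2;       // cf. --dgp_max_batch_size

  std::vector<ToyVertex> vertices = {
      {0, 0, {{0, 1}, {1, 9}}},  // strongly attracted to worker 1
      {1, 0, {{0, 5}, {1, 5}}},  // indifferent -> should stay put
      {2, 0, {{0, 0}, {2, 4}}},  // attracted to worker 2
  };

  std::vector<std::pair<int, int>> migrations;  // (vertex id, destination)
  for (const auto &v : vertices) {
    int64_t degree = 0;
    for (const auto &kv : v.label_counts) degree += kv.second;
    if (degree == 0) continue;  // isolated vertices contribute nothing

    // Simplified score: fraction of this vertex's edges held by each worker.
    std::unordered_map<int, double> per_label_score;
    for (const auto &kv : v.label_counts)
      per_label_score[kv.first] = static_cast<double>(kv.second) / degree;

    auto best = std::max_element(
        per_label_score.begin(), per_label_score.end(),
        [](const auto &a, const auto &b) { return a.second < b.second; });

    // Migrate only if another worker is better by at least the threshold.
    if (best->first != v.current_worker &&
        best->second - per_label_score[v.current_worker] >=
            improvement_threshold)
      migrations.emplace_back(v.id, best->first);

    if (migrations.size() >= max_batch_size) break;
  }

  for (const auto &m : migrations)
    std::cout << "migrate vertex " << m.first << " -> worker " << m.second
              << "\n";
}
```
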
diff --git a/src/distributed/dgp/partitioner.hpp b/src/distributed/dgp/partitioner.hpp
new file mode 100644
index 000000000..4381fcb82
--- /dev/null
+++ b/src/distributed/dgp/partitioner.hpp
@@ -0,0 +1,89 @@
+/// @file
+
+#pragma once
+
+#include <thread>
+
+#include "distributed/data_rpc_clients.hpp"
+#include "distributed/token_sharing_rpc_messages.hpp"
+#include "distributed/dgp/vertex_migrator.hpp"
+#include "storage/vertex_accessor.hpp"
+
+namespace database {
+class DistributedGraphDb;
+class GraphDbAccessor;
+};  // namespace database
+
+namespace distributed::dgp {
+
+/// Contains a set of vertices and where they should be migrated
+/// (machine/instance id), plus a score describing how good the partitioning
+/// is.
+struct MigrationsData {
+ private:
+  using Migrations = std::vector<std::pair<VertexAccessor, int>>;
+
+ public:
+  MigrationsData(double score, Migrations migrations = Migrations())
+      : score(std::move(score)), migrations(std::move(migrations)) {}
+
+  /// Disable copying because the number of migrations could be huge. The
+  /// expected number is 1k, but a user can configure the database in a way
+  /// where the number of migrations could be much higher.
+  MigrationsData(const MigrationsData &other) = delete;
+  MigrationsData &operator=(const MigrationsData &other) = delete;
+
+  MigrationsData(MigrationsData &&other) = default;
+  MigrationsData &operator=(MigrationsData &&other) = default;
+
+  double score;
+  Migrations migrations;
+};
+
+/// Handles dynamic graph partitioning: migrates vertices from one worker to
+/// another based on a scoring function which takes into account the
+/// neighbours of a vertex and tries to put it where most of its neighbours
+/// are located. It also takes into account the number of vertices on the
+/// destination and source machines.
+class Partitioner {
+ public:
+  /// The partitioner needs GraphDb because each partition step runs in a new
+  /// database transaction (a database accessor has to be created).
+  /// TODO (buda): Consider passing GraphDbAccessor directly.
+  explicit Partitioner(database::DistributedGraphDb *db);
+
+  Partitioner(const Partitioner &other) = delete;
+  Partitioner(Partitioner &&other) = delete;
+  Partitioner &operator=(const Partitioner &other) = delete;
+  Partitioner &operator=(Partitioner &&other) = delete;
+
+  /// Runs one dynamic graph partitioning cycle (step). In case of any error,
+  /// the transaction will be aborted.
+  ///
+  /// @return The calculated partitioning score and whether the migrations
+  ///         were successful.
+  std::pair<double, bool> Partition();
+
+  /// Returns the migration data: a set of (vertex, destination) pairs
+  /// describing where vertices should be relocated from the view of the `dba`
+  /// accessor, together with the local partitioning score.
+  ///
+  /// Each vertex is located on some worker (which in the context of migrations
+  /// we call a vertex label). Each vertex has its score evaluated for each
+  /// different label (worker_id). The score is calculated by considering the
+  /// labels of neighbouring vertices; simply put, each vertex is attracted to
+  /// the worker where most of its neighbours are located. Migrations which
+  /// improve that score, taking into account the saturation of the candidate
+  /// destination workers, are returned.
+  MigrationsData FindMigrations(database::GraphDbAccessor &dba);
+
+  /// Counts number of each label (worker_id) on endpoints of edges (in/out) of
+  /// `vertex`.
+  ///
+  /// @return A map consisting of (label/machine/instance id, count) key-value
+  ///         pairs.
+  std::unordered_map<int, int64_t> CountLabels(
+      const VertexAccessor &vertex) const;
+
+ private:
+  database::DistributedGraphDb *db_{nullptr};
+};
+
+}  // namespace distributed::dgp
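
A minimal usage sketch of the new header, mirroring how the unit tests added below drive it; `db` is assumed to point at an already-initialized distributed GraphDb, which this snippet does not set up.

```cpp
#include <iostream>

#include "distributed/dgp/partitioner.hpp"

// Runs a single dynamic graph partitioning cycle on `db` and reports the
// result. Assumes `db` points to a live database::DistributedGraphDb
// (cluster setup is not shown here).
void RunOnePartitionStep(database::DistributedGraphDb *db) {
  distributed::dgp::Partitioner partitioner(db);

  // One cycle: find candidate migrations, migrate them, then commit or abort.
  auto result = partitioner.Partition();
  double score = result.first;    // local partitioning score
  bool migrated = result.second;  // false if the transaction was aborted

  std::cout << "DGP step score: " << score
            << (migrated ? "" : " (migrations aborted)") << std::endl;
}
```
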
diff --git a/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp b/src/distributed/dgp/vertex_migrator.cpp
similarity index 94%
rename from src/storage/dynamic_graph_partitioner/vertex_migrator.cpp
rename to src/distributed/dgp/vertex_migrator.cpp
index 09c7e26ec..4bfdeaf50 100644
--- a/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp
+++ b/src/distributed/dgp/vertex_migrator.cpp
@@ -1,9 +1,11 @@
-#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
+#include "distributed/dgp/vertex_migrator.hpp"
 
 #include "database/distributed_graph_db.hpp"
 #include "database/graph_db_accessor.hpp"
 #include "query/typed_value.hpp"
 
+namespace distributed::dgp {
+
 VertexMigrator::VertexMigrator(database::GraphDbAccessor *dba) : dba_(dba) {}
 
 void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
@@ -57,3 +59,4 @@ void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
 
   dba_->DetachRemoveVertex(vertex);
 }
+}  // namespace distributed::dgp
diff --git a/src/storage/dynamic_graph_partitioner/vertex_migrator.hpp b/src/distributed/dgp/vertex_migrator.hpp
similarity index 92%
rename from src/storage/dynamic_graph_partitioner/vertex_migrator.hpp
rename to src/distributed/dgp/vertex_migrator.hpp
index 19f746ce7..664a1af32 100644
--- a/src/storage/dynamic_graph_partitioner/vertex_migrator.hpp
+++ b/src/distributed/dgp/vertex_migrator.hpp
@@ -1,3 +1,5 @@
+/// @file
+
 #pragma once
 
 #include <thread>
@@ -10,6 +12,8 @@ namespace database {
 class GraphDbAccessor;
 };  // namespace database
 
+namespace distributed::dgp {
+
 /// Migrates vertices from one worker to another (updates edges as well).
 class VertexMigrator {
  public:
@@ -29,3 +33,5 @@ class VertexMigrator {
   database::GraphDbAccessor *dba_;
   std::unordered_map<gid::Gid, storage::VertexAddress> vertex_migrated_to_;
 };
+
+}  // namespace distributed::dgp
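
For reference, the migrator is driven the same way the vertex-migrator unit test below uses it; a hedged sketch (committing the transaction and applying remote updates are codebase-specific and left to the caller):

```cpp
#include "database/graph_db_accessor.hpp"
#include "distributed/dgp/vertex_migrator.hpp"

// Moves every vertex visible to `dba` to `destination_worker`. Mirrors the
// pattern used in tests/unit/distributed_dgp_vertex_migrator.cpp.
void MigrateAll(database::GraphDbAccessor *dba, int destination_worker) {
  distributed::dgp::VertexMigrator migrator(dba);
  for (auto &vertex : dba->Vertices(false)) {
    migrator.MigrateVertex(vertex, destination_worker);
  }
}
```
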
diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp
index 35047b3f6..21cd1aa76 100644
--- a/src/distributed/pull_produce_rpc_messages.lcp
+++ b/src/distributed/pull_produce_rpc_messages.lcp
@@ -267,7 +267,7 @@ void PullResData::LoadGraphElement(
             ? distributed::LoadVertex(vertex_reader.getNew())
             : nullptr;
     data_manager->Emplace<Vertex>(
-        dba->transaction_id(), global_address.gid(), 
+        dba->transaction_id(), global_address.gid(),
         distributed::CachedRecordData<Vertex>(cypher_id,
                                               std::move(old_record),
                                               std::move(new_record)));
diff --git a/src/distributed/token_sharing_rpc_server.hpp b/src/distributed/token_sharing_rpc_server.hpp
index 21aafd208..2085b7e2d 100644
--- a/src/distributed/token_sharing_rpc_server.hpp
+++ b/src/distributed/token_sharing_rpc_server.hpp
@@ -1,7 +1,9 @@
+/// @file
+
 #pragma once
 
 #include "distributed/rpc_worker_clients.hpp"
-#include "storage/dynamic_graph_partitioner/dgp.hpp"
+#include "distributed/dgp/partitioner.hpp"
 
 namespace communication::rpc {
 class Server;
@@ -13,6 +15,12 @@ class DistributedGraphDb;
 
 namespace distributed {
 
+// TODO (buda): dgp_.Partition() should be injected. This server shouldn't
+// know anything about the partitioning.
+// TODO (buda): It makes more sense to have a centralized server which would
+// assign tokens because error handling would be much easier.
+// TODO (buda): Broken by design.
+
 /// Shares the token between dynamic graph partitioners instances across workers
 /// by passing the token from one worker to another, in a circular fashion. This
 /// guarantees that no two workers will execute the dynamic graph partitioner
@@ -30,10 +38,20 @@ class TokenSharingRpcServer {
         dgp_(db) {
     server_->Register<distributed::TokenTransferRpc>(
         [this](const auto &req_reader, auto *res_builder) { token_ = true; });
-
+    // TODO (buda): It's not trivial to move this part into the Start method
+    // because the worker then doesn't run the step. This will be resolved
+    // with a different implementation of the token assignment.
     runner_ = std::thread([this]() {
-      while (true) {
-        // Wait till we get the token
+      while (!shutting_down_) {
+        // If no other instances are connected, just wait. It doesn't make
+        // sense to migrate anything because only one machine is available.
+        auto workers = coordination_->GetWorkerIds();
+        if (workers.size() <= 1) {
+          std::this_thread::sleep_for(std::chrono::seconds(1));
+          continue;
+        }
+
+        // Wait till we get the token.
         while (!token_) {
           if (shutting_down_) break;
           std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -42,10 +60,9 @@ class TokenSharingRpcServer {
         if (shutting_down_) break;
 
         token_ = false;
-        dgp_.Run();
+        dgp_.Partition();
 
-        // Transfer token to next
-        auto workers = coordination_->GetWorkerIds();
+        // Transfer token to next.
         sort(workers.begin(), workers.end());
 
         int next_worker = -1;
@@ -63,7 +80,7 @@ class TokenSharingRpcServer {
 
   /// Starts the token sharing server which in turn starts the dynamic graph
   /// partitioner.
-  void StartTokenSharing() {
+  void Start() {
     started_ = true;
     token_ = true;
   }
@@ -74,9 +91,9 @@ class TokenSharingRpcServer {
     if (started_ && worker_id_ == 0) {
       // Wait till we get the token back otherwise some worker might try to
       // migrate to another worker while that worker is shutting down or
-      // something else bad might happen
-      // TODO(dgleich): Solve this better in the future since this blocks
-      // shutting down until spinner steps complete
+      // something else bad might happen.
+      // TODO (buda): Solve this better in the future since this blocks
+      // shutting down until spinner steps complete.
       while (!token_) {
         std::this_thread::sleep_for(std::chrono::milliseconds(500));
       }
@@ -94,7 +111,7 @@ class TokenSharingRpcServer {
   std::atomic<bool> shutting_down_{false};
   std::thread runner_;
 
-  DynamicGraphPartitioner dgp_;
+  distributed::dgp::Partitioner dgp_;
 };
 
 }  // namespace distributed
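
The ring step after dgp_.Partition() (sort the worker ids, hand the token to the next id, wrap around) can be sketched in isolation like this; the exact selection code lives in the context lines outside the hunk above, so treat this as an illustration rather than the server's literal implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Returns the worker that should receive the token after `my_id`: the next
// larger id in sorted order, wrapping around to the smallest one.
int NextWorkerInRing(std::vector<int> workers, int my_id) {
  assert(!workers.empty());
  std::sort(workers.begin(), workers.end());
  auto it = std::upper_bound(workers.begin(), workers.end(), my_id);
  if (it == workers.end()) it = workers.begin();  // wrap around the ring
  return *it;
}

// Example: with workers {0, 1, 2}, the token travels 0 -> 1 -> 2 -> 0.
```
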
diff --git a/src/storage/dynamic_graph_partitioner/dgp.hpp b/src/storage/dynamic_graph_partitioner/dgp.hpp
deleted file mode 100644
index 7eda46064..000000000
--- a/src/storage/dynamic_graph_partitioner/dgp.hpp
+++ /dev/null
@@ -1,54 +0,0 @@
-#pragma once
-
-#include <thread>
-
-#include "distributed/data_rpc_clients.hpp"
-#include "distributed/token_sharing_rpc_messages.hpp"
-#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
-#include "storage/vertex_accessor.hpp"
-
-namespace database {
-class DistributedGraphDb;
-class GraphDbAccessor;
-};  // namespace database
-
-/// Handles dynamic graph partitions, migrates vertices from one worker to
-/// another based on available scoring which takes into account neighbours of a
-/// vertex and tries to put it where most of its neighbours are located. Also
-/// takes into account the number of vertices on the destination and source
-/// machine.
-class DynamicGraphPartitioner {
- public:
-  DynamicGraphPartitioner(const DynamicGraphPartitioner &other) = delete;
-  DynamicGraphPartitioner(DynamicGraphPartitioner &&other) = delete;
-  DynamicGraphPartitioner &operator=(const DynamicGraphPartitioner &other) =
-      delete;
-  DynamicGraphPartitioner &operator=(DynamicGraphPartitioner &&other) = delete;
-
-  explicit DynamicGraphPartitioner(database::DistributedGraphDb *db);
-
-  /// Runs one dynamic graph partitioning cycle (step).
-  void Run();
-
-  /// Returns a vector of pairs of `vertex` and `destination` of where should
-  /// some vertex be relocated from the view of `dba` accessor.
-  //
-  /// Each vertex is located on some worker (which in context of migrations we
-  /// call a vertex label). Each vertex has it's score for each different label
-  /// (worker_id) evaluated. This score is calculated by considering
-  /// neighbouring vertices labels. Simply put, each vertex is attracted to be
-  /// located on the same worker as it's neighbouring vertices. Migrations which
-  /// improve that scoring, which also takes into account saturation of other
-  /// workers on which it's considering to migrate this vertex, are determined.
-  std::vector<std::pair<VertexAccessor, int>> FindMigrations(
-      database::GraphDbAccessor &dba);
-
-  /// Counts number of each label (worker_id) on endpoints of edges (in/out) of
-  /// `vertex`.
-  /// Returns a map consisting of (label, count) key-value pairs.
-  std::unordered_map<int, int64_t> CountLabels(
-      const VertexAccessor &vertex) const;
-
- private:
-  database::DistributedGraphDb *db_{nullptr};
-};
diff --git a/tests/integration/dgp/.gitignore b/tests/integration/dgp/.gitignore
new file mode 100644
index 000000000..ea1472ec1
--- /dev/null
+++ b/tests/integration/dgp/.gitignore
@@ -0,0 +1 @@
+output/
diff --git a/tests/integration/dgp/run.py b/tests/integration/dgp/run.py
new file mode 100755
index 000000000..e9dce265e
--- /dev/null
+++ b/tests/integration/dgp/run.py
@@ -0,0 +1,222 @@
+#!/usr/bin/python3
+
+'''
+Test the dynamic graph partitioner on a Memgraph cluster
+with a randomly generated graph (uniform distribution
+of nodes and edges). The partitioning goal is to
+minimize the number of crossing edges while keeping
+the cluster balanced.
+'''
+
+import argparse
+import atexit
+import logging
+import os
+import random
+import sys
+import subprocess
+import time
+
+try:
+    # graphviz==0.9
+    from graphviz import Digraph
+except ImportError:
+    print("graphviz module isn't available. "
+          "The graph won't be generated, but the checks will still work.")
+from neo4j.v1 import GraphDatabase, basic_auth
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
+COMMON_ARGS = ["--durability-enabled=false",
+               "--snapshot-on-exit=false",
+               "--db-recover-on-startup=false"]
+MASTER_ARGS = ["--master",
+               "--master-port", "10000",
+               "--dynamic-graph-partitioner-enabled",
+               "--durability-directory=durability_master"]
+
+log = logging.getLogger(__name__)
+memgraph_processes = []
+
+
+def worker_args(worker_id):
+    args = ["--worker",
+            "--worker-id", str(worker_id),
+            "--worker-port", str(10000 + worker_id),
+            "--master-port", str(10000),
+            "--durability-directory=durability_worker%s" % worker_id]
+    return args
+
+
+def wait_for_server(port, delay=0.01):
+    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
+    while subprocess.call(cmd) != 0:
+        time.sleep(delay)
+    time.sleep(delay)
+
+
+def run_memgraph_process(binary, args):
+    global memgraph_processes
+    process = subprocess.Popen([binary] + args, cwd=os.path.dirname(binary))
+    memgraph_processes.append(process)
+
+
+def run_distributed_memgraph(args):
+    run_memgraph_process(args.memgraph, COMMON_ARGS + MASTER_ARGS)
+    wait_for_server("10000")
+    for i in range(1, int(args.machine_count)):
+        run_memgraph_process(args.memgraph, COMMON_ARGS + worker_args(i))
+    wait_for_server("7687")
+
+
+def terminate():
+    global memgraph_processes
+    for process in memgraph_processes:
+        process.terminate()
+        status = process.wait()
+        if status != 0:
+            raise Exception(
+                "Memgraph binary returned non-zero ({})!".format(status))
+
+
+@atexit.register
+def cleanup():
+    global memgraph_processes
+    for proc in memgraph_processes:
+        if proc.poll() is not None:
+            continue
+        proc.kill()
+        proc.wait()
+
+
+def run_test(args):
+    driver = GraphDatabase.driver(args.endpoint,
+                                  auth=basic_auth(args.username,
+                                                  args.password),
+                                  encrypted=args.encrypted)
+    session = driver.session()
+
+    session.run("CREATE INDEX ON :Node(id)").consume()
+    session.run(
+        "UNWIND range(0, $num - 1) AS n CREATE (:Node {id: n})",
+        num=args.node_count).consume()
+
+    created_edges = 0
+    while created_edges <= args.edge_count:
+        # Retry block is here because DGP is running in the background and
+        # serialization errors may occur.
+        try:
+            session.run("MATCH (n:Node {id: $id1}), (m:Node {id:$id2}) "
+                        "CREATE (n)-[:Edge]->(m)",
+                        id1=random.randint(0, args.node_count - 1),
+                        id2=random.randint(0, args.node_count - 1)).consume()
+            created_edges += 1
+        except Exception:
+            pass
+
+    # Check cluster state periodically.
+    crossing_edges_history = []
+    load_history = []
+    duration = 0
+    for iteration in range(args.iteration_count):
+        iteration_start_time = time.time()
+        data = session.run(
+            "MATCH (n)-[r]->(m) "
+            "RETURN "
+            "    id(n) AS v1, id(m) AS v2, "
+            "    workerid(n) AS position_n, workerid(m) AS position_m").data()
+
+        # Visualize cluster state.
+        if args.visualize and 'graphviz' in sys.modules:
+            output_dir = os.path.join(SCRIPT_DIR, "output")
+            if not os.path.exists(output_dir):
+                os.makedirs(output_dir)
+            cluster = Digraph(name="memgraph_cluster", format="png")
+            cluster.attr(splines="false", rank="TB")
+            subgraphs = [Digraph(name="cluster_worker%s" % i)
+                         for i in range(args.machine_count)]
+            for index, subgraph in enumerate(subgraphs):
+                subgraph.attr(label="worker%s" % index)
+            edges = []
+            for row in data:
+                #             start_id   end_id     machine_id
+                edges.append((row["v1"], row["v2"], row["position_n"]))
+            edges = sorted(edges, key=lambda x: x[0])
+            for edge in edges:
+                #         machine_id        start_id      end_id
+                subgraphs[edge[2]].edge(str(edge[0]), str(edge[1]))
+            for subgraph in subgraphs:
+                cluster.subgraph(subgraph)
+            cluster.render("output/iteration_%s" % iteration)
+
+        # Collect data.
+        num_of_crossing_edges = 0
+        load = [0] * args.machine_count
+        for edge in data:
+            src_worker = int(edge["position_n"])
+            dst_worker = int(edge["position_m"])
+            if src_worker != dst_worker:
+                num_of_crossing_edges = num_of_crossing_edges + 1
+            load[src_worker] = load[src_worker] + 1
+        crossing_edges_history.append(num_of_crossing_edges)
+        load_history.append(load)
+        iteration_delta_time = time.time() - iteration_start_time
+        log.info("Number of crossing edges %s" % num_of_crossing_edges)
+        log.info("Cluster load %s" % load)
+
+        # Wait for DGP a bit.
+        if iteration_delta_time < args.iteration_interval:
+            time.sleep(args.iteration_interval - iteration_delta_time)
+            duration += args.iteration_interval
+        else:
+            duration += iteration_delta_time
+        # TODO (buda): Somehow align with DGP turns. Replace runtime param with
+        #              the query.
+
+    assert crossing_edges_history[-1] < crossing_edges_history[0], \
+        "The number of crossing edges did not decrease."
+    for machine in range(args.machine_count):
+        assert load_history[-1][machine] > 0, "Machine %s is empty." % machine
+    log.info("Config")
+    log.info("    Machines: %s" % args.machine_count)
+    log.info("    Nodes: %s" % args.node_count)
+    log.info("    Edges: %s" % args.edge_count)
+    log.info("Start")
+    log.info("    Crossing Edges: %s" % crossing_edges_history[0])
+    log.info("    Cluster Load: %s" % load_history[0])
+    log.info("End")
+    log.info("    Crossing Edges: %s" % crossing_edges_history[-1])
+    log.info("    Cluster Load: %s" % load_history[-1])
+    log.info("Stats")
+    log.info("    Duration: %ss" % duration)
+
+    session.close()
+    driver.close()
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+
+    memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
+    if not os.path.exists(memgraph_binary):
+        memgraph_binary = os.path.join(PROJECT_DIR, "build_debug", "memgraph")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--memgraph", default=memgraph_binary)
+    parser.add_argument("--endpoint", type=str,
+                        default="bolt://localhost:7687")
+    parser.add_argument("--username", type=str, default="")
+    parser.add_argument("--password", type=str, default="")
+    parser.add_argument("--encrypted", action="store_true")
+    parser.add_argument("--visualize", action="store_true")
+    parser.add_argument("--machine-count", type=int, default=3)
+    parser.add_argument("--node-count", type=int, default=1000000)
+    parser.add_argument("--edge-count", type=int, default=1000000)
+    parser.add_argument("--iteration-count", type=int, default=10)
+    parser.add_argument("--iteration-interval", type=float, default=5)
+    args = parser.parse_args()
+
+    run_distributed_memgraph(args)
+    run_test(args)
+    terminate()
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index 30584eb45..b7e944ab9 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -94,6 +94,13 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto
 add_unit_test(distributed_updates.cpp)
 target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib)
 
+# TODO (buda): Replace token sharing with a centralized solution and write an appropriate test.
+# add_unit_test(distributed_token_sharing.cpp)
+# target_link_libraries(${test_prefix}distributed_token_sharing memgraph_lib kvstore_dummy_lib)
+
+add_unit_test(distributed_dgp_partitioner.cpp)
+target_link_libraries(${test_prefix}distributed_dgp_partitioner memgraph_lib kvstore_dummy_lib)
+
 add_unit_test(durability.cpp)
 target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib)
 
diff --git a/tests/unit/distributed_dgp_partitioner.cpp b/tests/unit/distributed_dgp_partitioner.cpp
new file mode 100644
index 000000000..ceb78b5ea
--- /dev/null
+++ b/tests/unit/distributed_dgp_partitioner.cpp
@@ -0,0 +1,252 @@
+#include "distributed_common.hpp"
+
+#include <memory>
+#include <thread>
+#include <unordered_set>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "distributed/dgp/partitioner.hpp"
+#include "distributed/updates_rpc_clients.hpp"
+
+using namespace distributed;
+using namespace database;
+
+DECLARE_int32(dgp_max_batch_size);
+
+class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest {
+ public:
+  DistributedDynamicGraphPartitionerTest()
+      : DistributedGraphDbTest("dynamic_graph_partitioner") {}
+
+  void LogClusterState() {
+    LOG(INFO) << "master_v: " << VertexCount(master())
+              << " master_e: " << EdgeCount(master());
+    LOG(INFO) << "worker1_v: " << VertexCount(worker(1))
+              << " worker1_e: " << EdgeCount(worker(1));
+    LOG(INFO) << "worker2_v: " << VertexCount(worker(2))
+              << " worker2_e: " << EdgeCount(worker(2));
+  }
+};
+
+TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) {
+  auto va = InsertVertex(master());
+  auto vb = InsertVertex(worker(1));
+  auto vc = InsertVertex(worker(2));
+  for (int i = 0; i < 2; ++i) InsertEdge(va, va, "edge");
+  for (int i = 0; i < 3; ++i) InsertEdge(va, vb, "edge");
+  for (int i = 0; i < 4; ++i) InsertEdge(va, vc, "edge");
+  for (int i = 0; i < 5; ++i) InsertEdge(vb, va, "edge");
+  for (int i = 0; i < 6; ++i) InsertEdge(vc, va, "edge");
+
+  distributed::dgp::Partitioner dgp(&master());
+  auto dba = master().Access();
+  VertexAccessor v(va, *dba);
+  auto count_labels = dgp.CountLabels(v);
+
+  // Self loops counted twice
+  EXPECT_EQ(count_labels[master().WorkerId()], 2 * 2);
+
+  EXPECT_EQ(count_labels[worker(1).WorkerId()], 3 + 5);
+  EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6);
+}
+
+TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) {
+  auto va = InsertVertex(master());
+  auto vb = InsertVertex(worker(1));
+
+  // Balance the number of nodes on workers a bit
+  InsertVertex(worker(2));
+  InsertVertex(worker(2));
+
+  for (int i = 0; i < 100; ++i) InsertEdge(va, vb, "edge");
+  distributed::dgp::Partitioner dgp(&master());
+  auto dba = master().Access();
+  auto data = dgp.FindMigrations(*dba);
+  // Expect `va` to try to move to another worker, the one connected to it
+  ASSERT_EQ(data.migrations.size(), 1);
+  EXPECT_EQ(data.migrations[0].second, worker(1).WorkerId());
+}
+
+TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) {
+  InsertVertex(master());
+  InsertVertex(worker(1));
+  InsertVertex(worker(2));
+
+  // Everything is balanced, there should be no movement
+
+  distributed::dgp::Partitioner dgp(&master());
+  auto dba = master().Access();
+  auto data = dgp.FindMigrations(*dba);
+  EXPECT_EQ(data.migrations.size(), 0);
+}
+
+TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) {
+  auto va = InsertVertex(master());
+  auto vb = InsertVertex(master());
+  auto vc = InsertVertex(worker(1));
+
+  // Balance the number of nodes on workers a bit
+  InsertVertex(worker(1));
+  InsertVertex(worker(2));
+  InsertVertex(worker(2));
+
+  for (int i = 0; i < 100; ++i) InsertEdge(va, vc, "edge");
+  for (int i = 0; i < 100; ++i) InsertEdge(vb, vc, "edge");
+  distributed::dgp::Partitioner dgp(&master());
+  auto dba = master().Access();
+  {
+    auto data = dgp.FindMigrations(*dba);
+    // Expect vertices to try to move to another worker
+    ASSERT_EQ(data.migrations.size(), 2);
+  }
+
+  // See if flag affects number of returned results
+  {
+    FLAGS_dgp_max_batch_size = 1;
+    auto data = dgp.FindMigrations(*dba);
+    // Expect vertices to try to move to another worker
+    ASSERT_EQ(data.migrations.size(), 1);
+  }
+}
+
+TEST_F(DistributedDynamicGraphPartitionerTest, Run) {
+  // Emulate a bipartite graph with lots of connections on the left and right
+  // sides, and some connections between the halves
+  std::vector<storage::VertexAddress> left;
+  for (int i = 0; i < 10; ++i) {
+    left.push_back(InsertVertex(master()));
+  }
+  std::vector<storage::VertexAddress> right;
+  for (int i = 0; i < 10; ++i) {
+    right.push_back(InsertVertex(master()));
+  }
+
+  // Force the nodes of both sides to stay on one worker by inserting a lot of
+  // edges in between them
+  for (int i = 0; i < 1000; ++i) {
+    InsertEdge(left[rand() % 10], left[rand() % 10], "edge");
+    InsertEdge(right[rand() % 10], right[rand() % 10], "edge");
+  }
+
+  // Insert edges between left and right side
+  for (int i = 0; i < 50; ++i)
+    InsertEdge(left[rand() % 10], right[rand() % 10], "edge");
+
+  // Balance it out so that the vertex counts on the workers don't influence
+  // the partitioning too much
+  for (int i = 0; i < 10; ++i) InsertVertex(worker(2));
+
+  distributed::dgp::Partitioner dgp(&master());
+  // Transfer one by one to actually converge
+  FLAGS_dgp_max_batch_size = 1;
+  // Try a bit more transfers to see if we reached a steady state
+  for (int i = 0; i < 15; ++i) {
+    dgp.Partition();
+  }
+
+  EXPECT_EQ(VertexCount(master()), 10);
+  EXPECT_EQ(VertexCount(worker(1)), 10);
+
+  auto CountRemotes = [](GraphDbAccessor &dba) {
+    int64_t cnt = 0;
+    for (auto vertex : dba.Vertices(false)) {
+      for (auto edge : vertex.in())
+        if (edge.from_addr().is_remote()) ++cnt;
+      for (auto edge : vertex.out())
+        if (edge.to_addr().is_remote()) ++cnt;
+    }
+    return cnt;
+  };
+
+  auto dba_m = master().Access();
+  auto dba_w1 = worker(1).Access();
+  EXPECT_EQ(CountRemotes(*dba_m), 50);
+  EXPECT_EQ(CountRemotes(*dba_w1), 50);
+}
+
+TEST_F(DistributedDynamicGraphPartitionerTest, Convergence) {
+  auto seed = std::time(nullptr);
+  LOG(INFO) << "Seed: " << seed;
+  std::srand(seed);
+
+  // Generate random graph across cluster.
+  std::vector<storage::VertexAddress> master_vertices;
+  for (int i = 0; i < 1000; ++i) {
+    master_vertices.push_back(InsertVertex(master()));
+  }
+  std::vector<storage::VertexAddress> worker1_vertices;
+  for (int i = 0; i < 1000; ++i) {
+    worker1_vertices.push_back(InsertVertex(worker(1)));
+  }
+  std::vector<storage::VertexAddress> worker2_vertices;
+  for (int i = 0; i < 1000; ++i) {
+    worker2_vertices.push_back(InsertVertex(worker(2)));
+  }
+
+  // Generate random edges between machines.
+  for (int i = 0; i < 1000; ++i) {
+    InsertEdge(master_vertices[rand() % 1000], worker1_vertices[rand() % 1000],
+               "edge");
+    InsertEdge(master_vertices[rand() % 1000], worker2_vertices[rand() % 1000],
+               "edge");
+    InsertEdge(worker1_vertices[rand() % 1000], master_vertices[rand() % 1000],
+               "edge");
+    InsertEdge(worker1_vertices[rand() % 1000], worker2_vertices[rand() % 1000],
+               "edge");
+    InsertEdge(worker2_vertices[rand() % 1000], master_vertices[rand() % 1000],
+               "edge");
+    InsertEdge(worker2_vertices[rand() % 1000], worker1_vertices[rand() % 1000],
+               "edge");
+  }
+
+  // Run the partitioning algorithm; after some time it should stop doing
+  // migrations.
+  distributed::dgp::Partitioner dgp_master(&master());
+  std::vector<double> master_scores;
+  distributed::dgp::Partitioner dgp_worker1(&worker(1));
+  std::vector<double> worker1_scores;
+  distributed::dgp::Partitioner dgp_worker2(&worker(2));
+  std::vector<double> worker2_scores;
+  FLAGS_dgp_max_batch_size = 10;
+  for (int i = 0; i < 100; ++i) {
+    LOG(INFO) << "Iteration: " << i;
+
+    auto data_master = dgp_master.Partition();
+    LOG(INFO) << "Master score: " << data_master.first;
+    master_scores.push_back(data_master.first);
+    LogClusterState();
+
+    auto data_worker1 = dgp_worker1.Partition();
+    LOG(INFO) << "Worker1 score: " << data_worker1.first;
+    worker1_scores.push_back(data_worker1.first);
+    LogClusterState();
+
+    auto data_worker2 = dgp_worker2.Partition();
+    LOG(INFO) << "Worker2 score: " << data_worker2.first;
+    worker2_scores.push_back(data_worker2.first);
+    LogClusterState();
+  }
+
+  // Check that the last N scores from each instance are the same.
+  int scores_to_validate = 10;
+  auto score_equality = [](double x, double y) {
+    return std::abs(x - y) < 10e-1;
+  };
+  ASSERT_TRUE(std::all_of(master_scores.end() - scores_to_validate,
+                          master_scores.end(),
+                          [&score_equality, &master_scores](double elem) {
+                            return score_equality(elem, master_scores.back());
+                          }));
+  ASSERT_TRUE(std::all_of(worker1_scores.end() - scores_to_validate,
+                          worker1_scores.end(),
+                          [&score_equality, &worker1_scores](double elem) {
+                            return score_equality(elem, worker1_scores.back());
+                          }));
+  ASSERT_TRUE(std::all_of(worker2_scores.end() - scores_to_validate,
+                          worker2_scores.end(),
+                          [&score_equality, &worker2_scores](double elem) {
+                            return score_equality(elem, worker2_scores.back());
+                          }));
+}
diff --git a/tests/unit/distributed_dgp_vertex_migrator.cpp b/tests/unit/distributed_dgp_vertex_migrator.cpp
index d68371b99..f556387b0 100644
--- a/tests/unit/distributed_dgp_vertex_migrator.cpp
+++ b/tests/unit/distributed_dgp_vertex_migrator.cpp
@@ -7,7 +7,7 @@
 #include "gtest/gtest.h"
 
 #include "distributed/updates_rpc_clients.hpp"
-#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
+#include "distributed/dgp/vertex_migrator.hpp"
 
 using namespace distributed;
 using namespace database;
@@ -67,7 +67,7 @@ class DistributedVertexMigratorTest : public DistributedGraphDbTest {
   void MigrateVertexAndCommit(database::GraphDbAccessor *from_dba,
                               int64_t cypher_id, int to_worker_id) {
     auto vacc = FindVertex(from_dba, cypher_id);
-    VertexMigrator migrator(from_dba);
+    distributed::dgp::VertexMigrator migrator(from_dba);
     migrator.MigrateVertex(*vacc, to_worker_id);
     MasterApplyUpdatesAndCommit(from_dba);
   }
@@ -176,7 +176,7 @@ TEST_F(DistributedVertexMigratorTest, MigrationofLabelsEdgeTypesAndProperties) {
 
   {
     auto dba = master().Access();
-    VertexMigrator migrator(dba.get());
+    distributed::dgp::VertexMigrator migrator(dba.get());
     for (auto &vertex : dba->Vertices(false)) {
       migrator.MigrateVertex(vertex, worker(1).WorkerId());
     }
diff --git a/tests/unit/distributed_token_sharing.cpp b/tests/unit/distributed_token_sharing.cpp
deleted file mode 100644
index f2cff3a51..000000000
--- a/tests/unit/distributed_token_sharing.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-#include "distributed_common.hpp"
-
-#include <memory>
-#include <thread>
-#include <unordered_set>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-DECLARE_bool(dynamic_graph_partitioner_enabled);
-DECLARE_int32(dgp_max_batch_size);
-
-using namespace distributed;
-using namespace database;
-
-class TokenSharingTest : public DistributedGraphDbTest {
-  void SetUp() override {
-    FLAGS_dynamic_graph_partitioner_enabled = true;
-    FLAGS_dgp_max_batch_size = 1;
-    DistributedGraphDbTest::SetUp();
-  }
-};
-
-TEST_F(TokenSharingTest, Integration) {
-  auto vb = InsertVertex(worker(1));
-  for (int i = 0; i < 100; ++i) {
-    auto v = InsertVertex(master());
-    InsertEdge(vb, v, "edge");
-  }
-  std::this_thread::sleep_for(std::chrono::seconds(3));
-  // Migrate at least something from or to here
-  EXPECT_NE(VertexCount(master()), 100);
-}