diff --git a/docs/dev/diagram/dgp/logical.dot b/docs/dev/diagram/dgp/logical.dot index 79388f968..76c5d941c 100644 --- a/docs/dev/diagram/dgp/logical.dot +++ b/docs/dev/diagram/dgp/logical.dot @@ -13,10 +13,10 @@ digraph { "distributed::TokenSharingRpcServer" -> "communication::rpc::Server"; "distributed::TokenSharingRpcServer" -> "distributed::Coordination"; "distributed::TokenSharingRpcServer" -> "distributed::TokenSharingRpcClients"; - "distributed::TokenSharingRpcServer" -> "storage::dgp::Partitioner"; + "distributed::TokenSharingRpcServer" -> "distributed::dgp::Partitioner"; - "storage::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed]; + "distributed::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed]; - "storage::dgp::Partitioner" -> "storage::dgp::VertexMigrator"; - "storage::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed]; + "distributed::dgp::Partitioner" -> "distributed::dgp::VertexMigrator"; + "distributed::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed]; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f5bba8848..b9e4300a4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -29,6 +29,8 @@ set(memgraph_src_files distributed/data_manager.cpp distributed/data_rpc_clients.cpp distributed/data_rpc_server.cpp + distributed/dgp/partitioner.cpp + distributed/dgp/vertex_migrator.cpp distributed/durability_rpc_master.cpp distributed/durability_rpc_worker.cpp distributed/index_rpc_server.cpp @@ -65,8 +67,6 @@ set(memgraph_src_files query/typed_value.cpp storage/concurrent_id_mapper_master.cpp storage/concurrent_id_mapper_worker.cpp - storage/dynamic_graph_partitioner/dgp.cpp - storage/dynamic_graph_partitioner/vertex_migrator.cpp storage/edge_accessor.cpp storage/locking/record_lock.cpp storage/property_value.cpp diff --git a/src/database/config.cpp b/src/database/config.cpp index b5a982c7d..d179ef26b 100644 --- a/src/database/config.cpp +++ b/src/database/config.cpp @@ -68,6 +68,9 @@ DEFINE_VALIDATED_int32(recovering_cluster_size, 0, "Number of workers (including master) in the " "previously snapshooted/wal cluster.", FLAG_IN_RANGE(0, INT32_MAX)); +// TODO (buda): Implement an openCypher query because it makes complete sense +// to be able to start and stop DGP on the fly. +// The implementation should be straightforward.
DEFINE_bool(dynamic_graph_partitioner_enabled, false, "If the dynamic graph partitioner should be enabled."); #endif diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index 299a4916a..3827aa087 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -687,7 +687,7 @@ Master::Master(Config config) // Start the dynamic graph partitioner inside token sharing server if (impl_->config_.dynamic_graph_partitioner_enabled) { - impl_->token_sharing_server_.StartTokenSharing(); + impl_->token_sharing_server_.Start(); } if (impl_->config_.durability_enabled) { diff --git a/src/storage/dynamic_graph_partitioner/dgp.cpp b/src/distributed/dgp/partitioner.cpp similarity index 75% rename from src/storage/dynamic_graph_partitioner/dgp.cpp rename to src/distributed/dgp/partitioner.cpp index 479080215..748f78b20 100644 --- a/src/storage/dynamic_graph_partitioner/dgp.cpp +++ b/src/distributed/dgp/partitioner.cpp @@ -1,4 +1,4 @@ -#include "storage/dynamic_graph_partitioner/dgp.hpp" +#include "distributed/dgp/partitioner.hpp" #include #include @@ -8,10 +8,11 @@ #include "database/graph_db_accessor.hpp" #include "distributed/updates_rpc_clients.hpp" #include "query/exceptions.hpp" -#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" +#include "distributed/dgp/vertex_migrator.hpp" #include "utils/flag_validation.hpp" #include "utils/thread/sync.hpp" +// TODO (buda): Implement openCypher commands to control these parameters. DEFINE_VALIDATED_int32( dgp_improvement_threshold, 10, "How much better should specific node score be to consider " @@ -19,25 +20,28 @@ DEFINE_VALIDATED_int32( "between new score that the vertex will have when migrated and the old one " "such that it's migrated.", FLAG_IN_RANGE(1, 100)); +// TODO (buda): The default here should be int_max because that will allow us to +// partition large datasets faster. It should be used for our tests where we can +// run the partitioning up front. DEFINE_VALIDATED_int32(dgp_max_batch_size, 2000, "Maximal amount of vertices which should be migrated in " "one dynamic graph partitioner step.", FLAG_IN_RANGE(1, std::numeric_limits::max())); -DynamicGraphPartitioner::DynamicGraphPartitioner( - database::DistributedGraphDb *db) - : db_(db) {} +namespace distributed::dgp { -void DynamicGraphPartitioner::Run() { +Partitioner::Partitioner(database::DistributedGraphDb *db) : db_(db) {} + +std::pair Partitioner::Partition() { auto dba = db_->Access(); VLOG(21) << "Starting DynamicGraphPartitioner in tx: " << dba->transaction().id_; - auto migrations = FindMigrations(*dba); + auto data = FindMigrations(*dba); try { VertexMigrator migrator(dba.get()); - for (auto &migration : migrations) { + for (auto &migration : data.migrations) { migrator.MigrateVertex(migration.first, migration.second); } @@ -63,19 +67,25 @@ void DynamicGraphPartitioner::Run() { } dba->Commit(); - VLOG(21) << "Sucesfully migrated " << migrations.size() << " vertices.."; + VLOG(21) << "Successfully migrated " << data.migrations.size() + << " vertices."; + return std::make_pair(data.score, true); } catch (const utils::BasicException &e) { VLOG(21) << "Didn't succeed in relocating; " << e.what(); dba->Abort(); + // Returning VertexAccessors after Abort might not be a good idea. The + // returned migrations are entirely useless because the engine didn't + // succeed in migrating anything.
+ return std::make_pair(data.score, false); } } -std::vector> -DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) { +MigrationsData Partitioner::FindMigrations(database::GraphDbAccessor &dba) { // Find workers vertex count std::unordered_map worker_vertex_count = db_->data_clients().VertexCounts(dba.transaction().id_); + // TODO (buda): Add total edge count as an option. int64_t total_vertex_count = 0; for (auto worker_vertex_count_pair : worker_vertex_count) { total_vertex_count += worker_vertex_count_pair.second; @@ -83,6 +93,9 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) { double average_vertex_count = total_vertex_count * 1.0 / worker_vertex_count.size(); + if (average_vertex_count == 0) return MigrationsData(0); + + double local_graph_score = 0; // Considers all migrations which maximally improve single vertex score std::vector> migrations; @@ -90,7 +103,7 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) { auto label_counts = CountLabels(vertex); std::unordered_map per_label_score; size_t degree = vertex.in_degree() + vertex.out_degree(); - + if (degree == 0) continue; for (auto worker_vertex_count_pair : worker_vertex_count) { int worker = worker_vertex_count_pair.first; int64_t worker_vertex_count = worker_vertex_count_pair.second; @@ -103,9 +116,12 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) { const std::pair &p2) { return p1.second < p2.second; }; + auto best_label = std::max_element(per_label_score.begin(), per_label_score.end(), label_cmp); + local_graph_score += best_label->second; + // Consider as a migration only if the improvement is high enough if (best_label != per_label_score.end() && best_label->first != db_->WorkerId() && @@ -118,10 +134,12 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) { if (migrations.size() >= FLAGS_dgp_max_batch_size) break; } - return migrations; + DLOG(INFO) << "Local graph score: " << local_graph_score; + + return MigrationsData(local_graph_score, std::move(migrations)); } -std::unordered_map DynamicGraphPartitioner::CountLabels( +std::unordered_map Partitioner::CountLabels( const VertexAccessor &vertex) const { std::unordered_map label_count; for (auto edge : vertex.in()) { @@ -136,3 +154,4 @@ std::unordered_map DynamicGraphPartitioner::CountLabels( } return label_count; } +} // namespace distributed::dgp diff --git a/src/distributed/dgp/partitioner.hpp b/src/distributed/dgp/partitioner.hpp new file mode 100644 index 000000000..4381fcb82 --- /dev/null +++ b/src/distributed/dgp/partitioner.hpp @@ -0,0 +1,89 @@ +/// @file + +#pragma once + +#include + +#include "distributed/data_rpc_clients.hpp" +#include "distributed/token_sharing_rpc_messages.hpp" +#include "distributed/dgp/vertex_migrator.hpp" +#include "storage/vertex_accessor.hpp" + +namespace database { +class DistributedGraphDb; +class GraphDbAccessor; +}; // namespace database + +namespace distributed::dgp { + +/// Contains a set of vertices and where they should be migrated +/// (machine/instance id) + score how good the partitioning is. +struct MigrationsData { + private: + using Migrations = std::vector>; + + public: + MigrationsData(double score, Migrations migrations = Migrations()) + : score(std::move(score)), migrations(std::move(migrations)) {} + + /// Disable copying because the number of migrations could be huge. 
The + /// expected number is 1k, but a user can configure the database in a way + /// where the number of migrations could be much higher. + MigrationsData(const MigrationsData &other) = delete; + MigrationsData &operator=(const MigrationsData &other) = delete; + + MigrationsData(MigrationsData &&other) = default; + MigrationsData &operator=(MigrationsData &&other) = default; + + double score; + Migrations migrations; +}; + +/// Handles dynamic graph partitioning; migrates vertices from one worker to +/// another based on a scoring function which takes into account the neighbours +/// of a vertex and tries to put it where most of its neighbours are located. +/// Also takes into account the number of vertices on the destination and +/// source machine. +class Partitioner { + public: + /// The partitioner needs GraphDb because each partition step is a new + /// database transaction (a database accessor has to be created). + /// TODO (buda): Consider passing GraphDbAccessor directly. + explicit Partitioner(database::DistributedGraphDb *db); + + Partitioner(const Partitioner &other) = delete; + Partitioner(Partitioner &&other) = delete; + Partitioner &operator=(const Partitioner &other) = delete; + Partitioner &operator=(Partitioner &&other) = delete; + + /// Runs one dynamic graph partitioning cycle (step). In case of any error, + /// the transaction will be aborted. + /// + /// @return The calculated partitioning score and whether the migrations were + /// successful. + std::pair Partition(); + + /// Returns pairs of `vertex` and `destination` describing where vertices + /// should be relocated from the view of the `dba` accessor. + /// + /// Each vertex is located on some worker (which in the context of migrations + /// we call a vertex label). Each vertex has its score evaluated for each + /// label (worker_id). This score is calculated by considering the labels of + /// neighbouring vertices. Simply put, each vertex is attracted to be located + /// on the same worker as its neighbouring vertices. Migrations which improve + /// that score, which also takes into account the saturation of the workers + /// considered as destinations, are determined. + MigrationsData FindMigrations(database::GraphDbAccessor &dba); + + /// Counts the number of each label (worker_id) on the endpoints of the + /// in/out edges of `vertex`. + /// + /// @return A map consisting of (label/machine/instance id, count) key-value + /// pairs.
+ std::unordered_map CountLabels( + const VertexAccessor &vertex) const; + + private: + database::DistributedGraphDb *db_{nullptr}; +}; + +} // namespace distributed::dgp diff --git a/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp b/src/distributed/dgp/vertex_migrator.cpp similarity index 94% rename from src/storage/dynamic_graph_partitioner/vertex_migrator.cpp rename to src/distributed/dgp/vertex_migrator.cpp index 09c7e26ec..4bfdeaf50 100644 --- a/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp +++ b/src/distributed/dgp/vertex_migrator.cpp @@ -1,9 +1,11 @@ -#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" +#include "distributed/dgp/vertex_migrator.hpp" #include "database/distributed_graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "query/typed_value.hpp" +namespace distributed::dgp { + VertexMigrator::VertexMigrator(database::GraphDbAccessor *dba) : dba_(dba) {} void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) { @@ -57,3 +59,4 @@ void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) { dba_->DetachRemoveVertex(vertex); } +} // namespace distributed::dgp diff --git a/src/storage/dynamic_graph_partitioner/vertex_migrator.hpp b/src/distributed/dgp/vertex_migrator.hpp similarity index 92% rename from src/storage/dynamic_graph_partitioner/vertex_migrator.hpp rename to src/distributed/dgp/vertex_migrator.hpp index 19f746ce7..664a1af32 100644 --- a/src/storage/dynamic_graph_partitioner/vertex_migrator.hpp +++ b/src/distributed/dgp/vertex_migrator.hpp @@ -1,3 +1,5 @@ +/// @file + #pragma once #include @@ -10,6 +12,8 @@ namespace database { class GraphDbAccessor; }; // namespace database +namespace distributed::dgp { + /// Migrates vertices from one worker to another (updates edges as well). class VertexMigrator { public: @@ -29,3 +33,5 @@ class VertexMigrator { database::GraphDbAccessor *dba_; std::unordered_map vertex_migrated_to_; }; + +} // namespace distributed::dgp diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp index 35047b3f6..21cd1aa76 100644 --- a/src/distributed/pull_produce_rpc_messages.lcp +++ b/src/distributed/pull_produce_rpc_messages.lcp @@ -267,7 +267,7 @@ void PullResData::LoadGraphElement( ? distributed::LoadVertex(vertex_reader.getNew()) : nullptr; data_manager->Emplace( - dba->transaction_id(), global_address.gid(), + dba->transaction_id(), global_address.gid(), distributed::CachedRecordData(cypher_id, std::move(old_record), std::move(new_record))); diff --git a/src/distributed/token_sharing_rpc_server.hpp b/src/distributed/token_sharing_rpc_server.hpp index 21aafd208..2085b7e2d 100644 --- a/src/distributed/token_sharing_rpc_server.hpp +++ b/src/distributed/token_sharing_rpc_server.hpp @@ -1,7 +1,9 @@ +/// @file + #pragma once #include "distributed/rpc_worker_clients.hpp" -#include "storage/dynamic_graph_partitioner/dgp.hpp" +#include "distributed/dgp/partitioner.hpp" namespace communication::rpc { class Server; @@ -13,6 +15,12 @@ class DistributedGraphDb; namespace distributed { +// TODO (buda): dgp_.Run() should be injected. This server shouldn't know +// anything about the partitioning. +// TODO (buda): It makes more sense to have centralized server which will assign +// tokens because error handling would be much easier. +// TODO (buda): Broken by design. + /// Shares the token between dynamic graph partitioners instances across workers /// by passing the token from one worker to another, in a circular fashion. 
This /// guarantees that no two workers will execute the dynamic graph partitioner @@ -30,10 +38,20 @@ class TokenSharingRpcServer { dgp_(db) { server_->Register( [this](const auto &req_reader, auto *res_builder) { token_ = true; }); - + // TODO (buda): It's not trivial to move this part into the Start method + // because the worker then doesn't run the step. Will resolve that with + // a different implementation of the token assignment. runner_ = std::thread([this]() { - while (true) { - // Wait till we get the token + while (!shutting_down_) { + // If no other instances are connected, just wait. It doesn't make sense + // to migrate anything because only one machine is available. + auto workers = coordination_->GetWorkerIds(); + if (workers.size() <= 1) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + continue; + } + + // Wait till we get the token. while (!token_) { if (shutting_down_) break; std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -42,10 +60,9 @@ class TokenSharingRpcServer { if (shutting_down_) break; token_ = false; - dgp_.Run(); + dgp_.Partition(); - // Transfer token to next - auto workers = coordination_->GetWorkerIds(); + // Transfer the token to the next worker. sort(workers.begin(), workers.end()); int next_worker = -1; @@ -63,7 +80,7 @@ /// Starts the token sharing server which in turn starts the dynamic graph /// partitioner. - void StartTokenSharing() { + void Start() { started_ = true; token_ = true; } @@ -74,9 +91,9 @@ if (started_ && worker_id_ == 0) { // Wait till we get the token back otherwise some worker might try to // migrate to another worker while that worker is shutting down or - // something else bad might happen - // TODO(dgleich): Solve this better in the future since this blocks - // shutting down until spinner steps complete + // something else bad might happen. + // TODO (buda): Solve this better in the future since this blocks + // shutting down until spinner steps complete. while (!token_) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); } @@ -94,7 +111,7 @@ std::atomic shutting_down_{false}; std::thread runner_; - DynamicGraphPartitioner dgp_; + distributed::dgp::Partitioner dgp_; }; } // namespace distributed diff --git a/src/storage/dynamic_graph_partitioner/dgp.hpp b/src/storage/dynamic_graph_partitioner/dgp.hpp deleted file mode 100644 index 7eda46064..000000000 --- a/src/storage/dynamic_graph_partitioner/dgp.hpp +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include - -#include "distributed/data_rpc_clients.hpp" -#include "distributed/token_sharing_rpc_messages.hpp" -#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" -#include "storage/vertex_accessor.hpp" - -namespace database { -class DistributedGraphDb; -class GraphDbAccessor; -}; // namespace database - -/// Handles dynamic graph partitions, migrates vertices from one worker to -/// another based on available scoring which takes into account neighbours of a -/// vertex and tries to put it where most of its neighbours are located. Also -/// takes into account the number of vertices on the destination and source -/// machine.
-class DynamicGraphPartitioner { - public: - DynamicGraphPartitioner(const DynamicGraphPartitioner &other) = delete; - DynamicGraphPartitioner(DynamicGraphPartitioner &&other) = delete; - DynamicGraphPartitioner &operator=(const DynamicGraphPartitioner &other) = - delete; - DynamicGraphPartitioner &operator=(DynamicGraphPartitioner &&other) = delete; - - explicit DynamicGraphPartitioner(database::DistributedGraphDb *db); - - /// Runs one dynamic graph partitioning cycle (step). - void Run(); - - /// Returns a vector of pairs of `vertex` and `destination` of where should - /// some vertex be relocated from the view of `dba` accessor. - // - /// Each vertex is located on some worker (which in context of migrations we - /// call a vertex label). Each vertex has it's score for each different label - /// (worker_id) evaluated. This score is calculated by considering - /// neighbouring vertices labels. Simply put, each vertex is attracted to be - /// located on the same worker as it's neighbouring vertices. Migrations which - /// improve that scoring, which also takes into account saturation of other - /// workers on which it's considering to migrate this vertex, are determined. - std::vector> FindMigrations( - database::GraphDbAccessor &dba); - - /// Counts number of each label (worker_id) on endpoints of edges (in/out) of - /// `vertex`. - /// Returns a map consisting of (label, count) key-value pairs. - std::unordered_map CountLabels( - const VertexAccessor &vertex) const; - - private: - database::DistributedGraphDb *db_{nullptr}; -}; diff --git a/tests/integration/dgp/.gitignore b/tests/integration/dgp/.gitignore new file mode 100644 index 000000000..ea1472ec1 --- /dev/null +++ b/tests/integration/dgp/.gitignore @@ -0,0 +1 @@ +output/ diff --git a/tests/integration/dgp/run.py b/tests/integration/dgp/run.py new file mode 100755 index 000000000..e9dce265e --- /dev/null +++ b/tests/integration/dgp/run.py @@ -0,0 +1,222 @@ +#!/usr/bin/python3 + +''' +Test dynamic graph partitioner on Memgraph cluster +with randomly generated graph (uniform distribution +of nodes and edges). The partitioning goal is to +minimize number of crossing edges while keeping +the cluster balanced. +''' + +import argparse +import atexit +import logging +import os +import random +import sys +import subprocess +import time + +try: + # graphviz==0.9 + from graphviz import Digraph +except ImportError: + print("graphviz module isn't available. 
" + "Graph won't be generated but the checks will still work.") +from neo4j.v1 import GraphDatabase, basic_auth + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) +COMMON_ARGS = ["--durability-enabled=false", + "--snapshot-on-exit=false", + "--db-recover-on-startup=false"] +MASTER_ARGS = ["--master", + "--master-port", "10000", + "--dynamic-graph-partitioner-enabled", + "--durability-directory=durability_master"] + +log = logging.getLogger(__name__) +memgraph_processes = [] + + +def worker_args(worker_id): + args = ["--worker", + "--worker-id", str(worker_id), + "--worker-port", str(10000 + worker_id), + "--master-port", str(10000), + "--durability-directory=durability_worker%s" % worker_id] + return args + + +def wait_for_server(port, delay=0.01): + cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port] + while subprocess.call(cmd) != 0: + time.sleep(delay) + time.sleep(delay) + + +def run_memgraph_process(binary, args): + global memgraph_processes + process = subprocess.Popen([binary] + args, cwd=os.path.dirname(binary)) + memgraph_processes.append(process) + + +def run_distributed_memgraph(args): + run_memgraph_process(args.memgraph, COMMON_ARGS + MASTER_ARGS) + wait_for_server("10000") + for i in range(1, int(args.machine_count)): + run_memgraph_process(args.memgraph, COMMON_ARGS + worker_args(i)) + wait_for_server("7687") + + +def terminate(): + global memgraph_processes + for process in memgraph_processes: + process.terminate() + status = process.wait() + if status != 0: + raise Exception( + "Memgraph binary returned non-zero ({})!".format(status)) + + +@atexit.register +def cleanup(): + global memgraph_processes + for proc in memgraph_processes: + if proc.poll() is not None: + continue + proc.kill() + proc.wait() + + +def run_test(args): + driver = GraphDatabase.driver(args.endpoint, + auth=basic_auth(args.username, + args.password), + encrypted=args.encrypted) + session = driver.session() + + session.run("CREATE INDEX ON :Node(id)").consume() + session.run( + "UNWIND range(0, $num - 1) AS n CREATE (:Node {id: n})", + num=args.node_count - 1).consume() + + created_edges = 0 + while created_edges <= args.edge_count: + # Retry block is here because DGP is running in background and + # serialization errors may occure. + try: + session.run("MATCH (n:Node {id: $id1}), (m:Node {id:$id2}) " + "CREATE (n)-[:Edge]->(m)", + id1=random.randint(0, args.node_count - 1), + id2=random.randint(0, args.node_count - 1)).consume() + created_edges += 1 + except Exception: + pass + + # Check cluster state periodically. + crossing_edges_history = [] + load_history = [] + duration = 0 + for iteration in range(args.iteration_count): + iteration_start_time = time.time() + data = session.run( + "MATCH (n)-[r]->(m) " + "RETURN " + " id(n) AS v1, id(m) AS v2, " + " workerid(n) AS position_n, workerid(m) AS position_m").data() + + # Visualize cluster state. 
+ if args.visualize and 'graphviz' in sys.modules: + output_dir = os.path.join(SCRIPT_DIR, "output") + if not os.path.exists(output_dir): + os.makedirs(output_dir) + cluster = Digraph(name="memgraph_cluster", format="png") + cluster.attr(splines="false", rank="TB") + subgraphs = [Digraph(name="cluster_worker%s" % i) + for i in range(args.machine_count)] + for index, subgraph in enumerate(subgraphs): + subgraph.attr(label="worker%s" % index) + edges = [] + for row in data: + # start_id end_id machine_id + edges.append((row["v1"], row["v2"], row["position_n"])) + edges = sorted(edges, key=lambda x: x[0]) + for edge in edges: + # machine_id start_id end_id + subgraphs[edge[2]].edge(str(edge[0]), str(edge[1])) + for subgraph in subgraphs: + cluster.subgraph(subgraph) + cluster.render("output/iteration_%s" % iteration) + + # Collect data. + num_of_crossing_edges = 0 + load = [0] * args.machine_count + for edge in data: + src_worker = int(edge["position_n"]) + dst_worker = int(edge["position_m"]) + if src_worker != dst_worker: + num_of_crossing_edges = num_of_crossing_edges + 1 + load[src_worker] = load[src_worker] + 1 + crossing_edges_history.append(num_of_crossing_edges) + load_history.append(load) + iteration_delta_time = time.time() - iteration_start_time + log.info("Number of crossing edges %s" % num_of_crossing_edges) + log.info("Cluster load %s" % load) + + # Wait for DGP a bit; count either the full interval (when we sleep) or + # the actual iteration time towards the total duration. + if iteration_delta_time < args.iteration_interval: + time.sleep(args.iteration_interval - iteration_delta_time) + duration += args.iteration_interval + else: + duration += iteration_delta_time + # TODO (buda): Somehow align with DGP turns. Replace the runtime param with + # the query. + + assert crossing_edges_history[-1] < crossing_edges_history[0], \ + "Number of crossing edges did not decrease." + for machine in range(args.machine_count): + assert load_history[-1][machine] > 0, "Machine %s is empty."
% machine + log.info("Config") + log.info(" Machines: %s" % args.machine_count) + log.info(" Nodes: %s" % args.node_count) + log.info(" Edges: %s" % args.edge_count) + log.info("Start") + log.info(" Crossing Edges: %s" % crossing_edges_history[0]) + log.info(" Cluster Load: %s" % load_history[0]) + log.info("End") + log.info(" Crossing Edges: %s" % crossing_edges_history[-1]) + log.info(" Cluster Load: %s" % load_history[-1]) + log.info("Stats") + log.info(" Duration: %ss" % duration) + + session.close() + driver.close() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + + memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph") + if not os.path.exists(memgraph_binary): + memgraph_binary = os.path.join(PROJECT_DIR, "build_debug", "memgraph") + + parser = argparse.ArgumentParser() + parser.add_argument("--memgraph", default=memgraph_binary) + parser.add_argument("--endpoint", type=str, + default="bolt://localhost:7687") + parser.add_argument("--username", type=str, default="") + parser.add_argument("--password", type=str, default="") + parser.add_argument("--encrypted", type=bool, default=False) + parser.add_argument("--visualize", type=bool, default=False) + parser.add_argument("--machine-count", type=int, default=3) + parser.add_argument("--node-count", type=int, default=1000000) + parser.add_argument("--edge-count", type=int, default=1000000) + parser.add_argument("--iteration-count", type=int, default=10) + parser.add_argument("--iteration-interval", type=float, default=5) + args = parser.parse_args() + + run_distributed_memgraph(args) + run_test(args) + terminate() diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 30584eb45..b7e944ab9 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -94,6 +94,13 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto add_unit_test(distributed_updates.cpp) target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib) +# TODO (buda): Replace token sharing with centralized solution and write an appropriate test. 
+# add_unit_test(distributed_token_sharing.cpp) +# target_link_libraries(${test_prefix}distributed_token_sharing memgraph_lib kvstore_dummy_lib) + +add_unit_test(distributed_dgp_partitioner.cpp) +target_link_libraries(${test_prefix}distributed_dgp_partitioner memgraph_lib kvstore_dummy_lib) + add_unit_test(durability.cpp) target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib) diff --git a/tests/unit/distributed_dgp_partitioner.cpp b/tests/unit/distributed_dgp_partitioner.cpp new file mode 100644 index 000000000..ceb78b5ea --- /dev/null +++ b/tests/unit/distributed_dgp_partitioner.cpp @@ -0,0 +1,252 @@ +#include "distributed_common.hpp" + +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "distributed/dgp/partitioner.hpp" +#include "distributed/updates_rpc_clients.hpp" + +using namespace distributed; +using namespace database; + +DECLARE_int32(dgp_max_batch_size); + +class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest { + public: + DistributedDynamicGraphPartitionerTest() + : DistributedGraphDbTest("dynamic_graph_partitioner") {} + + void LogClusterState() { + LOG(INFO) << "master_v: " << VertexCount(master()) + << " master_e: " << EdgeCount(master()); + LOG(INFO) << "worker1_v: " << VertexCount(worker(1)) + << " worker1_e: " << EdgeCount(worker(1)); + LOG(INFO) << "worker2_v: " << VertexCount(worker(2)) + << " worker2_e: " << EdgeCount(worker(2)); + } +}; + +TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) { + auto va = InsertVertex(master()); + auto vb = InsertVertex(worker(1)); + auto vc = InsertVertex(worker(2)); + for (int i = 0; i < 2; ++i) InsertEdge(va, va, "edge"); + for (int i = 0; i < 3; ++i) InsertEdge(va, vb, "edge"); + for (int i = 0; i < 4; ++i) InsertEdge(va, vc, "edge"); + for (int i = 0; i < 5; ++i) InsertEdge(vb, va, "edge"); + for (int i = 0; i < 6; ++i) InsertEdge(vc, va, "edge"); + + distributed::dgp::Partitioner dgp(&master()); + auto dba = master().Access(); + VertexAccessor v(va, *dba); + auto count_labels = dgp.CountLabels(v); + + // Self loops counted twice + EXPECT_EQ(count_labels[master().WorkerId()], 2 * 2); + + EXPECT_EQ(count_labels[worker(1).WorkerId()], 3 + 5); + EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6); +} + +TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) { + auto va = InsertVertex(master()); + auto vb = InsertVertex(worker(1)); + + // Balance the number of nodes on workers a bit + InsertVertex(worker(2)); + InsertVertex(worker(2)); + + for (int i = 0; i < 100; ++i) InsertEdge(va, vb, "edge"); + distributed::dgp::Partitioner dgp(&master()); + auto dba = master().Access(); + auto data = dgp.FindMigrations(*dba); + // Expect `va` to try to move to another worker, the one connected to it + ASSERT_EQ(data.migrations.size(), 1); + EXPECT_EQ(data.migrations[0].second, worker(1).WorkerId()); +} + +TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) { + InsertVertex(master()); + InsertVertex(worker(1)); + InsertVertex(worker(2)); + + // Everything is balanced, there should be no movement + + distributed::dgp::Partitioner dgp(&master()); + auto dba = master().Access(); + auto data = dgp.FindMigrations(*dba); + EXPECT_EQ(data.migrations.size(), 0); +} + +TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) { + auto va = InsertVertex(master()); + auto vb = InsertVertex(master()); + auto vc = InsertVertex(worker(1)); + + // Balance the number of nodes on workers a bit + InsertVertex(worker(1)); + 
InsertVertex(worker(2)); + InsertVertex(worker(2)); + + for (int i = 0; i < 100; ++i) InsertEdge(va, vc, "edge"); + for (int i = 0; i < 100; ++i) InsertEdge(vb, vc, "edge"); + distributed::dgp::Partitioner dgp(&master()); + auto dba = master().Access(); + { + auto data = dgp.FindMigrations(*dba); + // Expect vertices to try to move to another worker + ASSERT_EQ(data.migrations.size(), 2); + } + + // See if the flag affects the number of returned results + { + FLAGS_dgp_max_batch_size = 1; + auto data = dgp.FindMigrations(*dba); + // Expect vertices to try to move to another worker + ASSERT_EQ(data.migrations.size(), 1); + } +} + +TEST_F(DistributedDynamicGraphPartitionerTest, Run) { + // Emulate a bipartite graph with lots of connections on the left and right + // sides, and some connections between the halves + std::vector left; + for (int i = 0; i < 10; ++i) { + left.push_back(InsertVertex(master())); + } + std::vector right; + for (int i = 0; i < 10; ++i) { + right.push_back(InsertVertex(master())); + } + + // Force the nodes of both sides to stay on one worker by inserting a lot of + // edges in between them + for (int i = 0; i < 1000; ++i) { + InsertEdge(left[rand() % 10], left[rand() % 10], "edge"); + InsertEdge(right[rand() % 10], right[rand() % 10], "edge"); + } + + // Insert edges between the left and right sides + for (int i = 0; i < 50; ++i) + InsertEdge(left[rand() % 10], right[rand() % 10], "edge"); + + // Balance it out so that the vertex counts on the workers don't influence + // the partitioning too much + for (int i = 0; i < 10; ++i) InsertVertex(worker(2)); + + distributed::dgp::Partitioner dgp(&master()); + // Transfer one by one to actually converge + FLAGS_dgp_max_batch_size = 1; + // Try a bit more transfers to see if we reached a steady state + for (int i = 0; i < 15; ++i) { + dgp.Partition(); + } + + EXPECT_EQ(VertexCount(master()), 10); + EXPECT_EQ(VertexCount(worker(1)), 10); + + auto CountRemotes = [](GraphDbAccessor &dba) { + int64_t cnt = 0; + for (auto vertex : dba.Vertices(false)) { + for (auto edge : vertex.in()) + if (edge.from_addr().is_remote()) ++cnt; + for (auto edge : vertex.out()) + if (edge.to_addr().is_remote()) ++cnt; + } + return cnt; + }; + + auto dba_m = master().Access(); + auto dba_w1 = worker(1).Access(); + EXPECT_EQ(CountRemotes(*dba_m), 50); + EXPECT_EQ(CountRemotes(*dba_w1), 50); +} + +TEST_F(DistributedDynamicGraphPartitionerTest, Convergence) { + auto seed = std::time(nullptr); + LOG(INFO) << "Seed: " << seed; + std::srand(seed); + + // Generate a random graph across the cluster. + std::vector master_vertices; + for (int i = 0; i < 1000; ++i) { + master_vertices.push_back(InsertVertex(master())); + } + std::vector worker1_vertices; + for (int i = 0; i < 1000; ++i) { + worker1_vertices.push_back(InsertVertex(worker(1))); + } + std::vector worker2_vertices; + for (int i = 0; i < 1000; ++i) { + worker2_vertices.push_back(InsertVertex(worker(2))); + } + + // Generate random edges between machines.
+ for (int i = 0; i < 1000; ++i) { + InsertEdge(master_vertices[rand() % 1000], worker1_vertices[rand() % 1000], + "edge"); + InsertEdge(master_vertices[rand() % 1000], worker2_vertices[rand() % 1000], + "edge"); + InsertEdge(worker1_vertices[rand() % 1000], master_vertices[rand() % 1000], + "edge"); + InsertEdge(worker1_vertices[rand() % 1000], worker2_vertices[rand() % 1000], + "edge"); + InsertEdge(worker2_vertices[rand() % 1000], master_vertices[rand() % 1000], + "edge"); + InsertEdge(worker2_vertices[rand() % 1000], worker1_vertices[rand() % 1000], + "edge"); + } + + // Run the partitioning algorithm, after some time it should stop doing + // migrations. + distributed::dgp::Partitioner dgp_master(&master()); + std::vector master_scores; + distributed::dgp::Partitioner dgp_worker1(&worker(1)); + std::vector worker1_scores; + distributed::dgp::Partitioner dgp_worker2(&worker(2)); + std::vector worker2_scores; + FLAGS_dgp_max_batch_size = 10; + for (int i = 0; i < 100; ++i) { + LOG(INFO) << "Iteration: " << i; + + auto data_master = dgp_master.Partition(); + LOG(INFO) << "Master score: " << data_master.first; + master_scores.push_back(data_master.first); + LogClusterState(); + + auto data_worker1 = dgp_worker1.Partition(); + LOG(INFO) << "Worker1 score: " << data_worker1.first; + worker1_scores.push_back(data_worker1.first); + LogClusterState(); + + auto data_worker2 = dgp_worker2.Partition(); + LOG(INFO) << "Worker2 score: " << data_worker2.first; + worker2_scores.push_back(data_worker2.first); + LogClusterState(); + } + + // Check that the last N scores from each instance are the same. + int scores_to_validate = 10; + auto score_equality = [](double x, double y) { + return std::abs(x - y) < 10e-1; + }; + ASSERT_TRUE(std::all_of(master_scores.end() - scores_to_validate, + master_scores.end(), + [&score_equality, &master_scores](double elem) { + return score_equality(elem, master_scores.back()); + })); + ASSERT_TRUE(std::all_of(worker1_scores.end() - scores_to_validate, + worker1_scores.end(), + [&score_equality, &worker1_scores](double elem) { + return score_equality(elem, worker1_scores.back()); + })); + ASSERT_TRUE(std::all_of(worker2_scores.end() - scores_to_validate, + worker2_scores.end(), + [&score_equality, &worker2_scores](double elem) { + return score_equality(elem, worker2_scores.back()); + })); +} diff --git a/tests/unit/distributed_dgp_vertex_migrator.cpp b/tests/unit/distributed_dgp_vertex_migrator.cpp index d68371b99..f556387b0 100644 --- a/tests/unit/distributed_dgp_vertex_migrator.cpp +++ b/tests/unit/distributed_dgp_vertex_migrator.cpp @@ -7,7 +7,7 @@ #include "gtest/gtest.h" #include "distributed/updates_rpc_clients.hpp" -#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" +#include "distributed/dgp/vertex_migrator.hpp" using namespace distributed; using namespace database; @@ -67,7 +67,7 @@ class DistributedVertexMigratorTest : public DistributedGraphDbTest { void MigrateVertexAndCommit(database::GraphDbAccessor *from_dba, int64_t cypher_id, int to_worker_id) { auto vacc = FindVertex(from_dba, cypher_id); - VertexMigrator migrator(from_dba); + distributed::dgp::VertexMigrator migrator(from_dba); migrator.MigrateVertex(*vacc, to_worker_id); MasterApplyUpdatesAndCommit(from_dba); } @@ -176,7 +176,7 @@ TEST_F(DistributedVertexMigratorTest, MigrationofLabelsEdgeTypesAndProperties) { { auto dba = master().Access(); - VertexMigrator migrator(dba.get()); + distributed::dgp::VertexMigrator migrator(dba.get()); for (auto &vertex : dba->Vertices(false)) { 
migrator.MigrateVertex(vertex, worker(1).WorkerId()); } diff --git a/tests/unit/distributed_token_sharing.cpp b/tests/unit/distributed_token_sharing.cpp deleted file mode 100644 index f2cff3a51..000000000 --- a/tests/unit/distributed_token_sharing.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "distributed_common.hpp" - -#include -#include -#include -#include - -#include "gtest/gtest.h" - -DECLARE_bool(dynamic_graph_partitioner_enabled); -DECLARE_int32(dgp_max_batch_size); - -using namespace distributed; -using namespace database; - -class TokenSharingTest : public DistributedGraphDbTest { - void SetUp() override { - FLAGS_dynamic_graph_partitioner_enabled = true; - FLAGS_dgp_max_batch_size = 1; - DistributedGraphDbTest::SetUp(); - } -}; - -TEST_F(TokenSharingTest, Integration) { - auto vb = InsertVertex(worker(1)); - for (int i = 0; i < 100; ++i) { - auto v = InsertVertex(master()); - InsertEdge(vb, v, "edge"); - } - std::this_thread::sleep_for(std::chrono::seconds(3)); - // Migrate at least something from or to here - EXPECT_NE(VertexCount(master()), 100); -}
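
Reviewer note (not part of the patch): the new FindMigrations documentation in src/distributed/dgp/partitioner.hpp describes a per-worker score, but the scoring expression itself sits in unchanged context that this diff does not show. The snippet below is only a minimal, self-contained sketch of the idea under two assumed terms: an attraction term (the fraction of a vertex's edges that already lead to a given worker) and a saturation penalty (that worker's vertex count relative to the cluster average). The function name BestLabelSketch and both terms are illustrative assumptions, not code from this patch.

#include <cstddef>
#include <cstdint>
#include <limits>
#include <unordered_map>
#include <utility>

// Sketch only: pick the worker ("label") with the highest assumed score for a
// single vertex. Callers are expected to skip vertices with degree == 0,
// mirroring the `if (degree == 0) continue;` guard added by this patch.
std::pair<int, double> BestLabelSketch(
    const std::unordered_map<int, int64_t> &neighbour_label_counts,
    size_t degree,
    const std::unordered_map<int, int64_t> &worker_vertex_counts,
    double average_vertex_count) {
  int best_worker = -1;
  double best_score = std::numeric_limits<double>::lowest();
  for (const auto &worker : worker_vertex_counts) {
    auto found = neighbour_label_counts.find(worker.first);
    // Attraction: how many of this vertex's edge endpoints already live on
    // this worker, normalized by the vertex degree.
    double attraction =
        found == neighbour_label_counts.end()
            ? 0.0
            : static_cast<double>(found->second) / degree;
    // Saturation penalty: discourage piling vertices onto an already
    // over-populated worker.
    double saturation = worker.second / average_vertex_count;
    double score = attraction - saturation;
    if (score > best_score) {
      best_score = score;
      best_worker = worker.first;
    }
  }
  return {best_worker, best_score};
}

On top of such a score, the patch's flags would apply: a migration is only worth proposing when the best label differs from the local worker id and beats the local score by more than FLAGS_dgp_improvement_threshold (a value in the 1-100 range, so presumably a percentage comparison), and at most FLAGS_dgp_max_batch_size migrations are collected per Partition() step.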