Add dgp integration test

Reviewers: msantl, mferencevic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1586
This commit is contained in:
Marko Budiselic 2018-09-07 18:45:09 +01:00
parent 96ece11cdd
commit 8a3f3b6c88
17 changed files with 657 additions and 125 deletions

View File

@ -13,10 +13,10 @@ digraph {
"distributed::TokenSharingRpcServer" -> "communication::rpc::Server";
"distributed::TokenSharingRpcServer" -> "distributed::Coordination";
"distributed::TokenSharingRpcServer" -> "distributed::TokenSharingRpcClients";
"distributed::TokenSharingRpcServer" -> "storage::dgp::Partitioner";
"distributed::TokenSharingRpcServer" -> "distributed::dgp::Partitioner";
"storage::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed];
"distributed::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed];
"storage::dgp::Partitioner" -> "storage::dgp::VertexMigrator";
"storage::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed];
"distributed::dgp::Partitioner" -> "distributed::dgp::VertexMigrator";
"distributed::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed];
}

View File

@ -29,6 +29,8 @@ set(memgraph_src_files
distributed/data_manager.cpp
distributed/data_rpc_clients.cpp
distributed/data_rpc_server.cpp
distributed/dgp/partitioner.cpp
distributed/dgp/vertex_migrator.cpp
distributed/durability_rpc_master.cpp
distributed/durability_rpc_worker.cpp
distributed/index_rpc_server.cpp
@ -65,8 +67,6 @@ set(memgraph_src_files
query/typed_value.cpp
storage/concurrent_id_mapper_master.cpp
storage/concurrent_id_mapper_worker.cpp
storage/dynamic_graph_partitioner/dgp.cpp
storage/dynamic_graph_partitioner/vertex_migrator.cpp
storage/edge_accessor.cpp
storage/locking/record_lock.cpp
storage/property_value.cpp

View File

@ -68,6 +68,9 @@ DEFINE_VALIDATED_int32(recovering_cluster_size, 0,
"Number of workers (including master) in the "
"previously snapshooted/wal cluster.",
FLAG_IN_RANGE(0, INT32_MAX));
// TODO (buda): Implement openCypher query because it completely make sense
// to being able to start and stop DGP on the fly.
// The implementation should be straightforward.
DEFINE_bool(dynamic_graph_partitioner_enabled, false,
"If the dynamic graph partitioner should be enabled.");
#endif

View File

@ -687,7 +687,7 @@ Master::Master(Config config)
// Start the dynamic graph partitioner inside token sharing server
if (impl_->config_.dynamic_graph_partitioner_enabled) {
impl_->token_sharing_server_.StartTokenSharing();
impl_->token_sharing_server_.Start();
}
if (impl_->config_.durability_enabled) {

View File

@ -1,4 +1,4 @@
#include "storage/dynamic_graph_partitioner/dgp.hpp"
#include "distributed/dgp/partitioner.hpp"
#include <algorithm>
#include <unordered_map>
@ -8,10 +8,11 @@
#include "database/graph_db_accessor.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "distributed/dgp/vertex_migrator.hpp"
#include "utils/flag_validation.hpp"
#include "utils/thread/sync.hpp"
// TODO (buda): Implement openCypher commands to control these parameters.
DEFINE_VALIDATED_int32(
dgp_improvement_threshold, 10,
"How much better should specific node score be to consider "
@ -19,25 +20,28 @@ DEFINE_VALIDATED_int32(
"between new score that the vertex will have when migrated and the old one "
"such that it's migrated.",
FLAG_IN_RANGE(1, 100));
// TODO (buda): The default here should be int_max because that will allow us to
// partition large dataset faster. It should be used for our tests where we can
// run the partitioning up front.
DEFINE_VALIDATED_int32(dgp_max_batch_size, 2000,
"Maximal amount of vertices which should be migrated in "
"one dynamic graph partitioner step.",
FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
DynamicGraphPartitioner::DynamicGraphPartitioner(
database::DistributedGraphDb *db)
: db_(db) {}
namespace distributed::dgp {
void DynamicGraphPartitioner::Run() {
Partitioner::Partitioner(database::DistributedGraphDb *db) : db_(db) {}
std::pair<double, bool> Partitioner::Partition() {
auto dba = db_->Access();
VLOG(21) << "Starting DynamicGraphPartitioner in tx: "
<< dba->transaction().id_;
auto migrations = FindMigrations(*dba);
auto data = FindMigrations(*dba);
try {
VertexMigrator migrator(dba.get());
for (auto &migration : migrations) {
for (auto &migration : data.migrations) {
migrator.MigrateVertex(migration.first, migration.second);
}
@ -63,19 +67,25 @@ void DynamicGraphPartitioner::Run() {
}
dba->Commit();
VLOG(21) << "Sucesfully migrated " << migrations.size() << " vertices..";
VLOG(21) << "Sucesfully migrated " << data.migrations.size()
<< " vertices..";
return std::make_pair(data.score, true);
} catch (const utils::BasicException &e) {
VLOG(21) << "Didn't succeed in relocating; " << e.what();
dba->Abort();
// Returning VertexAccessors after Abort might not be a good idea. + The
// returned migrations are entirely useless because the engine didn't
// succeed to migrate anything.
return std::make_pair(data.score, false);
}
}
std::vector<std::pair<VertexAccessor, int>>
DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
MigrationsData Partitioner::FindMigrations(database::GraphDbAccessor &dba) {
// Find workers vertex count
std::unordered_map<int, int64_t> worker_vertex_count =
db_->data_clients().VertexCounts(dba.transaction().id_);
// TODO (buda): Add total edge count as an option.
int64_t total_vertex_count = 0;
for (auto worker_vertex_count_pair : worker_vertex_count) {
total_vertex_count += worker_vertex_count_pair.second;
@ -83,6 +93,9 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
double average_vertex_count =
total_vertex_count * 1.0 / worker_vertex_count.size();
if (average_vertex_count == 0) return MigrationsData(0);
double local_graph_score = 0;
// Considers all migrations which maximally improve single vertex score
std::vector<std::pair<VertexAccessor, int>> migrations;
@ -90,7 +103,7 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
auto label_counts = CountLabels(vertex);
std::unordered_map<int, double> per_label_score;
size_t degree = vertex.in_degree() + vertex.out_degree();
if (degree == 0) continue;
for (auto worker_vertex_count_pair : worker_vertex_count) {
int worker = worker_vertex_count_pair.first;
int64_t worker_vertex_count = worker_vertex_count_pair.second;
@ -103,9 +116,12 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
const std::pair<int, double> &p2) {
return p1.second < p2.second;
};
auto best_label = std::max_element(per_label_score.begin(),
per_label_score.end(), label_cmp);
local_graph_score += best_label->second;
// Consider as a migration only if the improvement is high enough
if (best_label != per_label_score.end() &&
best_label->first != db_->WorkerId() &&
@ -118,10 +134,12 @@ DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
if (migrations.size() >= FLAGS_dgp_max_batch_size) break;
}
return migrations;
DLOG(INFO) << "Local graph score: " << local_graph_score;
return MigrationsData(local_graph_score, std::move(migrations));
}
std::unordered_map<int, int64_t> DynamicGraphPartitioner::CountLabels(
std::unordered_map<int, int64_t> Partitioner::CountLabels(
const VertexAccessor &vertex) const {
std::unordered_map<int, int64_t> label_count;
for (auto edge : vertex.in()) {
@ -136,3 +154,4 @@ std::unordered_map<int, int64_t> DynamicGraphPartitioner::CountLabels(
}
return label_count;
}
} // namespace distributed::dgp

View File

@ -0,0 +1,89 @@
/// @file
#pragma once
#include <thread>
#include "distributed/data_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_messages.hpp"
#include "distributed/dgp/vertex_migrator.hpp"
#include "storage/vertex_accessor.hpp"
namespace database {
class DistributedGraphDb;
class GraphDbAccessor;
}; // namespace database
namespace distributed::dgp {
/// Contains a set of vertices and where they should be migrated
/// (machine/instance id) + score how good the partitioning is.
struct MigrationsData {
private:
using Migrations = std::vector<std::pair<VertexAccessor, int>>;
public:
MigrationsData(double score, Migrations migrations = Migrations())
: score(std::move(score)), migrations(std::move(migrations)) {}
/// Disable copying because the number of migrations could be huge. The
/// expected number is 1k, but a user can configure the database in a way
/// where the number of migrations could be much higher.
MigrationsData(const MigrationsData &other) = delete;
MigrationsData &operator=(const MigrationsData &other) = delete;
MigrationsData(MigrationsData &&other) = default;
MigrationsData &operator=(MigrationsData &&other) = default;
double score;
Migrations migrations;
};
/// Handles dynamic graph partitions, migrates vertices from one worker to
/// another based on available scoring which takes into account neighbours of a
/// vertex and tries to put it where most of its neighbours are located. Also
/// takes into account the number of vertices on the destination and source
/// machine.
class Partitioner {
public:
/// The partitioner needs GraphDb because each partition step is a new
/// database transactions (database accessor has to be created).
/// TODO (buda): Consider passing GraphDbAccessor directly.
explicit Partitioner(database::DistributedGraphDb *db);
Partitioner(const Partitioner &other) = delete;
Partitioner(Partitioner &&other) = delete;
Partitioner &operator=(const Partitioner &other) = delete;
Partitioner &operator=(Partitioner &&other) = delete;
/// Runs one dynamic graph partitioning cycle (step). In case of any error,
/// the transaction will be aborted.
///
/// @return Calculated partitioning score and were the migrations successful.
std::pair<double, bool> Partition();
/// Returns a vector of pairs of `vertex` and `destination` of where should
/// some vertex be relocated from the view of `dba` accessor.
//
/// Each vertex is located on some worker (which in context of migrations we
/// call a vertex label). Each vertex has it's score for each different label
/// (worker_id) evaluated. This score is calculated by considering
/// neighbouring vertices labels. Simply put, each vertex is attracted to be
/// located on the same worker as it's neighbouring vertices. Migrations which
/// improve that scoring, which also takes into account saturation of other
/// workers on which it's considering to migrate this vertex, are determined.
MigrationsData FindMigrations(database::GraphDbAccessor &dba);
/// Counts number of each label (worker_id) on endpoints of edges (in/out) of
/// `vertex`.
///
/// @return A map consisting of (label/machine/instance id, count) key-value
/// pairs.
std::unordered_map<int, int64_t> CountLabels(
const VertexAccessor &vertex) const;
private:
database::DistributedGraphDb *db_{nullptr};
};
} // namespace distributed::dgp

View File

@ -1,9 +1,11 @@
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "distributed/dgp/vertex_migrator.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/typed_value.hpp"
namespace distributed::dgp {
VertexMigrator::VertexMigrator(database::GraphDbAccessor *dba) : dba_(dba) {}
void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
@ -57,3 +59,4 @@ void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
dba_->DetachRemoveVertex(vertex);
}
} // namespace distributed::dgp

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <thread>
@ -10,6 +12,8 @@ namespace database {
class GraphDbAccessor;
}; // namespace database
namespace distributed::dgp {
/// Migrates vertices from one worker to another (updates edges as well).
class VertexMigrator {
public:
@ -29,3 +33,5 @@ class VertexMigrator {
database::GraphDbAccessor *dba_;
std::unordered_map<gid::Gid, storage::VertexAddress> vertex_migrated_to_;
};
} // namespace distributed::dgp

View File

@ -1,7 +1,9 @@
/// @file
#pragma once
#include "distributed/rpc_worker_clients.hpp"
#include "storage/dynamic_graph_partitioner/dgp.hpp"
#include "distributed/dgp/partitioner.hpp"
namespace communication::rpc {
class Server;
@ -13,6 +15,12 @@ class DistributedGraphDb;
namespace distributed {
// TODO (buda): dgp_.Run() should be injected. This server shouldn't know
// anything about the partitioning.
// TODO (buda): It makes more sense to have centralized server which will assign
// tokens because error handling would be much easier.
// TODO (buda): Broken by design.
/// Shares the token between dynamic graph partitioners instances across workers
/// by passing the token from one worker to another, in a circular fashion. This
/// guarantees that no two workers will execute the dynamic graph partitioner
@ -30,10 +38,20 @@ class TokenSharingRpcServer {
dgp_(db) {
server_->Register<distributed::TokenTransferRpc>(
[this](const auto &req_reader, auto *res_builder) { token_ = true; });
// TODO (buda): It's not trivial to move this part in the Start method
// because worker then doesn't run the step. Will resolve that with
// a different implementation of the token assignment.
runner_ = std::thread([this]() {
while (true) {
// Wait till we get the token
while (!shutting_down_) {
// If no other instances are connected just wait. It doesn't make sense
// to migrate anything because only one machine is available.
auto workers = coordination_->GetWorkerIds();
if (!(workers.size() > 1)) {
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
// Wait till we get the token.
while (!token_) {
if (shutting_down_) break;
std::this_thread::sleep_for(std::chrono::seconds(1));
@ -42,10 +60,9 @@ class TokenSharingRpcServer {
if (shutting_down_) break;
token_ = false;
dgp_.Run();
dgp_.Partition();
// Transfer token to next
auto workers = coordination_->GetWorkerIds();
// Transfer token to next.
sort(workers.begin(), workers.end());
int next_worker = -1;
@ -63,7 +80,7 @@ class TokenSharingRpcServer {
/// Starts the token sharing server which in turn starts the dynamic graph
/// partitioner.
void StartTokenSharing() {
void Start() {
started_ = true;
token_ = true;
}
@ -74,9 +91,9 @@ class TokenSharingRpcServer {
if (started_ && worker_id_ == 0) {
// Wait till we get the token back otherwise some worker might try to
// migrate to another worker while that worker is shutting down or
// something else bad might happen
// TODO(dgleich): Solve this better in the future since this blocks
// shutting down until spinner steps complete
// something else bad might happen.
// TODO (buda): Solve this better in the future since this blocks
// shutting down until spinner steps complete.
while (!token_) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
@ -94,7 +111,7 @@ class TokenSharingRpcServer {
std::atomic<bool> shutting_down_{false};
std::thread runner_;
DynamicGraphPartitioner dgp_;
distributed::dgp::Partitioner dgp_;
};
} // namespace distributed

View File

@ -1,54 +0,0 @@
#pragma once
#include <thread>
#include "distributed/data_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_messages.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "storage/vertex_accessor.hpp"
namespace database {
class DistributedGraphDb;
class GraphDbAccessor;
}; // namespace database
/// Handles dynamic graph partitions, migrates vertices from one worker to
/// another based on available scoring which takes into account neighbours of a
/// vertex and tries to put it where most of its neighbours are located. Also
/// takes into account the number of vertices on the destination and source
/// machine.
class DynamicGraphPartitioner {
public:
DynamicGraphPartitioner(const DynamicGraphPartitioner &other) = delete;
DynamicGraphPartitioner(DynamicGraphPartitioner &&other) = delete;
DynamicGraphPartitioner &operator=(const DynamicGraphPartitioner &other) =
delete;
DynamicGraphPartitioner &operator=(DynamicGraphPartitioner &&other) = delete;
explicit DynamicGraphPartitioner(database::DistributedGraphDb *db);
/// Runs one dynamic graph partitioning cycle (step).
void Run();
/// Returns a vector of pairs of `vertex` and `destination` of where should
/// some vertex be relocated from the view of `dba` accessor.
//
/// Each vertex is located on some worker (which in context of migrations we
/// call a vertex label). Each vertex has it's score for each different label
/// (worker_id) evaluated. This score is calculated by considering
/// neighbouring vertices labels. Simply put, each vertex is attracted to be
/// located on the same worker as it's neighbouring vertices. Migrations which
/// improve that scoring, which also takes into account saturation of other
/// workers on which it's considering to migrate this vertex, are determined.
std::vector<std::pair<VertexAccessor, int>> FindMigrations(
database::GraphDbAccessor &dba);
/// Counts number of each label (worker_id) on endpoints of edges (in/out) of
/// `vertex`.
/// Returns a map consisting of (label, count) key-value pairs.
std::unordered_map<int, int64_t> CountLabels(
const VertexAccessor &vertex) const;
private:
database::DistributedGraphDb *db_{nullptr};
};

1
tests/integration/dgp/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
output/

222
tests/integration/dgp/run.py Executable file
View File

@ -0,0 +1,222 @@
#!/usr/bin/python3
'''
Test dynamic graph partitioner on Memgraph cluster
with randomly generated graph (uniform distribution
of nodes and edges). The partitioning goal is to
minimize number of crossing edges while keeping
the cluster balanced.
'''
import argparse
import atexit
import logging
import os
import random
import sys
import subprocess
import time
try:
# graphviz==0.9
from graphviz import Digraph
except ImportError:
print("graphviz module isn't available. "
"Graph won't be generated but the checks will still work.")
from neo4j.v1 import GraphDatabase, basic_auth
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
COMMON_ARGS = ["--durability-enabled=false",
"--snapshot-on-exit=false",
"--db-recover-on-startup=false"]
MASTER_ARGS = ["--master",
"--master-port", "10000",
"--dynamic-graph-partitioner-enabled",
"--durability-directory=durability_master"]
log = logging.getLogger(__name__)
memgraph_processes = []
def worker_args(worker_id):
args = ["--worker",
"--worker-id", str(worker_id),
"--worker-port", str(10000 + worker_id),
"--master-port", str(10000),
"--durability-directory=durability_worker%s" % worker_id]
return args
def wait_for_server(port, delay=0.01):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
while subprocess.call(cmd) != 0:
time.sleep(delay)
time.sleep(delay)
def run_memgraph_process(binary, args):
global memgraph_processes
process = subprocess.Popen([binary] + args, cwd=os.path.dirname(binary))
memgraph_processes.append(process)
def run_distributed_memgraph(args):
run_memgraph_process(args.memgraph, COMMON_ARGS + MASTER_ARGS)
wait_for_server("10000")
for i in range(1, int(args.machine_count)):
run_memgraph_process(args.memgraph, COMMON_ARGS + worker_args(i))
wait_for_server("7687")
def terminate():
global memgraph_processes
for process in memgraph_processes:
process.terminate()
status = process.wait()
if status != 0:
raise Exception(
"Memgraph binary returned non-zero ({})!".format(status))
@atexit.register
def cleanup():
global memgraph_processes
for proc in memgraph_processes:
if proc.poll() is not None:
continue
proc.kill()
proc.wait()
def run_test(args):
driver = GraphDatabase.driver(args.endpoint,
auth=basic_auth(args.username,
args.password),
encrypted=args.encrypted)
session = driver.session()
session.run("CREATE INDEX ON :Node(id)").consume()
session.run(
"UNWIND range(0, $num - 1) AS n CREATE (:Node {id: n})",
num=args.node_count - 1).consume()
created_edges = 0
while created_edges <= args.edge_count:
# Retry block is here because DGP is running in background and
# serialization errors may occure.
try:
session.run("MATCH (n:Node {id: $id1}), (m:Node {id:$id2}) "
"CREATE (n)-[:Edge]->(m)",
id1=random.randint(0, args.node_count - 1),
id2=random.randint(0, args.node_count - 1)).consume()
created_edges += 1
except Exception:
pass
# Check cluster state periodically.
crossing_edges_history = []
load_history = []
duration = 0
for iteration in range(args.iteration_count):
iteration_start_time = time.time()
data = session.run(
"MATCH (n)-[r]->(m) "
"RETURN "
" id(n) AS v1, id(m) AS v2, "
" workerid(n) AS position_n, workerid(m) AS position_m").data()
# Visualize cluster state.
if args.visualize and 'graphviz' in sys.modules:
output_dir = os.path.join(SCRIPT_DIR, "output")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
cluster = Digraph(name="memgraph_cluster", format="png")
cluster.attr(splines="false", rank="TB")
subgraphs = [Digraph(name="cluster_worker%s" % i)
for i in range(args.machine_count)]
for index, subgraph in enumerate(subgraphs):
subgraph.attr(label="worker%s" % index)
edges = []
for row in data:
# start_id end_id machine_id
edges.append((row["v1"], row["v2"], row["position_n"]))
edges = sorted(edges, key=lambda x: x[0])
for edge in edges:
# machine_id start_id end_id
subgraphs[edge[2]].edge(str(edge[0]), str(edge[1]))
for subgraph in subgraphs:
cluster.subgraph(subgraph)
cluster.render("output/iteration_%s" % iteration)
# Collect data.
num_of_crossing_edges = 0
load = [0] * args.machine_count
for edge in data:
src_worker = int(edge["position_n"])
dst_worker = int(edge["position_m"])
if src_worker != dst_worker:
num_of_crossing_edges = num_of_crossing_edges + 1
load[src_worker] = load[src_worker] + 1
crossing_edges_history.append(num_of_crossing_edges)
load_history.append(load)
iteration_delta_time = time.time() - iteration_start_time
duration += iteration_delta_time
log.info("Number of crossing edges %s" % num_of_crossing_edges)
log.info("Cluster load %s" % load)
# Wait for DGP a bit.
if iteration_delta_time < args.iteration_interval:
time.sleep(args.iteration_interval - iteration_delta_time)
duration += args.iteration_interval
else:
duration += iteration_delta_time
# TODO (buda): Somehow align with DGP turns. Replace runtime param with
# the query.
assert crossing_edges_history[-1] < crossing_edges_history[0], \
"Number of crossing edges is equal or bigger."
for machine in range(args.machine_count):
assert load_history[-1][machine] > 0, "Machine %s is empty." % machine
log.info("Config")
log.info(" Machines: %s" % args.machine_count)
log.info(" Nodes: %s" % args.node_count)
log.info(" Edges: %s" % args.edge_count)
log.info("Start")
log.info(" Crossing Edges: %s" % crossing_edges_history[0])
log.info(" Cluster Load: %s" % load_history[0])
log.info("End")
log.info(" Crossing Edges: %s" % crossing_edges_history[-1])
log.info(" Cluster Load: %s" % load_history[-1])
log.info("Stats")
log.info(" Duration: %ss" % duration)
session.close()
driver.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
if not os.path.exists(memgraph_binary):
memgraph_binary = os.path.join(PROJECT_DIR, "build_debug", "memgraph")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--endpoint", type=str,
default="bolt://localhost:7687")
parser.add_argument("--username", type=str, default="")
parser.add_argument("--password", type=str, default="")
parser.add_argument("--encrypted", type=bool, default=False)
parser.add_argument("--visualize", type=bool, default=False)
parser.add_argument("--machine-count", type=int, default=3)
parser.add_argument("--node-count", type=int, default=1000000)
parser.add_argument("--edge-count", type=int, default=1000000)
parser.add_argument("--iteration-count", type=int, default=10)
parser.add_argument("--iteration-interval", type=float, default=5)
args = parser.parse_args()
run_distributed_memgraph(args)
run_test(args)
terminate()

View File

@ -94,6 +94,13 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto
add_unit_test(distributed_updates.cpp)
target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib)
# TODO (buda): Replace token sharing with centralized solution and write an appropriate test.
# add_unit_test(distributed_token_sharing.cpp)
# target_link_libraries(${test_prefix}distributed_token_sharing memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_dgp_partitioner.cpp)
target_link_libraries(${test_prefix}distributed_dgp_partitioner memgraph_lib kvstore_dummy_lib)
add_unit_test(durability.cpp)
target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib)

View File

@ -0,0 +1,252 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
#include "distributed/dgp/partitioner.hpp"
#include "distributed/updates_rpc_clients.hpp"
using namespace distributed;
using namespace database;
DECLARE_int32(dgp_max_batch_size);
class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest {
public:
DistributedDynamicGraphPartitionerTest()
: DistributedGraphDbTest("dynamic_graph_partitioner") {}
void LogClusterState() {
LOG(INFO) << "master_v: " << VertexCount(master())
<< " master_e: " << EdgeCount(master());
LOG(INFO) << "worker1_v: " << VertexCount(worker(1))
<< " worker1_e: " << EdgeCount(worker(1));
LOG(INFO) << "worker2_v: " << VertexCount(worker(2))
<< " worker2_e: " << EdgeCount(worker(2));
}
};
TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) {
auto va = InsertVertex(master());
auto vb = InsertVertex(worker(1));
auto vc = InsertVertex(worker(2));
for (int i = 0; i < 2; ++i) InsertEdge(va, va, "edge");
for (int i = 0; i < 3; ++i) InsertEdge(va, vb, "edge");
for (int i = 0; i < 4; ++i) InsertEdge(va, vc, "edge");
for (int i = 0; i < 5; ++i) InsertEdge(vb, va, "edge");
for (int i = 0; i < 6; ++i) InsertEdge(vc, va, "edge");
distributed::dgp::Partitioner dgp(&master());
auto dba = master().Access();
VertexAccessor v(va, *dba);
auto count_labels = dgp.CountLabels(v);
// Self loops counted twice
EXPECT_EQ(count_labels[master().WorkerId()], 2 * 2);
EXPECT_EQ(count_labels[worker(1).WorkerId()], 3 + 5);
EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6);
}
TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) {
auto va = InsertVertex(master());
auto vb = InsertVertex(worker(1));
// Balance the number of nodes on workers a bit
InsertVertex(worker(2));
InsertVertex(worker(2));
for (int i = 0; i < 100; ++i) InsertEdge(va, vb, "edge");
distributed::dgp::Partitioner dgp(&master());
auto dba = master().Access();
auto data = dgp.FindMigrations(*dba);
// Expect `va` to try to move to another worker, the one connected to it
ASSERT_EQ(data.migrations.size(), 1);
EXPECT_EQ(data.migrations[0].second, worker(1).WorkerId());
}
TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) {
InsertVertex(master());
InsertVertex(worker(1));
InsertVertex(worker(2));
// Everything is balanced, there should be no movement
distributed::dgp::Partitioner dgp(&master());
auto dba = master().Access();
auto data = dgp.FindMigrations(*dba);
EXPECT_EQ(data.migrations.size(), 0);
}
TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) {
auto va = InsertVertex(master());
auto vb = InsertVertex(master());
auto vc = InsertVertex(worker(1));
// Balance the number of nodes on workers a bit
InsertVertex(worker(1));
InsertVertex(worker(2));
InsertVertex(worker(2));
for (int i = 0; i < 100; ++i) InsertEdge(va, vc, "edge");
for (int i = 0; i < 100; ++i) InsertEdge(vb, vc, "edge");
distributed::dgp::Partitioner dgp(&master());
auto dba = master().Access();
{
auto data = dgp.FindMigrations(*dba);
// Expect vertices to try to move to another worker
ASSERT_EQ(data.migrations.size(), 2);
}
// See if flag affects number of returned results
{
FLAGS_dgp_max_batch_size = 1;
auto data = dgp.FindMigrations(*dba);
// Expect vertices to try to move to another worker
ASSERT_EQ(data.migrations.size(), 1);
}
}
TEST_F(DistributedDynamicGraphPartitionerTest, Run) {
// Emulate a bipartite graph with lots of connections on the left, and right
// side, and some connections between the halfs
std::vector<storage::VertexAddress> left;
for (int i = 0; i < 10; ++i) {
left.push_back(InsertVertex(master()));
}
std::vector<storage::VertexAddress> right;
for (int i = 0; i < 10; ++i) {
right.push_back(InsertVertex(master()));
}
// Force the nodes of both sides to stay on one worker by inserting a lot of
// edges in between them
for (int i = 0; i < 1000; ++i) {
InsertEdge(left[rand() % 10], left[rand() % 10], "edge");
InsertEdge(right[rand() % 10], right[rand() % 10], "edge");
}
// Insert edges between left and right side
for (int i = 0; i < 50; ++i)
InsertEdge(left[rand() % 10], right[rand() % 10], "edge");
// Balance it out so that the vertices count on workers don't influence the
// partitioning too much
for (int i = 0; i < 10; ++i) InsertVertex(worker(2));
distributed::dgp::Partitioner dgp(&master());
// Transfer one by one to actually converge
FLAGS_dgp_max_batch_size = 1;
// Try a bit more transfers to see if we reached a steady state
for (int i = 0; i < 15; ++i) {
dgp.Partition();
}
EXPECT_EQ(VertexCount(master()), 10);
EXPECT_EQ(VertexCount(worker(1)), 10);
auto CountRemotes = [](GraphDbAccessor &dba) {
int64_t cnt = 0;
for (auto vertex : dba.Vertices(false)) {
for (auto edge : vertex.in())
if (edge.from_addr().is_remote()) ++cnt;
for (auto edge : vertex.out())
if (edge.to_addr().is_remote()) ++cnt;
}
return cnt;
};
auto dba_m = master().Access();
auto dba_w1 = worker(1).Access();
EXPECT_EQ(CountRemotes(*dba_m), 50);
EXPECT_EQ(CountRemotes(*dba_w1), 50);
}
TEST_F(DistributedDynamicGraphPartitionerTest, Convergence) {
auto seed = std::time(nullptr);
LOG(INFO) << "Seed: " << seed;
std::srand(seed);
// Generate random graph across cluster.
std::vector<storage::VertexAddress> master_vertices;
for (int i = 0; i < 1000; ++i) {
master_vertices.push_back(InsertVertex(master()));
}
std::vector<storage::VertexAddress> worker1_vertices;
for (int i = 0; i < 1000; ++i) {
worker1_vertices.push_back(InsertVertex(worker(1)));
}
std::vector<storage::VertexAddress> worker2_vertices;
for (int i = 0; i < 1000; ++i) {
worker2_vertices.push_back(InsertVertex(worker(2)));
}
// Generate random edges between machines.
for (int i = 0; i < 1000; ++i) {
InsertEdge(master_vertices[rand() % 1000], worker1_vertices[rand() % 1000],
"edge");
InsertEdge(master_vertices[rand() % 1000], worker2_vertices[rand() % 1000],
"edge");
InsertEdge(worker1_vertices[rand() % 1000], master_vertices[rand() % 1000],
"edge");
InsertEdge(worker1_vertices[rand() % 1000], worker2_vertices[rand() % 1000],
"edge");
InsertEdge(worker2_vertices[rand() % 1000], master_vertices[rand() % 1000],
"edge");
InsertEdge(worker2_vertices[rand() % 1000], worker1_vertices[rand() % 1000],
"edge");
}
// Run the partitioning algorithm, after some time it should stop doing
// migrations.
distributed::dgp::Partitioner dgp_master(&master());
std::vector<double> master_scores;
distributed::dgp::Partitioner dgp_worker1(&worker(1));
std::vector<double> worker1_scores;
distributed::dgp::Partitioner dgp_worker2(&worker(2));
std::vector<double> worker2_scores;
FLAGS_dgp_max_batch_size = 10;
for (int i = 0; i < 100; ++i) {
LOG(INFO) << "Iteration: " << i;
auto data_master = dgp_master.Partition();
LOG(INFO) << "Master score: " << data_master.first;
master_scores.push_back(data_master.first);
LogClusterState();
auto data_worker1 = dgp_worker1.Partition();
LOG(INFO) << "Worker1 score: " << data_worker1.first;
worker1_scores.push_back(data_worker1.first);
LogClusterState();
auto data_worker2 = dgp_worker2.Partition();
LOG(INFO) << "Worker2 score: " << data_worker2.first;
worker2_scores.push_back(data_worker2.first);
LogClusterState();
}
// Check that the last N scores from each instance are the same.
int scores_to_validate = 10;
auto score_equality = [](double x, double y) {
return std::abs(x - y) < 10e-1;
};
ASSERT_TRUE(std::all_of(master_scores.end() - scores_to_validate,
master_scores.end(),
[&score_equality, &master_scores](double elem) {
return score_equality(elem, master_scores.back());
}));
ASSERT_TRUE(std::all_of(worker1_scores.end() - scores_to_validate,
worker1_scores.end(),
[&score_equality, &worker1_scores](double elem) {
return score_equality(elem, worker1_scores.back());
}));
ASSERT_TRUE(std::all_of(worker2_scores.end() - scores_to_validate,
worker2_scores.end(),
[&score_equality, &worker2_scores](double elem) {
return score_equality(elem, worker2_scores.back());
}));
}

View File

@ -7,7 +7,7 @@
#include "gtest/gtest.h"
#include "distributed/updates_rpc_clients.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "distributed/dgp/vertex_migrator.hpp"
using namespace distributed;
using namespace database;
@ -67,7 +67,7 @@ class DistributedVertexMigratorTest : public DistributedGraphDbTest {
void MigrateVertexAndCommit(database::GraphDbAccessor *from_dba,
int64_t cypher_id, int to_worker_id) {
auto vacc = FindVertex(from_dba, cypher_id);
VertexMigrator migrator(from_dba);
distributed::dgp::VertexMigrator migrator(from_dba);
migrator.MigrateVertex(*vacc, to_worker_id);
MasterApplyUpdatesAndCommit(from_dba);
}
@ -176,7 +176,7 @@ TEST_F(DistributedVertexMigratorTest, MigrationofLabelsEdgeTypesAndProperties) {
{
auto dba = master().Access();
VertexMigrator migrator(dba.get());
distributed::dgp::VertexMigrator migrator(dba.get());
for (auto &vertex : dba->Vertices(false)) {
migrator.MigrateVertex(vertex, worker(1).WorkerId());
}

View File

@ -1,33 +0,0 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
DECLARE_bool(dynamic_graph_partitioner_enabled);
DECLARE_int32(dgp_max_batch_size);
using namespace distributed;
using namespace database;
class TokenSharingTest : public DistributedGraphDbTest {
void SetUp() override {
FLAGS_dynamic_graph_partitioner_enabled = true;
FLAGS_dgp_max_batch_size = 1;
DistributedGraphDbTest::SetUp();
}
};
TEST_F(TokenSharingTest, Integration) {
auto vb = InsertVertex(worker(1));
for (int i = 0; i < 100; ++i) {
auto v = InsertVertex(master());
InsertEdge(vb, v, "edge");
}
std::this_thread::sleep_for(std::chrono::seconds(3));
// Migrate at least something from or to here
EXPECT_NE(VertexCount(master()), 100);
}