Create Dynamic Graph Partitioner

Summary:
Count labels in DGP

Add worker_id getter

Replace current_worker_id with worker_id

Add spinner

Add rpc calls for worker counts

Check worker capacity

Migrate to new worker

Fix moving two connected vertices

Token sharing algorithm

Reviewers: msantl, buda

Reviewed By: buda

Subscribers: msantl, pullbot

Differential Revision: https://phabricator.memgraph.io/D1392
Dominik Gleich 2018-05-29 11:32:21 +02:00
parent c294127065
commit 1875be1e34
30 changed files with 977 additions and 39 deletions

View File

@ -1,5 +1,15 @@
# Change Log
## v0.11.0
### Breaking Changes
### Major Features and Improvements
* Dynamic graph partitioner added.
### Bug Fixes and Other Changes
## v0.10.0
### Breaking Changes

View File

@ -0,0 +1,48 @@
## Dynamic Graph Partitioning
Memgraph supports dynamic graph partitioning similar to the Spinner algorithm,
described in this paper: https://arxiv.org/pdf/1404.3861.pdf.
DGP is useful because it tries to group `local` data on the same worker, i.e.
it tries to keep closely connected data on one worker and to avoid jumps
across workers when querying/traversing the distributed graph.
### Our implementation
The partitioner runs independently on each worker, but only one worker runs a
migration step at any given time. This is achieved by sharing a token between
workers: token ownership is transferred to the next worker once the current
worker finishes its migration step.
Workers run in disjoint time slots to avoid serialization errors caused by
creating/removing edges of vertices during migrations, which could otherwise
lead to the same vertex being updated from two or more different
transactions.
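Concretely, the token travels around a ring ordered by worker id: when a
worker finishes its step it passes the token to the next larger id, wrapping
around to the smallest. A minimal sketch of that selection, mirroring the
token sharing server added in this commit (the standalone function name here
is illustrative):
```
#include <algorithm>
#include <vector>

// Picks the worker that should receive the token after `current_worker_id`.
// `workers` holds all worker ids known to the coordination layer.
int NextTokenHolder(std::vector<int> workers, int current_worker_id) {
  std::sort(workers.begin(), workers.end());
  // First id strictly greater than ours, or wrap around to the smallest.
  auto pos =
      std::upper_bound(workers.begin(), workers.end(), current_worker_id);
  return pos != workers.end() ? *pos : workers.front();
}
```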
### Migrations
For each vertex and worker id (a label in the context of the DGP algorithm) we
define a score function. The score function takes into account the labels of
the endpoints of the vertex's edges (in/out) and the capacity of the worker
with that label. The score function loosely looks like this:
```
locality(v, l) =
    count of endpoints of edges of vertex `v` with label `l` / degree of `v`

capacity(l) =
    number of vertices on worker `l` / worker capacity
    (usually equal to the average number of vertices per worker)

score(v, l) = locality(v, l) - capacity(l)
```
Alongside ```dynamic_graph_partitioner_enabled``` we also define two flags,
```dgp_improvement_threshold``` and ```dgp_max_batch_size```, which are used
during the migration phase.
When deciding whether to migrate some vertex `v` from worker `l1` to worker
`l2` we examine the difference in scores, i.e. we migrate the vertex if
score(v, l2) - dgp_improvement_threshold / 100 >= score(v, l1).
The max batch size flag limits the number of vertices that can be transferred
in one batch (one migration step).
Setting it to too large a value will probably cause a lot of interference
with client queries, while setting it too small will slow down convergence of
the algorithm.
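Putting the pieces together, a condensed sketch of the per-worker score and
the migration decision, closely following the
`DynamicGraphPartitioner::FindMigrations` code added in this commit (the
standalone helper names and parameters here are illustrative):
```
#include <cstdint>

// Score of placing vertex `v` on worker `l`, given how many of `v`'s edge
// endpoints already live on that worker, `v`'s degree, the worker's current
// vertex count and the average vertex count per worker.
double Score(int64_t endpoints_on_worker, int64_t degree,
             int64_t worker_vertex_count, double average_vertex_count) {
  return endpoints_on_worker * 1.0 / degree -
         worker_vertex_count / average_vertex_count;
}

// Migrate only if the best candidate worker beats the current worker's score
// by at least `dgp_improvement_threshold` percent.
bool ShouldMigrate(double best_candidate_score, double current_score,
                   int improvement_threshold) {
  return best_candidate_score - improvement_threshold / 100.0 >=
         current_score;
}
```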

View File

@ -0,0 +1,4 @@
## Dynamic Graph Partitioner
Memgraph supports dynamic graph partitioning, which dynamically improves performance on datasets that are badly partitioned across workers. To enable it, use the
```--dynamic_graph_partitioner_enabled``` flag.

View File

@ -58,6 +58,8 @@ set(memgraph_src_files
stats/stats.cpp
storage/concurrent_id_mapper_master.cpp
storage/concurrent_id_mapper_worker.cpp
storage/dynamic_graph_partitioner/dgp.cpp
storage/dynamic_graph_partitioner/vertex_migrator.cpp
storage/edge_accessor.cpp
storage/locking/record_lock.cpp
storage/property_value.cpp

View File

@ -72,6 +72,8 @@ BOOST_CLASS_EXPORT(distributed::EdgeRes);
BOOST_CLASS_EXPORT(distributed::VertexReq);
BOOST_CLASS_EXPORT(distributed::VertexRes);
BOOST_CLASS_EXPORT(distributed::TxGidPair);
BOOST_CLASS_EXPORT(distributed::VertexCountReq);
BOOST_CLASS_EXPORT(distributed::VertexCountRes);
// Distributed plan exchange.
BOOST_CLASS_EXPORT(distributed::DispatchPlanReq);
@ -90,6 +92,10 @@ BOOST_CLASS_EXPORT(distributed::BuildIndexReq);
BOOST_CLASS_EXPORT(distributed::BuildIndexRes);
BOOST_CLASS_EXPORT(distributed::IndexLabelPropertyTx);
// Token sharing.
BOOST_CLASS_EXPORT(distributed::TokenTransferReq);
BOOST_CLASS_EXPORT(distributed::TokenTransferRes);
// Stats.
BOOST_CLASS_EXPORT(stats::StatsReq);
BOOST_CLASS_EXPORT(stats::StatsRes);

View File

@ -61,8 +61,10 @@ DEFINE_VALIDATED_HIDDEN_int32(rpc_num_workers,
FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_VALIDATED_int32(recovering_cluster_size, 0,
"Number of workers (including master) in the "
"previously snapshooted/wal cluster",
"previously snapshooted/wal cluster.",
FLAG_IN_RANGE(0, INT32_MAX));
DEFINE_bool(dynamic_graph_partitioner_enabled, false,
"If the dynamic graph partitioner should be enabled.");
#endif
// clang-format off
@ -82,6 +84,7 @@ database::Config::Config()
#ifndef MG_COMMUNITY
,
// Distributed flags.
dynamic_graph_partitioner_enabled{FLAGS_dynamic_graph_partitioner_enabled},
rpc_num_workers{FLAGS_rpc_num_workers},
worker_id{FLAGS_worker_id},
master_endpoint{FLAGS_master_host,

View File

@ -24,6 +24,7 @@
#include "distributed/plan_dispatcher.hpp"
#include "distributed/produce_rpc_server.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
@ -47,6 +48,7 @@ namespace impl {
class PrivateBase : public GraphDb {
public:
explicit PrivateBase(const Config &config) : config_(config) {}
virtual ~PrivateBase() {}
const Config config_;
@ -263,6 +265,11 @@ class Master : public PrivateBase {
tx_engine_, updates_server_, data_manager_};
distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_,
rpc_worker_clients_};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
this, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
};
class Worker : public PrivateBase {
@ -322,6 +329,11 @@ class Worker : public PrivateBase {
distributed::DurabilityRpcServer durability_rpc_server_{*this, server_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
this, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
};
#undef IMPL_GETTERS
@ -371,6 +383,12 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
}
}
// Start the dynamic graph partitioner inside token sharing server
if (impl_->config_.dynamic_graph_partitioner_enabled) {
dynamic_cast<impl::Master *>(impl_.get())
->token_sharing_server_.StartTokenSharing();
}
break;
case Type::DISTRIBUTED_WORKER:
if (required_recovery_info != recovery_info)

View File

@ -51,6 +51,7 @@ struct Config {
std::vector<std::string> properties_on_disk;
// Distributed master/worker flags.
bool dynamic_graph_partitioner_enabled{false};
int rpc_num_workers{0};
int worker_id{0};
io::network::Endpoint master_endpoint{"0.0.0.0", 0};

View File

@ -36,7 +36,7 @@ std::unordered_map<int, int64_t> BfsRpcClients::CreateBfsSubcursors(
void BfsRpcClients::RegisterSubcursors(
const std::unordered_map<int, int64_t> &subcursor_ids) {
auto futures = clients_->ExecuteOnWorkers<void>(
db_->WorkerId(), [&subcursor_ids](auto &client) {
db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) {
auto res = client.template Call<RegisterSubcursorsRpc>(subcursor_ids);
CHECK(res) << "RegisterSubcursors RPC failed!";
});

View File

@ -17,7 +17,7 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster(
if (registration_successful) {
rpc_worker_clients_.ExecuteOnWorkers<void>(
0, [req](communication::rpc::ClientPool &client_pool) {
0, [req](int worker_id, communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<ClusterDiscoveryRpc>(
req.desired_worker_id, req.endpoint);
CHECK(result) << "ClusterDiscoveryRpc failed";

View File

@ -1,3 +1,5 @@
#include <unordered_map>
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_messages.hpp"
#include "storage/edge.hpp"
@ -16,12 +18,32 @@ std::unique_ptr<Edge> DataRpcClients::RemoteElement(int worker_id,
}
template <>
std::unique_ptr<Vertex> DataRpcClients::RemoteElement(
int worker_id, tx::TransactionId tx_id, gid::Gid gid) {
std::unique_ptr<Vertex> DataRpcClients::RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<VertexRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "VertexRpc failed";
return std::move(response->name_output_);
}
std::unordered_map<int, int64_t> DataRpcClients::VertexCounts(
tx::TransactionId tx_id) {
auto future_results = clients_.ExecuteOnWorkers<std::pair<int, int64_t>>(
-1, [tx_id](int worker_id, communication::rpc::ClientPool &client_pool) {
auto response = client_pool.Call<VertexCountRpc>(tx_id);
CHECK(response) << "VertexCountRpc failed";
return std::make_pair(worker_id, response->member);
});
std::unordered_map<int, int64_t> results;
for (auto &result : future_results) {
auto result_pair = result.get();
int worker = result_pair.first;
int vertex_count = result_pair.second;
results[worker] = vertex_count;
}
return results;
}
} // namespace distributed

View File

@ -17,10 +17,13 @@ class DataRpcClients {
/// That worker must own the vertex/edge for the given id, and that vertex
/// must be visible in given transaction.
template <typename TRecord>
std::unique_ptr<TRecord> RemoteElement(int worker_id,
tx::TransactionId tx_id,
std::unique_ptr<TRecord> RemoteElement(int worker_id, tx::TransactionId tx_id,
gid::Gid gid);
/// Returns (worker_id, vertex_count) for each worker and the number of
/// vertices on it from the perspective of transaction `tx_id`.
std::unordered_map<int, int64_t> VertexCounts(tx::TransactionId tx_id);
private:
RpcWorkerClients &clients_;
};

View File

@ -61,8 +61,12 @@ MAKE_RESPONSE(Edge, edge)
RPC_SINGLE_MEMBER_MESSAGE(VertexReq, TxGidPair);
RPC_SINGLE_MEMBER_MESSAGE(EdgeReq, TxGidPair);
RPC_SINGLE_MEMBER_MESSAGE(VertexCountReq, tx::TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(VertexCountRes, int64_t);
using VertexRpc = communication::rpc::RequestResponse<VertexReq, VertexRes>;
using EdgeRpc = communication::rpc::RequestResponse<EdgeReq, EdgeRes>;
using VertexCountRpc =
communication::rpc::RequestResponse<VertexCountReq, VertexCountRes>;
} // namespace distributed

View File

@ -24,6 +24,13 @@ DataRpcServer::DataRpcServer(database::GraphDb &db,
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
return std::make_unique<EdgeRes>(edge.GetOld(), db_.WorkerId());
});
rpc_server_.Register<VertexCountRpc>([this](const VertexCountReq &req) {
database::GraphDbAccessor dba(db_, req.member);
int64_t size = 0;
for (auto vertex : dba.Vertices(false)) ++size;
return std::make_unique<VertexCountRes>(size);
});
}
} // namespace distributed

View File

@ -8,7 +8,7 @@ namespace distributed {
utils::Future<bool> DurabilityRpcClients::MakeSnapshot(tx::TransactionId tx) {
return utils::make_future(std::async(std::launch::async, [this, tx] {
auto futures = clients_.ExecuteOnWorkers<bool>(
0, [tx](communication::rpc::ClientPool &client_pool) {
0, [tx](int worker_id, communication::rpc::ClientPool &client_pool) {
auto res = client_pool.Call<MakeSnapshotRpc>(tx);
if (res == nullptr) return false;
return res->member;

View File

@ -8,8 +8,8 @@ void PlanDispatcher::DispatchPlan(
int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,
const SymbolTable &symbol_table) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [plan_id, plan,
symbol_table](communication::rpc::ClientPool &client_pool) {
0, [plan_id, plan, symbol_table](
int worker_id, communication::rpc::ClientPool &client_pool) {
auto result =
client_pool.Call<DistributedPlanRpc>(plan_id, plan, symbol_table);
CHECK(result) << "DistributedPlanRpc failed";
@ -22,7 +22,7 @@ void PlanDispatcher::DispatchPlan(
void PlanDispatcher::RemovePlan(int64_t plan_id) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [plan_id](communication::rpc::ClientPool &client_pool) {
0, [plan_id](int worker_id, communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<RemovePlanRpc>(plan_id);
CHECK(result) << "Failed to remove plan from worker";
});

View File

@ -14,7 +14,7 @@ utils::Future<PullData> PullRpcClients::Pull(
int batch_size) {
return clients_.ExecuteOnWorker<PullData>(
worker_id, [&dba, plan_id, command_id, params, symbols, accumulate,
batch_size](ClientPool &client_pool) {
batch_size](int worker_id, ClientPool &client_pool) {
auto result = client_pool.Call<PullRpc>(
dba.transaction_id(), dba.transaction().snapshot(), plan_id,
command_id, params, symbols, accumulate, batch_size, true, true);
@ -63,10 +63,11 @@ utils::Future<PullData> PullRpcClients::Pull(
std::vector<utils::Future<void>>
PullRpcClients::NotifyAllTransactionCommandAdvanced(tx::TransactionId tx_id) {
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {
auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id);
CHECK(res) << "TransactionCommandAdvanceRpc failed";
});
return clients_.ExecuteOnWorkers<void>(
0, [tx_id](int worker_id, auto &client) {
auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id);
CHECK(res) << "TransactionCommandAdvanceRpc failed";
});
}
} // namespace distributed

View File

@ -7,6 +7,7 @@
#include "communication/rpc/client_pool.hpp"
#include "distributed/coordination.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/token_sharing_rpc_messages.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
@ -48,9 +49,10 @@ class RpcWorkerClients {
template <typename TResult>
auto ExecuteOnWorker(
int worker_id,
std::function<TResult(communication::rpc::ClientPool &)> execute) {
std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
execute) {
auto &client_pool = GetClientPool(worker_id);
return thread_pool_.Run(execute, std::ref(client_pool));
return thread_pool_.Run(execute, worker_id, std::ref(client_pool));
}
/** Asynchroniously executes the `execute` function on all worker rpc clients
@ -59,7 +61,8 @@ class RpcWorkerClients {
template <typename TResult>
auto ExecuteOnWorkers(
int skip_worker_id,
std::function<TResult(communication::rpc::ClientPool &)> execute) {
std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
execute) {
std::vector<utils::Future<TResult>> futures;
for (auto &worker_id : coordination_.GetWorkerIds()) {
if (worker_id == skip_worker_id) continue;
@ -68,20 +71,6 @@ class RpcWorkerClients {
return futures;
}
template <typename TResult>
auto ExecuteOnWorkers(
int skip_worker_id,
std::function<TResult(int, communication::rpc::ClientPool &)> execute) {
std::vector<utils::Future<TResult>> futures;
for (auto &worker_id : coordination_.GetWorkerIds()) {
if (worker_id == skip_worker_id) continue;
auto &client_pool = GetClientPool(worker_id);
futures.emplace_back(
thread_pool_.Run(execute, worker_id, std::ref(client_pool)));
}
return futures;
}
private:
// TODO make Coordination const, it's member GetEndpoint must be const too.
Coordination &coordination_;
@ -100,8 +89,9 @@ class IndexRpcClients {
const storage::Property &property,
tx::TransactionId transaction_id, int worker_id) {
return clients_.ExecuteOnWorkers<bool>(
worker_id, [label, property, transaction_id](
communication::rpc::ClientPool &client_pool) {
worker_id,
[label, property, transaction_id](
int worker_id, communication::rpc::ClientPool &client_pool) {
return client_pool.Call<BuildIndexRpc>(
distributed::IndexLabelPropertyTx{
label, property, transaction_id}) != nullptr;
@ -112,6 +102,26 @@ class IndexRpcClients {
RpcWorkerClients &clients_;
};
/** Wrapper class around a RPC call to share token between workers.
*/
class TokenSharingRpcClients {
public:
explicit TokenSharingRpcClients(RpcWorkerClients *clients)
: clients_(clients) {}
auto TransferToken(int worker_id) {
return clients_->ExecuteOnWorker<void>(
worker_id,
[](int worker_id, communication::rpc::ClientPool &client_pool) {
CHECK(client_pool.Call<TokenTransferRpc>())
<< "Unable to transfer token";
});
}
private:
RpcWorkerClients *clients_;
};
/** Join ongoing produces on all workers.
*
* Sends a RPC request to all workers when a transaction is ending, notifying
@ -124,7 +134,7 @@ class OngoingProduceJoinerRpcClients {
void JoinOngoingProduces(tx::TransactionId tx_id) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [tx_id](communication::rpc::ClientPool &client_pool) {
0, [tx_id](int worker_id, communication::rpc::ClientPool &client_pool) {
auto result =
client_pool.Call<distributed::WaitOnTransactionEndRpc>(tx_id);
CHECK(result)

View File

@ -0,0 +1,16 @@
#pragma once
#include <memory>
#include <string>
#include "communication/rpc/messages.hpp"
#include "distributed/serialization.hpp"
namespace distributed {
RPC_NO_MEMBER_MESSAGE(TokenTransferReq);
RPC_NO_MEMBER_MESSAGE(TokenTransferRes);
using TokenTransferRpc =
communication::rpc::RequestResponse<TokenTransferReq, TokenTransferRes>;
} // namespace distributed

View File

@ -0,0 +1,103 @@
#pragma once
#include "distributed/rpc_worker_clients.hpp"
#include "storage/dynamic_graph_partitioner/dgp.hpp"
namespace communication::rpc {
class Server;
}
namespace database {
class GraphDb;
};
namespace distributed {
/// Shares the token between dynamic graph partitioner instances across workers
/// by passing the token from one worker to another, in a circular fashion. This
/// guarantees that no two workers will execute the dynamic graph partitioner
/// step at the same time.
class TokenSharingRpcServer {
public:
TokenSharingRpcServer(database::GraphDb *db, int worker_id,
distributed::Coordination *coordination,
communication::rpc::Server *server,
distributed::TokenSharingRpcClients *clients)
: worker_id_(worker_id),
coordination_(coordination),
server_(server),
clients_(clients),
dgp_(db) {
server_->Register<distributed::TokenTransferRpc>(
[this](const distributed::TokenTransferReq &req) {
token_ = true;
return std::make_unique<distributed::TokenTransferRes>();
});
runner_ = std::thread([this]() {
while (true) {
// Wait till we get the token
while (!token_) {
if (shutting_down_) break;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if (shutting_down_) break;
token_ = false;
dgp_.Run();
// Transfer token to next
auto workers = coordination_->GetWorkerIds();
sort(workers.begin(), workers.end());
int next_worker = -1;
auto pos = std::upper_bound(workers.begin(), workers.end(), worker_id_);
if (pos != workers.end()) {
next_worker = *pos;
} else {
next_worker = workers[0];
}
clients_->TransferToken(next_worker);
}
});
}
/// Starts the token sharing server which in turn starts the dynamic graph
/// partitioner.
void StartTokenSharing() {
started_ = true;
token_ = true;
}
~TokenSharingRpcServer() {
shutting_down_ = true;
if (runner_.joinable()) runner_.join();
if (started_ && worker_id_ == 0) {
// Wait till we get the token back otherwise some worker might try to
// migrate to another worker while that worker is shutting down or
// something else bad might happen
// TODO(dgleich): Solve this better in the future since this blocks
// shutting down until spinner steps complete
while (!token_) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
}
private:
int worker_id_;
distributed::Coordination *coordination_;
communication::rpc::Server *server_;
distributed::TokenSharingRpcClients *clients_;
std::atomic<bool> started_{false};
std::atomic<bool> token_{false};
std::atomic<bool> shutting_down_{false};
std::thread runner_;
DynamicGraphPartitioner dgp_;
};
} // namespace distributed

View File

@ -107,7 +107,7 @@ void UpdatesRpcClients::RemoveInEdge(tx::TransactionId tx_id, int worker_id,
std::vector<utils::Future<UpdateResult>> UpdatesRpcClients::UpdateApplyAll(
int skip_worker_id, tx::TransactionId tx_id) {
return worker_clients_.ExecuteOnWorkers<UpdateResult>(
skip_worker_id, [tx_id](auto &client) {
skip_worker_id, [tx_id](int worker_id, auto &client) {
auto res = client.template Call<UpdateApplyRpc>(tx_id);
CHECK(res) << "UpdateApplyRpc failed";
return res->member;

View File

@ -0,0 +1,135 @@
#include "storage/dynamic_graph_partitioner/dgp.hpp"
#include <algorithm>
#include <unordered_map>
#include <vector>
#include "database/graph_db_accessor.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "utils/flag_validation.hpp"
DEFINE_VALIDATED_int32(
dgp_improvement_threshold, 10,
"How much better should specific node score be to consider "
"a migration to another worker. This represents the minimal difference "
"between new score that the vertex will have when migrated and the old one "
"such that it's migrated.",
FLAG_IN_RANGE(1, 100));
DEFINE_VALIDATED_int32(dgp_max_batch_size, 2000,
"Maximal amount of vertices which should be migrated in "
"one dynamic graph partitioner step.",
FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
DynamicGraphPartitioner::DynamicGraphPartitioner(database::GraphDb *db)
: db_(db) {}
void DynamicGraphPartitioner::Run() {
database::GraphDbAccessor dba(*db_);
VLOG(21) << "Starting DynamicGraphPartitioner in tx: "
<< dba.transaction().id_;
auto migrations = FindMigrations(dba);
try {
VertexMigrator migrator(&dba);
for (auto &migration : migrations) {
migrator.MigrateVertex(migration.first, migration.second);
}
auto apply_futures = db_->updates_clients().UpdateApplyAll(
db_->WorkerId(), dba.transaction().id_);
for (auto &future : apply_futures) {
switch (future.get()) {
case distributed::UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError(
"Failed to relocate vertex due to SerializationError");
case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
throw query::QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException");
case distributed::UpdateResult::DONE:
break;
}
}
dba.Commit();
VLOG(21) << "Sucesfully migrated " << migrations.size() << " vertices..";
} catch (const utils::BasicException &e) {
VLOG(21) << "Didn't succeed in relocating; " << e.what();
dba.Abort();
}
}
std::vector<std::pair<VertexAccessor, int>>
DynamicGraphPartitioner::FindMigrations(database::GraphDbAccessor &dba) {
// Find workers vertex count
std::unordered_map<int, int64_t> worker_vertex_count =
db_->data_clients().VertexCounts(dba.transaction().id_);
int64_t total_vertex_count = 0;
for (auto worker_vertex_count_pair : worker_vertex_count) {
total_vertex_count += worker_vertex_count_pair.second;
}
double average_vertex_count =
total_vertex_count * 1.0 / worker_vertex_count.size();
// Considers all migrations which maximally improve single vertex score
std::vector<std::pair<VertexAccessor, int>> migrations;
for (const auto &vertex : dba.Vertices(false)) {
auto label_counts = CountLabels(vertex);
std::unordered_map<int, double> per_label_score;
size_t degree = vertex.in_degree() + vertex.out_degree();
for (auto worker_vertex_count_pair : worker_vertex_count) {
int worker = worker_vertex_count_pair.first;
int64_t worker_vertex_count = worker_vertex_count_pair.second;
per_label_score[worker] =
label_counts[worker] * 1.0 / degree -
worker_vertex_count * 1.0 / average_vertex_count;
}
auto label_cmp = [](const std::pair<int, double> &p1,
const std::pair<int, double> &p2) {
return p1.second < p2.second;
};
auto best_label = std::max_element(per_label_score.begin(),
per_label_score.end(), label_cmp);
// Consider as a migration only if the improvement is high enough
if (best_label != per_label_score.end() &&
best_label->first != db_->WorkerId() &&
per_label_score[best_label->first] -
FLAGS_dgp_improvement_threshold / 100.0 >=
per_label_score[db_->WorkerId()]) {
migrations.emplace_back(vertex, best_label->first);
}
if (migrations.size() >= FLAGS_dgp_max_batch_size) break;
}
return migrations;
}
std::unordered_map<int, int64_t> DynamicGraphPartitioner::CountLabels(
const VertexAccessor &vertex) const {
std::unordered_map<int, int64_t> label_count;
for (auto edge : vertex.in()) {
auto address = edge.from().address();
auto label = address.is_remote() ? address.worker_id() : db_->WorkerId();
label_count[label]++;
}
for (auto edge : vertex.out()) {
auto address = edge.to().address();
auto label = address.is_remote() ? address.worker_id() : db_->WorkerId();
label_count[label]++;
}
return label_count;
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <thread>
#include "distributed/data_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_messages.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "storage/vertex_accessor.hpp"
namespace database {
class GraphDb;
class GraphDbAccessor;
}; // namespace database
/// Handles dynamic graph partitions, migrates vertices from one worker to
/// another based on available scoring which takes into account neighbours of a
/// vertex and tries to put it where most of its neighbours are located. Also
/// takes into account the number of vertices on the destination and source
/// machine.
class DynamicGraphPartitioner {
public:
DynamicGraphPartitioner(const DynamicGraphPartitioner &other) = delete;
DynamicGraphPartitioner(DynamicGraphPartitioner &&other) = delete;
DynamicGraphPartitioner &operator=(const DynamicGraphPartitioner &other) =
delete;
DynamicGraphPartitioner &operator=(DynamicGraphPartitioner &&other) = delete;
explicit DynamicGraphPartitioner(database::GraphDb *db);
/// Runs one dynamic graph partitioning cycle (step).
void Run();
/// Returns a vector of pairs of `vertex` and `destination` describing where
/// vertices should be relocated, from the view of the `dba` accessor.
//
/// Each vertex is located on some worker (which in the context of migrations
/// we call a vertex label). Each vertex has its score evaluated for each
/// different label (worker_id). This score is calculated by considering the
/// labels of neighbouring vertices. Simply put, each vertex is attracted to
/// be located on the same worker as its neighbouring vertices. Migrations
/// which improve that score, taking into account the saturation of the
/// workers to which the vertex might migrate, are determined.
std::vector<std::pair<VertexAccessor, int>> FindMigrations(
database::GraphDbAccessor &dba);
/// Counts number of each label (worker_id) on endpoints of edges (in/out) of
/// `vertex`.
/// Returns a map consisting of (label, count) key-value pairs.
std::unordered_map<int, int64_t> CountLabels(
const VertexAccessor &vertex) const;
private:
database::GraphDb *db_;
};

View File

@ -0,0 +1,52 @@
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/typed_value.hpp"
VertexMigrator::VertexMigrator(database::GraphDbAccessor *dba) : dba_(dba) {}
void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
auto get_props = [](auto &record) {
std::unordered_map<storage::Property, query::TypedValue> properties;
for (auto prop : record.Properties()) {
properties[prop.first] = prop.second;
}
return properties;
};
auto update_if_moved = [this](auto &vertex) {
if (vertex_migrated_to_.count(vertex.gid())) {
vertex = VertexAccessor(vertex_migrated_to_[vertex.gid()], *dba_);
}
};
auto relocated_vertex = dba_->InsertVertexIntoRemote(
destination, vertex.labels(), get_props(vertex));
vertex_migrated_to_[vertex.gid()] = relocated_vertex.address();
for (auto in_edge : vertex.in()) {
auto from = in_edge.from();
update_if_moved(from);
auto new_in_edge =
dba_->InsertEdge(from, relocated_vertex, in_edge.EdgeType());
for (auto prop : get_props(in_edge)) {
new_in_edge.PropsSet(prop.first, prop.second);
}
}
for (auto out_edge : vertex.out()) {
auto to = out_edge.to();
// Continue on self-loops since those edges have already been added
// while iterating over in edges
if (to == vertex) continue;
update_if_moved(to);
auto new_out_edge =
dba_->InsertEdge(relocated_vertex, to, out_edge.EdgeType());
for (auto prop : get_props(out_edge)) {
new_out_edge.PropsSet(prop.first, prop.second);
}
}
dba_->DetachRemoveVertex(vertex);
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <thread>
#include <unordered_map>
#include "storage/gid.hpp"
#include "storage/vertex_accessor.hpp"
namespace database {
class GraphDbAccessor;
}; // namespace database
/// Migrates vertices from one worker to another (updates edges as well).
class VertexMigrator {
public:
explicit VertexMigrator(database::GraphDbAccessor *dba);
VertexMigrator(const VertexMigrator &other) = delete;
VertexMigrator(VertexMigrator &&other) = delete;
VertexMigrator &operator=(const VertexMigrator &other) = delete;
VertexMigrator &operator=(VertexMigrator &&other) = delete;
/// Creates a new vertex on the destination, deletes the old `vertex`, and
/// deletes/creates every new edge that it needs since the destination of the
/// vertex changed.
void MigrateVertex(VertexAccessor &v, int destination);
private:
database::GraphDbAccessor *dba_;
std::unordered_map<gid::Gid, storage::VertexAddress> vertex_migrated_to_;
};

View File

@ -88,3 +88,45 @@ TEST_F(DistributedGraphDbTest, RemoteExpansion) {
}
}
}
TEST_F(DistributedGraphDbTest, VertexCountsEqual) {
for (int i = 0; i < 5; ++i) InsertVertex(master());
for (int i = 0; i < 7; ++i) InsertVertex(worker(1));
for (int i = 0; i < 9; ++i) InsertVertex(worker(2));
{
GraphDbAccessor accessor(master());
auto m_cnt =
master().data_clients().VertexCounts(accessor.transaction().id_);
auto w1_cnt =
worker(1).data_clients().VertexCounts(accessor.transaction().id_);
auto w2_cnt =
worker(2).data_clients().VertexCounts(accessor.transaction().id_);
auto check = [&m_cnt, &w1_cnt, &w2_cnt](int key, int value) {
return m_cnt[key] == w1_cnt[key] && w1_cnt[key] == w2_cnt[key] &&
m_cnt[key] == value;
};
EXPECT_TRUE(check(master().WorkerId(), 5));
EXPECT_TRUE(check(worker(1).WorkerId(), 7));
EXPECT_TRUE(check(worker(2).WorkerId(), 9));
}
}
TEST_F(DistributedGraphDbTest, VertexCountsTransactional) {
{
GraphDbAccessor accessor(master());
InsertVertex(master());
EXPECT_EQ(master().data_clients().VertexCounts(
accessor.transaction().id_)[master().WorkerId()],
0);
}
// Transaction after insert which should now see the insertion
{
GraphDbAccessor accessor(master());
EXPECT_EQ(master().data_clients().VertexCounts(
accessor.transaction().id_)[master().WorkerId()],
1);
}
}

View File

@ -0,0 +1,152 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
#include "distributed/updates_rpc_clients.hpp"
#include "storage/dynamic_graph_partitioner/dgp.hpp"
using namespace distributed;
using namespace database;
DECLARE_int32(dgp_max_batch_size);
TEST_F(DistributedGraphDbTest, CountLabels) {
auto va = InsertVertex(master());
auto vb = InsertVertex(worker(1));
auto vc = InsertVertex(worker(2));
for (int i = 0; i < 2; ++i) InsertEdge(va, va, "edge");
for (int i = 0; i < 3; ++i) InsertEdge(va, vb, "edge");
for (int i = 0; i < 4; ++i) InsertEdge(va, vc, "edge");
for (int i = 0; i < 5; ++i) InsertEdge(vb, va, "edge");
for (int i = 0; i < 6; ++i) InsertEdge(vc, va, "edge");
DynamicGraphPartitioner dgp(&master());
GraphDbAccessor dba(master());
VertexAccessor v(va, dba);
auto count_labels = dgp.CountLabels(v);
// Self loops counted twice
EXPECT_EQ(count_labels[master().WorkerId()], 2 * 2);
EXPECT_EQ(count_labels[worker(1).WorkerId()], 3 + 5);
EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6);
}
TEST_F(DistributedGraphDbTest, FindMigrationsMoveVertex) {
auto va = InsertVertex(master());
auto vb = InsertVertex(worker(1));
// Balance the number of nodes on workers a bit
InsertVertex(worker(2));
InsertVertex(worker(2));
for (int i = 0; i < 100; ++i) InsertEdge(va, vb, "edge");
DynamicGraphPartitioner dgp(&master());
GraphDbAccessor dba(master());
auto migrations = dgp.FindMigrations(dba);
// Expect `va` to try to move to another worker, the one connected to it
ASSERT_EQ(migrations.size(), 1);
EXPECT_EQ(migrations[0].second, worker(1).WorkerId());
}
TEST_F(DistributedGraphDbTest, FindMigrationsNoChange) {
InsertVertex(master());
InsertVertex(worker(1));
InsertVertex(worker(2));
// Everything is balanced, there should be no movement
DynamicGraphPartitioner dgp(&master());
GraphDbAccessor dba(master());
auto migrations = dgp.FindMigrations(dba);
EXPECT_EQ(migrations.size(), 0);
}
TEST_F(DistributedGraphDbTest, FindMigrationsMultipleAndLimit) {
auto va = InsertVertex(master());
auto vb = InsertVertex(master());
auto vc = InsertVertex(worker(1));
// Balance the number of nodes on workers a bit
InsertVertex(worker(1));
InsertVertex(worker(2));
InsertVertex(worker(2));
for (int i = 0; i < 100; ++i) InsertEdge(va, vc, "edge");
for (int i = 0; i < 100; ++i) InsertEdge(vb, vc, "edge");
DynamicGraphPartitioner dgp(&master());
GraphDbAccessor dba(master());
{
auto migrations = dgp.FindMigrations(dba);
// Expect vertices to try to move to another worker
ASSERT_EQ(migrations.size(), 2);
}
// See if flag affects number of returned results
{
FLAGS_dgp_max_batch_size = 1;
auto migrations = dgp.FindMigrations(dba);
// Expect vertices to try to move to another worker
ASSERT_EQ(migrations.size(), 1);
}
}
TEST_F(DistributedGraphDbTest, Run) {
// Emulate a bipartite graph with lots of connections on the left and right
// sides, and some connections between the two halves
std::vector<storage::VertexAddress> left;
for (int i = 0; i < 10; ++i) {
left.push_back(InsertVertex(master()));
}
std::vector<storage::VertexAddress> right;
for (int i = 0; i < 10; ++i) {
right.push_back(InsertVertex(master()));
}
// Force the nodes of both sides to stay on one worker by inserting a lot of
// edges in between them
for (int i = 0; i < 1000; ++i) {
InsertEdge(left[rand() % 10], left[rand() % 10], "edge");
InsertEdge(right[rand() % 10], right[rand() % 10], "edge");
}
// Insert edges between left and right side
for (int i = 0; i < 50; ++i)
InsertEdge(left[rand() % 10], right[rand() % 10], "edge");
// Balance it out so that the vertices count on workers don't influence the
// partitioning too much
for (int i = 0; i < 10; ++i) InsertVertex(worker(2));
DynamicGraphPartitioner dgp(&master());
// Transfer one by one to actually converge
FLAGS_dgp_max_batch_size = 1;
// Try a bit more transfers to see if we reached a steady state
for (int i = 0; i < 15; ++i) {
dgp.Run();
}
EXPECT_EQ(VertexCount(master()), 10);
EXPECT_EQ(VertexCount(worker(1)), 10);
auto CountRemotes = [](GraphDbAccessor &dba) {
int64_t cnt = 0;
for (auto vertex : dba.Vertices(false)) {
for (auto edge : vertex.in())
if (edge.from_addr().is_remote()) ++cnt;
for (auto edge : vertex.out())
if (edge.to_addr().is_remote()) ++cnt;
}
return cnt;
};
GraphDbAccessor dba_m(master());
GraphDbAccessor dba_w1(worker(1));
EXPECT_EQ(CountRemotes(dba_m), 50);
EXPECT_EQ(CountRemotes(dba_w1), 50);
}

View File

@ -0,0 +1,33 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
DECLARE_bool(dynamic_graph_partitioner_enabled);
DECLARE_int32(dgp_max_batch_size);
using namespace distributed;
using namespace database;
class TokenSharingTest : public DistributedGraphDbTest {
void SetUp() override {
FLAGS_dynamic_graph_partitioner_enabled = true;
FLAGS_dgp_max_batch_size = 1;
DistributedGraphDbTest::SetUp();
}
};
TEST_F(TokenSharingTest, Integration) {
auto vb = InsertVertex(worker(1));
for (int i = 0; i < 100; ++i) {
auto v = InsertVertex(master());
InsertEdge(vb, v, "edge");
}
std::this_thread::sleep_for(std::chrono::seconds(3));
// Migrate at least something from or to here
EXPECT_NE(VertexCount(master()), 100);
}

View File

@ -0,0 +1,181 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include "gtest/gtest.h"
#include "distributed/updates_rpc_clients.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
using namespace distributed;
using namespace database;
DECLARE_bool(generate_vertex_ids);
DECLARE_bool(generate_edge_ids);
// Check if the auto-generated gid property is unchanged after migration
TEST_F(DistributedGraphDbTest, VertexEdgeGidSaved) {
FLAGS_generate_vertex_ids = true;
FLAGS_generate_edge_ids = true;
// Fill master so that the ids are not the same on master and worker 1
for (int i = 0; i < 10; ++i) {
auto va = InsertVertex(master());
auto ea = InsertEdge(va, va, "edge");
}
auto va = InsertVertex(master());
auto ea = InsertEdge(va, va, "edge");
PropertyValue old_vgid_property(42);
PropertyValue old_egid_property(42);
{
database::GraphDbAccessor dba(master());
VertexAccessor vaccessor(va, dba);
old_vgid_property =
vaccessor.PropsAt(dba.Property(PropertyValueStore::IdPropertyName));
EXPECT_FALSE(old_vgid_property.IsNull());
EdgeAccessor eaccessor(ea, dba);
old_egid_property =
eaccessor.PropsAt(dba.Property(PropertyValueStore::IdPropertyName));
EXPECT_FALSE(old_egid_property.IsNull());
}
{
database::GraphDbAccessor dba(master());
VertexAccessor accessor(va, dba);
VertexMigrator migrator(&dba);
migrator.MigrateVertex(accessor, worker(1).WorkerId());
{
auto apply_futures = master().updates_clients().UpdateApplyAll(
master().WorkerId(), dba.transaction().id_);
// Destructor waits on application
}
dba.Commit();
}
ASSERT_EQ(VertexCount(worker(1)), 1);
{
database::GraphDbAccessor dba(worker(1));
auto vaccessor = *dba.Vertices(false).begin();
auto eaccessor = *dba.Edges(false).begin();
auto new_vgid_property =
vaccessor.PropsAt(dba.Property(PropertyValueStore::IdPropertyName));
auto new_egid_property =
eaccessor.PropsAt(dba.Property(PropertyValueStore::IdPropertyName));
EXPECT_EQ(old_vgid_property.Value<int64_t>(),
new_vgid_property.Value<int64_t>());
EXPECT_EQ(old_egid_property.Value<int64_t>(),
new_egid_property.Value<int64_t>());
}
}
// Checks that two connected nodes from the master are transferred to worker 1
// and that the edge from the vertex on worker 2 points to worker 1 after the
// transfer
TEST_F(DistributedGraphDbTest, SomeTransfer) {
auto va = InsertVertex(master());
auto vb = InsertVertex(master());
auto vc = InsertVertex(worker(2));
InsertEdge(va, vb, "edge");
InsertEdge(vc, va, "edge");
{
database::GraphDbAccessor dba(master());
VertexMigrator migrator(&dba);
for (auto &vertex : dba.Vertices(false)) {
migrator.MigrateVertex(vertex, worker(1).WorkerId());
}
{
auto apply_futures = master().updates_clients().UpdateApplyAll(
master().WorkerId(), dba.transaction().id_);
// Destructor waits on application
}
dba.Commit();
}
EXPECT_EQ(VertexCount(master()), 0);
EXPECT_EQ(EdgeCount(master()), 0);
EXPECT_EQ(VertexCount(worker(1)), 2);
EXPECT_EQ(EdgeCount(worker(1)), 1);
EXPECT_EQ(VertexCount(worker(2)), 1);
ASSERT_EQ(EdgeCount(worker(2)), 1);
{
database::GraphDbAccessor dba(worker(2));
auto edge = *dba.Edges(false).begin();
// Updated remote edge on another worker
EXPECT_EQ(edge.to_addr().worker_id(), worker(1).WorkerId());
}
}
// Check that a cycle edge is transferred only once since it's contained in
// both the in and out edges of a vertex and, if not handled correctly, could
// cause problems
TEST_F(DistributedGraphDbTest, EdgeCycle) {
auto va = InsertVertex(master());
InsertEdge(va, va, "edge");
{
database::GraphDbAccessor dba(master());
VertexMigrator migrator(&dba);
for (auto &vertex : dba.Vertices(false)) {
migrator.MigrateVertex(vertex, worker(1).WorkerId());
}
{
auto apply_futures = master().updates_clients().UpdateApplyAll(
master().WorkerId(), dba.transaction().id_);
// Destructor waits on application
}
dba.Commit();
}
EXPECT_EQ(VertexCount(master()), 0);
EXPECT_EQ(EdgeCount(master()), 0);
EXPECT_EQ(VertexCount(worker(1)), 1);
EXPECT_EQ(EdgeCount(worker(1)), 1);
}
TEST_F(DistributedGraphDbTest, TransferLabelsAndProperties) {
{
database::GraphDbAccessor dba(master());
auto va = dba.InsertVertex();
auto vb = dba.InsertVertex();
va.add_label(dba.Label("l"));
vb.add_label(dba.Label("l"));
va.PropsSet(dba.Property("p"), 42);
vb.PropsSet(dba.Property("p"), 42);
auto ea = dba.InsertEdge(va, vb, dba.EdgeType("edge"));
ea.PropsSet(dba.Property("pe"), 43);
auto eb = dba.InsertEdge(vb, va, dba.EdgeType("edge"));
eb.PropsSet(dba.Property("pe"), 43);
dba.Commit();
}
{
database::GraphDbAccessor dba(master());
VertexMigrator migrator(&dba);
for (auto &vertex : dba.Vertices(false)) {
migrator.MigrateVertex(vertex, worker(1).WorkerId());
}
{
auto apply_futures = master().updates_clients().UpdateApplyAll(
master().WorkerId(), dba.transaction().id_);
// Destructor waits on application
}
dba.Commit();
}
{
database::GraphDbAccessor dba(worker(1));
EXPECT_EQ(VertexCount(master()), 0);
ASSERT_EQ(VertexCount(worker(1)), 2);
for (auto vertex : dba.Vertices(false)) {
ASSERT_EQ(vertex.labels().size(), 1);
EXPECT_EQ(vertex.labels()[0], dba.Label("l"));
EXPECT_EQ(vertex.PropsAt(dba.Property("p")).Value<int64_t>(), 42);
}
ASSERT_EQ(EdgeCount(worker(1)), 2);
auto edge = *dba.Edges(false).begin();
EXPECT_EQ(edge.PropsAt(dba.Property("pe")).Value<int64_t>(), 43);
EXPECT_EQ(edge.EdgeType(), dba.EdgeType("edge"));
}
}

View File

@ -104,7 +104,7 @@ TEST_F(RpcWorkerClientsTest, GetClientPool) {
}
TEST_F(RpcWorkerClientsTest, ExecuteOnWorker) {
auto execute = [](auto &client) -> void {
auto execute = [](int worker_id, auto &client) -> void {
ASSERT_TRUE(client.template Call<distributed::IncrementCounterRpc>());
};
@ -115,7 +115,7 @@ TEST_F(RpcWorkerClientsTest, ExecuteOnWorker) {
}
TEST_F(RpcWorkerClientsTest, ExecuteOnWorkers) {
auto execute = [](auto &client) -> void {
auto execute = [](int worker_id, auto &client) -> void {
ASSERT_TRUE(client.template Call<distributed::IncrementCounterRpc>());
};