Support dynamic worker addition

Summary:
With this diff we should be able to support dynamic worker addition.
This is ofcourse a minimal effort maximal impact approach.

This diff introduces new RPC calls when a worker registers.

The `DynamicWorkerAddition` doesn't use `GraphDbAccessor` to get indices because
they create WAL entries.

Reviewers: vkasljevic, ipaljak, buda

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1594
This commit is contained in:
Matija Santl 2018-09-07 15:59:10 +02:00
parent 42a43322b2
commit 0eb0b913a8
14 changed files with 489 additions and 79 deletions

View File

@ -33,6 +33,7 @@ set(memgraph_src_files
distributed/dgp/vertex_migrator.cpp
distributed/durability_rpc_master.cpp
distributed/durability_rpc_worker.cpp
distributed/dynamic_worker.cpp
distributed/index_rpc_server.cpp
distributed/plan_consumer.cpp
distributed/plan_dispatcher.cpp
@ -110,6 +111,8 @@ add_lcp(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4c
add_capnp(distributed/token_sharing_rpc_messages.capnp)
add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
add_capnp(distributed/updates_rpc_messages.capnp)
add_lcp(distributed/dynamic_worker_rpc_messages.lcp CAPNP_SCHEMA @0x8c53f6c9a0c71b05)
add_capnp(distributed/dynamic_worker_rpc_messages.capnp)
# distributed_ops.lcp is leading the capnp code generation, so we only want
# function declarations in generated operator.hpp

View File

@ -14,6 +14,7 @@
#include "distributed/data_rpc_server.hpp"
#include "distributed/durability_rpc_master.hpp"
#include "distributed/durability_rpc_worker.hpp"
#include "distributed/dynamic_worker.hpp"
#include "distributed/index_rpc_server.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
@ -623,6 +624,7 @@ class Master {
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
distributed::DynamicWorkerAddition dynamic_worker_addition_{self_, &server_};
};
} // namespace impl
@ -679,13 +681,15 @@ Master::Master(Config config)
recovery_data.wal_tx_to_recover =
impl_->coordination_.CommonWalTransactions(*recovery_info);
MasterRecoveryTransactions recovery_transactions(this);
durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
this, &recovery_data,
&recovery_transactions);
durability::RecoverWal(impl_->config_.durability_directory, this,
&recovery_data, &recovery_transactions);
durability::RecoverIndexes(this, recovery_data.indexes);
auto workers_recovered_wal =
impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
workers_recovered_wal.get();
}
impl_->dynamic_worker_addition_.Enable();
}
// Start the dynamic graph partitioner inside token sharing server
@ -962,6 +966,8 @@ class Worker {
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
distributed::DynamicWorkerRegistration dynamic_worker_registration_{
&rpc_worker_clients_.GetClientPool(0)};
};
} // namespace impl
@ -981,8 +987,10 @@ Worker::Worker(Config config)
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
// Durability recovery. We need to check this flag for workers that are added
// after the "main" cluster recovery.
if (impl_->config_.db_recover_on_startup) {
// What we should recover.
// What we should recover (version, transaction_id) pair.
auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover();
@ -1013,6 +1021,13 @@ Worker::Worker(Config config)
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info);
} else {
// Check with master if we're a dynamically added worker and need to update
// our indices.
auto indexes = impl_->dynamic_worker_registration_.GetIndicesToCreate();
if (!indexes.empty()) {
durability::RecoverIndexes(this, indexes);
}
}
if (impl_->config_.durability_enabled) {
@ -1123,8 +1138,9 @@ void Worker::ReinitializeStorage() {
void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) {
WorkerRecoveryTransactions recovery_transactions(this);
durability::RecoverWalAndIndexes(impl_->config_.durability_directory, this,
durability::RecoverWal(impl_->config_.durability_directory, this,
recovery_data, &recovery_transactions);
durability::RecoverIndexes(this, recovery_data->indexes);
}
io::network::Endpoint Worker::endpoint() const {

View File

@ -233,9 +233,9 @@ SingleNode::SingleNode(Config config)
if (recovery_info) {
recovery_data.wal_tx_to_recover = recovery_info->wal_recovered;
SingleNodeRecoveryTransanctions recovery_transactions(this);
durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
this, &recovery_data,
&recovery_transactions);
durability::RecoverWal(impl_->config_.durability_directory, this,
&recovery_data, &recovery_transactions);
durability::RecoverIndexes(this, recovery_data.indexes);
}
}

View File

@ -0,0 +1,46 @@
#include "distributed/dynamic_worker.hpp"
#include "database/distributed_graph_db.hpp"
#include "distributed/dynamic_worker_rpc_messages.hpp"
namespace distributed {
using Server = communication::rpc::Server;
using ClientPool = communication::rpc::ClientPool;
DynamicWorkerAddition::DynamicWorkerAddition(database::DistributedGraphDb *db,
Server *server)
: db_(db), server_(server) {
server_->Register<DynamicWorkerRpc>(
[this](const auto &req_reader, auto *res_builder) {
DynamicWorkerReq req;
req.Load(req_reader);
DynamicWorkerRes res(this->GetIndicesToCreate());
res.Save(res_builder);
});
}
std::vector<std::pair<std::string, std::string>>
DynamicWorkerAddition::GetIndicesToCreate() {
std::vector<std::pair<std::string, std::string>> indices;
if (!enabled_.load()) return indices;
for (const auto &key : db_->storage().label_property_index().Keys()) {
auto label = db_->label_mapper().id_to_value(key.label_);
auto property = db_->property_mapper().id_to_value(key.property_);
indices.emplace_back(label, property);
}
return indices;
}
void DynamicWorkerAddition::Enable() { enabled_.store(true); }
DynamicWorkerRegistration::DynamicWorkerRegistration(ClientPool *client_pool)
: client_pool_(client_pool) {}
std::vector<std::pair<std::string, std::string>>
DynamicWorkerRegistration::GetIndicesToCreate() {
auto result = client_pool_->Call<DynamicWorkerRpc>();
CHECK(result) << "DynamicWorkerRpc failed";
return result->recover_indices;
}
} // namespace distributed

View File

@ -0,0 +1,47 @@
/// @file
#pragma once
#include <atomic>
#include <string>
#include <vector>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
namespace database {
class DistributedGraphDb;
} // namespace database
namespace distributed {
using Server = communication::rpc::Server;
using ClientPool = communication::rpc::ClientPool;
class DynamicWorkerAddition final {
public:
DynamicWorkerAddition(database::DistributedGraphDb *db, Server *server);
/// Enable dynamic worker addition.
void Enable();
private:
database::DistributedGraphDb *db_{nullptr};
Server *server_;
std::atomic<bool> enabled_{false};
/// Return the indices a dynamically added worker needs to create.
std::vector<std::pair<std::string, std::string>> GetIndicesToCreate();
};
class DynamicWorkerRegistration final {
public:
explicit DynamicWorkerRegistration(ClientPool *client_pool);
/// Make a RPC call to master to get indices to create.
std::vector<std::pair<std::string, std::string>> GetIndicesToCreate();
private:
ClientPool *client_pool_;
};
} // namespace distributed

View File

@ -0,0 +1,18 @@
# -*- buffer-read-only: t; -*-
# vim: readonly
# DO NOT EDIT! Generated using LCP from 'dynamic_worker_rpc_messages.lcp'
@0x8c53f6c9a0c71b05;
using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("distributed::capnp");
using Utils = import "/utils/serialization.capnp";
struct DynamicWorkerRes {
recoverIndices @0 :List(Utils.Pair(Text, Text));
}
struct DynamicWorkerReq {
}

View File

@ -0,0 +1,48 @@
// -*- buffer-read-only: t; -*-
// vim: readonly
// DO NOT EDIT! Generated using LCP from 'dynamic_worker_rpc_messages.lcp'
#pragma once
#include <string>
#include <vector>
#include "communication/rpc/messages.hpp"
#include "distributed/dynamic_worker_rpc_messages.capnp.h"
namespace distributed {
struct DynamicWorkerReq {
using Capnp = capnp::DynamicWorkerReq;
static const communication::rpc::MessageType TypeInfo;
DynamicWorkerReq() {}
void Save(capnp::DynamicWorkerReq::Builder *builder) const;
static std::unique_ptr<DynamicWorkerReq> Construct(
const capnp::DynamicWorkerReq::Reader &reader);
void Load(const capnp::DynamicWorkerReq::Reader &reader);
};
struct DynamicWorkerRes {
using Capnp = capnp::DynamicWorkerRes;
static const communication::rpc::MessageType TypeInfo;
DynamicWorkerRes() {}
explicit DynamicWorkerRes(
std::vector<std::pair<std::string, std::string>> recover_indices)
: recover_indices(recover_indices) {}
std::vector<std::pair<std::string, std::string>> recover_indices;
void Save(capnp::DynamicWorkerRes::Builder *builder) const;
static std::unique_ptr<DynamicWorkerRes> Construct(
const capnp::DynamicWorkerRes::Reader &reader);
void Load(const capnp::DynamicWorkerRes::Reader &reader);
};
using DynamicWorkerRpc =
communication::rpc::RequestResponse<DynamicWorkerReq, DynamicWorkerRes>;
}

View File

@ -0,0 +1,42 @@
#>cpp
#pragma once
#include <vector>
#include <string>
#include "communication/rpc/messages.hpp"
#include "distributed/dynamic_worker_rpc_messages.capnp.h"
cpp<#
(lcp:namespace distributed)
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:define-rpc dynamic-worker
(:request ())
(:response
((recover-indices "std::vector<std::pair<std::string, std::string>>"
:capnp-type "List(Utils.Pair(Text, Text))"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
std::pair<std::string, std::string>>(
${member}, &${builder}, [](auto *builder, const auto value) {
builder->setFirst(value.first);
builder->setSecond(value.second);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
utils::LoadVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
std::pair<std::string, std::string>>(
&${member}, ${reader}, [](const auto &reader) {
return std::make_pair(reader.getFirst(), reader.getSecond());
});
cpp<#)))))
(lcp:pop-namespace) ;; distributed

View File

@ -368,10 +368,61 @@ std::vector<tx::TransactionId> ReadWalRecoverableTransactions(
return committed_tx_ids;
}
} // anonymous namespace
RecoveryInfo RecoverOnlySnapshot(
const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
int worker_id) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
snapshot_files.emplace_back(file);
std::sort(snapshot_files.rbegin(), snapshot_files.rend());
for (auto &snapshot_file : snapshot_files) {
if (required_snapshot_tx_id) {
auto snapshot_file_tx_id =
TransactionIdFromSnapshotFilename(snapshot_file);
if (!snapshot_file_tx_id ||
snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
LOG(INFO) << "Skipping snapshot file '" << snapshot_file
<< "' because it does not match the required snapshot tx id: "
<< *required_snapshot_tx_id;
continue;
}
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) {
db->ReinitializeStorage();
recovery_data->Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
continue;
} else {
LOG(INFO) << "Snapshot recovery successful.";
break;
}
}
// If snapshot recovery is required, and we failed, don't even deal with
// the WAL recovery.
if (required_snapshot_tx_id &&
recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
return {durability::kVersion, recovery_data->snapshooter_tx_id,
ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
*recovery_data)};
}
// TODO - finer-grained recovery feedback could be useful here.
bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
void RecoverWal(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
RecoveryTransactions *transactions) {
auto wal_dir = durability_dir / kWalDir;
auto wal_files = GetWalFiles(wal_dir);
// Track which transaction should be recovered first, and define logic for
// which transactions should be skipped in recovery.
@ -431,71 +482,13 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
// - WAL recovery error
db->tx_engine().EnsureNextIdGreater(max_observed_tx_id);
return true;
}
} // anonymous namespace
RecoveryInfo RecoverOnlySnapshot(
const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
int worker_id) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
snapshot_files.emplace_back(file);
std::sort(snapshot_files.rbegin(), snapshot_files.rend());
for (auto &snapshot_file : snapshot_files) {
if (required_snapshot_tx_id) {
auto snapshot_file_tx_id =
TransactionIdFromSnapshotFilename(snapshot_file);
if (!snapshot_file_tx_id ||
snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
LOG(INFO) << "Skipping snapshot file '" << snapshot_file
<< "' because it does not match the required snapshot tx id: "
<< *required_snapshot_tx_id;
continue;
}
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) {
db->ReinitializeStorage();
recovery_data->Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
continue;
} else {
LOG(INFO) << "Snapshot recovery successful.";
break;
}
}
// If snapshot recovery is required, and we failed, don't even deal with
// the WAL recovery.
if (required_snapshot_tx_id &&
recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
return {durability::kVersion, recovery_data->snapshooter_tx_id,
ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
*recovery_data)};
}
void RecoverWalAndIndexes(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
RecoveryTransactions *transactions) {
// Write-ahead-log recovery.
// WAL recovery does not have to be complete for the recovery to be
// considered successful. For the time being ignore the return value,
// consider a better system.
RecoverWal(durability_dir / kWalDir, db, recovery_data, transactions);
// Index recovery.
void RecoverIndexes(
database::GraphDb *db,
const std::vector<std::pair<std::string, std::string>> &indexes) {
auto db_accessor_indices = db->Access();
for (const auto &label_prop : recovery_data->indexes) {
for (const auto &label_prop : indexes) {
const database::LabelPropertyIndex::Key key{
db_accessor_indices->Label(label_prop.first),
db_accessor_indices->Property(label_prop.second)};

View File

@ -180,8 +180,12 @@ class RecoveryTransactions {
virtual void Apply(const database::StateDelta &) = 0;
};
void RecoverWalAndIndexes(const std::experimental::filesystem::path &dir,
void RecoverWal(const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, RecoveryData *recovery_data,
RecoveryTransactions *transactions);
void RecoverIndexes(
database::GraphDb *db,
const std::vector<std::pair<std::string, std::string>> &indexes);
} // namespace durability

View File

@ -73,6 +73,9 @@ target_link_libraries(${test_prefix}distributed_dgp_vertex_migrator memgraph_lib
add_unit_test(distributed_durability.cpp)
target_link_libraries(${test_prefix}distributed_durability memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_dynamic_worker.cpp)
target_link_libraries(${test_prefix}distributed_dynamic_worker memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_gc.cpp)
target_link_libraries(${test_prefix}distributed_gc memgraph_lib kvstore_dummy_lib)

View File

@ -156,7 +156,7 @@ class DistributedGraphDbTest : public ::testing::Test {
// Each test has to specify its own durability suffix to avoid conflicts
DistributedGraphDbTest() = delete;
DistributedGraphDbTest(const std::string &dir_suffix)
explicit DistributedGraphDbTest(const std::string &dir_suffix)
: dir_suffix_(dir_suffix) {
tmp_dir_ =
fs::temp_directory_path() / ("MG_test_unit_durability_" + dir_suffix_);

View File

@ -0,0 +1,189 @@
#include <memory>
#include <thread>
#include "gtest/gtest.h"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "distributed_common.hpp"
#include "io/network/endpoint.hpp"
#include "query_plan_common.hpp"
namespace fs = std::experimental::filesystem;
using namespace std::literals::chrono_literals;
class DistributedDynamicWorker : public ::testing::Test {
public:
const std::string kLocal = "127.0.0.1";
std::unique_ptr<database::Master> CreateMaster(
std::function<database::Config(database::Config config)> modify_config) {
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_config.durability_directory = GetDurabilityDirectory(0);
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(0);
auto master =
std::make_unique<database::Master>(modify_config(master_config));
std::this_thread::sleep_for(200ms);
return master;
}
std::unique_ptr<WorkerInThread> CreateWorker(
io::network::Endpoint master_endpoint, int worker_id,
std::function<database::Config(database::Config config)> modify_config) {
database::Config config;
config.durability_directory = GetDurabilityDirectory(worker_id);
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(worker_id);
config.worker_id = worker_id;
config.master_endpoint = master_endpoint;
config.worker_endpoint = {kLocal, 0};
auto worker = std::make_unique<WorkerInThread>(modify_config(config));
std::this_thread::sleep_for(200ms);
return worker;
}
fs::path GetDurabilityDirectory(int worker_id) {
if (worker_id == 0) return tmp_dir_ / "master";
return tmp_dir_ / fmt::format("worker{}", worker_id);
}
void CleanDurability() {
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
}
private:
fs::path tmp_dir_ =
fs::temp_directory_path() / ("MG_test_unit_distributed_worker_addition");
};
TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorker) {
auto modify_config = [](database::Config config) { return config; };
auto master = CreateMaster(modify_config);
// Lets insert some data and build label property index
{
auto dba = master->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
for (int i = 0; i < 100; ++i) {
auto vertex = dba->InsertVertex();
vertex.add_label(label);
vertex.PropsSet(property, 1);
}
dba->Commit();
}
{
auto dba = master->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
dba->BuildIndex(label, property);
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
EXPECT_EQ(CountIterable(dba->Vertices(label, property, false)), 100);
}
auto worker1 = CreateWorker(master->endpoint(), 1, modify_config);
// Check that the new worker has that index
{
auto dba = worker1->db()->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
}
auto t = std::thread([&]() { master = nullptr; });
worker1 = nullptr;
if (t.joinable()) t.join();
}
TEST_F(DistributedDynamicWorker, IndexExistsOnNewWorkerAfterRecovery) {
auto modify_config = [](database::Config config) { return config; };
auto durability_config = [](database::Config config) {
config.durability_enabled = true;
config.snapshot_on_exit = true;
return config;
};
auto recovery_config = [](database::Config config) {
config.recovering_cluster_size = 1;
config.db_recover_on_startup = true;
return config;
};
{
auto master = CreateMaster(durability_config);
// Lets insert some data and build label property index
{
auto dba = master->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
for (int i = 0; i < 100; ++i) {
auto vertex = dba->InsertVertex();
vertex.add_label(label);
vertex.PropsSet(property, 1);
}
dba->Commit();
}
{
auto dba = master->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
dba->BuildIndex(label, property);
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
}
master = nullptr;
}
{
auto master = CreateMaster(recovery_config);
// Make sure the index is recovered on master.
{
auto dba = master->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
}
auto worker1 = CreateWorker(master->endpoint(), 1, modify_config);
// Check that the new worker has that index.
{
auto dba = worker1->db()->Access();
storage::Label label;
storage::Property property;
label = dba->Label("label");
property = dba->Property("property");
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
}
auto t = std::thread([&]() { master = nullptr; });
worker1 = nullptr;
if (t.joinable()) t.join();
}
}

View File

@ -25,8 +25,9 @@ class RecoveryTest : public ::testing::Test {
durability::RecoverOnlySnapshot(durability_dir, &db_, &recovery_data,
std::experimental::nullopt, 0);
database::SingleNodeRecoveryTransanctions recovery_transactions(&db_);
durability::RecoverWalAndIndexes(durability_dir, &db_, &recovery_data,
durability::RecoverWal(durability_dir, &db_, &recovery_data,
&recovery_transactions);
durability::RecoverIndexes(&db_, recovery_data.indexes);
}
database::SingleNode db_;