Synchronize snapshooting

Summary: Make synchronized snapshot. This invokese the snapshooter on workers on the master snapshot scheduler interval.

Reviewers: msantl, mtomic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1334
This commit is contained in:
Dominik Gleich 2018-04-06 09:59:54 +02:00
parent 23fe66e828
commit b20e31e800
14 changed files with 298 additions and 35 deletions

View File

@ -16,6 +16,8 @@ set(memgraph_src_files
database/state_delta.cpp
distributed/coordination_master.cpp
distributed/coordination_worker.cpp
distributed/durability_rpc_clients.cpp
distributed/durability_rpc_server.cpp
distributed/index_rpc_server.cpp
distributed/plan_consumer.cpp
distributed/plan_dispatcher.cpp

View File

@ -7,6 +7,7 @@
#include "database/state_delta.hpp"
#include "distributed/coordination_rpc_messages.hpp"
#include "distributed/data_rpc_messages.hpp"
#include "distributed/durability_rpc_messages.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
@ -118,6 +119,10 @@ BOOST_CLASS_EXPORT(distributed::RemoveInEdgeData);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeRes);
// Durability
BOOST_CLASS_EXPORT(distributed::MakeSnapshotReq);
BOOST_CLASS_EXPORT(distributed::MakeSnapshotRes);
// Storage Gc.
BOOST_CLASS_EXPORT(distributed::GcClearedStatusReq);
BOOST_CLASS_EXPORT(distributed::GcClearedStatusRes);
@ -125,4 +130,3 @@ BOOST_CLASS_EXPORT(distributed::GcClearedStatusRes);
// Transactional Cache Cleaner.
BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndReq);
BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndRes);

View File

@ -10,6 +10,9 @@
#include "distributed/data_manager.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_server.hpp"
#include "distributed/durability_rpc_clients.hpp"
#include "distributed/durability_rpc_messages.hpp"
#include "distributed/durability_rpc_server.hpp"
#include "distributed/index_rpc_server.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
@ -45,6 +48,19 @@ class PrivateBase : public GraphDb {
durability::WriteAheadLog &wal() override { return wal_; }
int WorkerId() const override { return config_.worker_id; }
// Makes a local snapshot from the visibility of accessor
bool MakeSnapshot(GraphDbAccessor &accessor) override {
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(config_.durability_directory),
config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully." << std::endl;
} else {
LOG(ERROR) << "Snapshot creation failed!" << std::endl;
}
return status;
}
distributed::PullRpcClients &pull_clients() override {
LOG(FATAL) << "Remote pull clients only available in master.";
}
@ -150,6 +166,21 @@ class Master : public PrivateBase {
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_MASTER;
}
// Makes a local snapshot and forces the workers to do the same. Snapshot is
// written here only if workers sucesfully created their own snapshot
bool MakeSnapshot(GraphDbAccessor &accessor) override {
auto workers_snapshot =
durability_rpc_clients_.MakeSnapshot(accessor.transaction_id());
if (!workers_snapshot.get()) return false;
// This can be further optimized by creating master snapshot at the same
// time as workers snapshots but this forces us to delete the master
// snapshot if we succeed in creating it and workers somehow fail. Because
// we have an assumption that every snapshot that exists on master with some
// tx_id visibility also exists on workers
return PrivateBase::MakeSnapshot(accessor);
}
IMPL_GETTERS
IMPL_DISTRIBUTED_GETTERS
distributed::PlanDispatcher &plan_dispatcher() override {
@ -169,6 +200,8 @@ class Master : public PrivateBase {
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::DurabilityRpcClients durability_rpc_clients_{
rpc_worker_clients_};
distributed::DataRpcServer data_server_{*this, server_};
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
@ -227,6 +260,7 @@ class Worker : public PrivateBase {
distributed::DataManager data_manager_{storage_, data_clients_};
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
tx_engine_, server_, produce_server_, updates_server_, data_manager_};
distributed::DurabilityRpcServer durability_rpc_server_{*this, server_};
};
#undef IMPL_GETTERS
@ -241,10 +275,6 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
durability::Recover(impl_->config_.durability_directory, *impl_);
if (impl_->config_.durability_enabled) {
impl_->wal().Enable();
snapshot_creator_ = std::make_unique<Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] { MakeSnapshot(); });
}
// Start transaction killer.
@ -272,8 +302,13 @@ PublicBase::~PublicBase() {
tx_engine().LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
snapshot_creator_ = nullptr;
if (impl_->config_.snapshot_on_exit) MakeSnapshot();
// If we are not a worker we can do a snapshot on exit if it's enabled. Doing
// this on the master forces workers to do the same through rpcs
if (impl_->config_.snapshot_on_exit &&
impl_->type() != Type::DISTRIBUTED_WORKER) {
GraphDbAccessor dba(*this);
MakeSnapshot(dba);
}
}
GraphDb::Type PublicBase::type() const { return impl_->type(); }
@ -326,18 +361,27 @@ distributed::DataManager &PublicBase::data_manager() {
return impl_->data_manager();
}
void PublicBase::MakeSnapshot() {
const bool status = durability::MakeSnapshot(
*impl_, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully." << std::endl;
} else {
LOG(ERROR) << "Snapshot creation failed!" << std::endl;
}
bool PublicBase::MakeSnapshot(GraphDbAccessor &accessor) {
return impl_->MakeSnapshot(accessor);
}
} // namespace impl
MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
: PublicBase(std::move(impl)) {
if (impl_->config_.durability_enabled) {
impl_->wal().Enable();
snapshot_creator_ = std::make_unique<Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] {
GraphDbAccessor dba(*this);
impl_->MakeSnapshot(dba);
});
}
}
MasterBase::~MasterBase() { snapshot_creator_ = nullptr; }
SingleNode::SingleNode(Config config)
: MasterBase(std::make_unique<impl::SingleNode>(config)) {}

View File

@ -111,6 +111,9 @@ class GraphDb {
virtual distributed::ProduceRpcServer &produce_server() = 0;
virtual distributed::PlanConsumer &plan_consumer() = 0;
// Makes a snapshot from the visibility of the given accessor
virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0;
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
@ -150,6 +153,7 @@ class PublicBase : public GraphDb {
distributed::DataManager &data_manager() override;
bool is_accepting_transactions() const { return is_accepting_transactions_; }
bool MakeSnapshot(GraphDbAccessor &accessor) override;
protected:
explicit PublicBase(std::unique_ptr<PrivateBase> impl);
@ -158,19 +162,19 @@ class PublicBase : public GraphDb {
std::unique_ptr<PrivateBase> impl_;
private:
std::unique_ptr<Scheduler> snapshot_creator_;
/** When this is false, no new transactions should be created. */
std::atomic<bool> is_accepting_transactions_{true};
Scheduler transaction_killer_;
void MakeSnapshot();
};
} // namespace impl
class MasterBase : public impl::PublicBase {
public:
using PublicBase::PublicBase;
MasterBase(std::unique_ptr<impl::PrivateBase> impl);
~MasterBase();
private:
std::unique_ptr<Scheduler> snapshot_creator_;
};
class SingleNode : public MasterBase {

View File

@ -0,0 +1,26 @@
#include "distributed/durability_rpc_clients.hpp"
#include "distributed/durability_rpc_messages.hpp"
#include "transactions/transaction.hpp"
#include "utils/future.hpp"
namespace distributed {
utils::Future<bool> DurabilityRpcClients::MakeSnapshot(
tx::transaction_id_t tx) {
return std::async(std::launch::async, [this, tx] {
auto futures = clients_.ExecuteOnWorkers<bool>(
0, [tx](communication::rpc::ClientPool &client_pool) {
auto res = client_pool.Call<MakeSnapshotRpc>(tx);
if (res == nullptr) return false;
return res->member;
});
bool created = true;
for (auto &future : futures) {
created &= future.get();
}
return created;
});
}
} // namespace distributed

View File

@ -0,0 +1,28 @@
#pragma once
#include <future>
#include <mutex>
#include <utility>
#include "distributed/rpc_worker_clients.hpp"
#include "storage/gid.hpp"
#include "transactions/type.hpp"
namespace distributed {
/// Provides an ability to trigger snapshooting on other workers.
class DurabilityRpcClients {
public:
DurabilityRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
// Sends a snapshot request to workers and returns a future which becomes true
// if all workers sucesfully completed their snapshot creation, false
// otherwise
// @param tx - transaction from which to take db snapshot
utils::Future<bool> MakeSnapshot(tx::transaction_id_t tx);
private:
RpcWorkerClients &clients_;
};
} // namespace distributed

View File

@ -0,0 +1,17 @@
#pragma once
#include "boost/serialization/access.hpp"
#include "boost/serialization/base_object.hpp"
#include "communication/rpc/messages.hpp"
#include "transactions/transaction.hpp"
namespace distributed {
RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotRes, bool);
using MakeSnapshotRpc =
communication::rpc::RequestResponse<MakeSnapshotReq, MakeSnapshotRes>;
} // namespace distributed

View File

@ -0,0 +1,18 @@
#include "distributed/durability_rpc_server.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/durability_rpc_messages.hpp"
namespace distributed {
DurabilityRpcServer::DurabilityRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<MakeSnapshotRpc>([this](const MakeSnapshotReq &req) {
database::GraphDbAccessor dba(this->db_, req.member);
return std::make_unique<MakeSnapshotRes>(this->db_.MakeSnapshot(dba));
});
}
} // namespace distributed

View File

@ -0,0 +1,21 @@
#pragma once
#include "communication/rpc/server.hpp"
namespace database {
class GraphDb;
};
namespace distributed {
class DurabilityRpcServer {
public:
DurabilityRpcServer(database::GraphDb &db,
communication::rpc::Server &server);
private:
database::GraphDb &db_;
communication::rpc::Server &rpc_server_;
};
} // namespace distributed

View File

@ -117,9 +117,9 @@ void RemoveOldWals(const fs::path &wal_dir,
}
} // namespace
bool MakeSnapshot(database::GraphDb &db, const fs::path &durability_dir,
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const fs::path &durability_dir,
const int snapshot_max_retained) {
database::GraphDbAccessor dba(db);
if (!EnsureDir(durability_dir / kSnapshotDir)) return false;
const auto snapshot_file =
MakeSnapshotPath(durability_dir, db.WorkerId(), dba.transaction_id());
@ -127,7 +127,6 @@ bool MakeSnapshot(database::GraphDb &db, const fs::path &durability_dir,
if (Encode(snapshot_file, db, dba)) {
RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained);
RemoveOldWals(durability_dir / kWalDir, dba.transaction());
dba.Commit();
return true;
} else {
std::error_code error_code; // Just for exception suppression.

View File

@ -9,10 +9,11 @@ namespace durability {
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db - database for which we are creating a snapshot
* @param dba - db accessor with which we are creating a snapshot (reading data)
* @param durability_dir - directory where durability data is stored.
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
bool MakeSnapshot(database::GraphDb &db,
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const std::experimental::filesystem::path &durability_dir,
int snapshot_max_retained);
} // namespace durability

View File

@ -1,3 +1,4 @@
#include <experimental/filesystem>
#include <memory>
#include <thread>
@ -9,6 +10,8 @@
#include "storage/address_types.hpp"
#include "transactions/engine_master.hpp"
namespace fs = std::experimental::filesystem;
class DistributedGraphDbTest : public ::testing::Test {
const std::string kLocal = "127.0.0.1";
const int kWorkerCount = 2;
@ -30,39 +33,55 @@ class DistributedGraphDbTest : public ::testing::Test {
protected:
virtual int QueryExecutionTimeSec(int) { return 180; }
void SetUp() override {
void Initialize(
std::function<database::Config(database::Config config)> modify_config) {
const auto kInitTime = 200ms;
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_config.query_execution_time_sec = QueryExecutionTimeSec(0);
master_ = std::make_unique<database::Master>(master_config);
master_config.durability_directory = tmp_dir_;
master_ = std::make_unique<database::Master>(modify_config(master_config));
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
database::Config config;
config.worker_id = worker_id;
config.master_endpoint = master_->endpoint();
config.durability_directory = tmp_dir_;
config.worker_endpoint = {kLocal, 0};
config.query_execution_time_sec = QueryExecutionTimeSec(worker_id);
return config;
};
for (int i = 0; i < kWorkerCount; ++i) {
workers_.emplace_back(
std::make_unique<WorkerInThread>(worker_config(i + 1)));
workers_.emplace_back(std::make_unique<WorkerInThread>(
modify_config(worker_config(i + 1))));
std::this_thread::sleep_for(kInitTime);
}
}
void TearDown() override {
void SetUp() override {
Initialize([](database::Config config) { return config; });
}
void ShutDown() {
// Kill master first because it will expect a shutdown response from the
// workers.
auto t = std::thread([this]() { master_ = nullptr; });
for (int i = kWorkerCount - 1; i >= 0; --i) workers_[i] = nullptr;
workers_.clear();
if (t.joinable()) t.join();
}
void CleanDurability() {
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
}
void TearDown() override {
ShutDown();
CleanDurability();
}
database::Master &master() { return *master_; }
auto &master_tx_engine() {
return dynamic_cast<tx::MasterEngine &>(master_->tx_engine());
@ -114,4 +133,7 @@ class DistributedGraphDbTest : public ::testing::Test {
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
fs::path tmp_dir_ = fs::temp_directory_path() /
("MG_test_unit_durability" + std::to_string(getpid()));
};

View File

@ -0,0 +1,75 @@
#include "distributed_common.hpp"
#include "database/graph_db_accessor.hpp"
class DistributedDurability : public DistributedGraphDbTest {
public:
void write_labels() {
add_label(master(), "master");
add_label(worker(1), "worker1");
add_label(worker(2), "worker2");
}
void check_labels() {
check_label(master(), "master");
check_label(worker(1), "worker1");
check_label(worker(2), "worker2");
}
private:
void add_label(database::GraphDb &db, const std::string &label) {
database::GraphDbAccessor dba(db);
auto vertex = dba.InsertVertex();
vertex.add_label(dba.Label(label));
dba.Commit();
}
void check_label(database::GraphDb &db, const std::string &label) {
database::GraphDbAccessor dba(db);
auto it = dba.Vertices(false);
ASSERT_NE(it.begin(), it.end());
auto vertex = *it.begin();
ASSERT_EQ(vertex.labels().size(), 1);
EXPECT_EQ(vertex.labels()[0], dba.Label(label));
}
};
TEST_F(DistributedDurability, MakeSnapshot) {
// Create a graph with 3 nodes with 3 labels, one on each and make a snapshot
// of it
{
write_labels();
database::GraphDbAccessor dba(master());
master().MakeSnapshot(dba);
}
// Recover the graph and check if it's the same as before
{
ShutDown();
Initialize([](database::Config config) {
config.db_recover_on_startup = true;
return config;
});
check_labels();
}
}
TEST_F(DistributedDurability, SnapshotOnExit) {
{
TearDown();
Initialize([](database::Config config) {
config.snapshot_on_exit = true;
return config;
});
write_labels();
}
// Recover the graph and check if it's the same as before
{
// This should force the db to make a snapshot
ShutDown();
Initialize([](database::Config config) {
config.db_recover_on_startup = true;
return config;
});
check_labels();
}
}

View File

@ -300,8 +300,9 @@ class Durability : public ::testing::Test {
}
void MakeSnapshot(database::GraphDb &db, int snapshot_max_retained = -1) {
ASSERT_TRUE(
durability::MakeSnapshot(db, durability_dir_, snapshot_max_retained));
database::GraphDbAccessor dba(db);
ASSERT_TRUE(durability::MakeSnapshot(db, dba, durability_dir_,
snapshot_max_retained));
}
void SetUp() override {
@ -820,8 +821,9 @@ TEST_F(Durability, SequentialRecovery) {
return threads;
};
auto make_updates = [&run_updates, this](
database::GraphDb &db, bool snapshot_during, bool snapshot_after) {
auto make_updates = [&run_updates, this](database::GraphDb &db,
bool snapshot_during,
bool snapshot_after) {
std::atomic<bool> keep_running{true};
auto update_theads = run_updates(db, keep_running);
std::this_thread::sleep_for(25ms);