diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 51c9ef502..16971a4fa 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,8 @@ set(memgraph_src_files database/state_delta.cpp distributed/coordination_master.cpp distributed/coordination_worker.cpp + distributed/durability_rpc_clients.cpp + distributed/durability_rpc_server.cpp distributed/index_rpc_server.cpp distributed/plan_consumer.cpp distributed/plan_dispatcher.cpp diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index e33212ec7..6b22a7e0f 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -7,6 +7,7 @@ #include "database/state_delta.hpp" #include "distributed/coordination_rpc_messages.hpp" #include "distributed/data_rpc_messages.hpp" +#include "distributed/durability_rpc_messages.hpp" #include "distributed/index_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp" #include "distributed/pull_produce_rpc_messages.hpp" @@ -118,6 +119,10 @@ BOOST_CLASS_EXPORT(distributed::RemoveInEdgeData); BOOST_CLASS_EXPORT(distributed::RemoveInEdgeReq); BOOST_CLASS_EXPORT(distributed::RemoveInEdgeRes); +// Durability +BOOST_CLASS_EXPORT(distributed::MakeSnapshotReq); +BOOST_CLASS_EXPORT(distributed::MakeSnapshotRes); + // Storage Gc. BOOST_CLASS_EXPORT(distributed::GcClearedStatusReq); BOOST_CLASS_EXPORT(distributed::GcClearedStatusRes); @@ -125,4 +130,3 @@ BOOST_CLASS_EXPORT(distributed::GcClearedStatusRes); // Transactional Cache Cleaner. BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndReq); BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndRes); - diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index e792de9b6..27a4d7536 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -10,6 +10,9 @@ #include "distributed/data_manager.hpp" #include "distributed/data_rpc_clients.hpp" #include "distributed/data_rpc_server.hpp" +#include "distributed/durability_rpc_clients.hpp" +#include "distributed/durability_rpc_messages.hpp" +#include "distributed/durability_rpc_server.hpp" #include "distributed/index_rpc_server.hpp" #include "distributed/plan_consumer.hpp" #include "distributed/plan_dispatcher.hpp" @@ -45,6 +48,19 @@ class PrivateBase : public GraphDb { durability::WriteAheadLog &wal() override { return wal_; } int WorkerId() const override { return config_.worker_id; } + // Makes a local snapshot from the visibility of accessor + bool MakeSnapshot(GraphDbAccessor &accessor) override { + const bool status = durability::MakeSnapshot( + *this, accessor, fs::path(config_.durability_directory), + config_.snapshot_max_retained); + if (status) { + LOG(INFO) << "Snapshot created successfully." << std::endl; + } else { + LOG(ERROR) << "Snapshot creation failed!" << std::endl; + } + return status; + } + distributed::PullRpcClients &pull_clients() override { LOG(FATAL) << "Remote pull clients only available in master."; } @@ -150,6 +166,21 @@ class Master : public PrivateBase { GraphDb::Type type() const override { return GraphDb::Type::DISTRIBUTED_MASTER; } + + // Makes a local snapshot and forces the workers to do the same. Snapshot is + // written here only if workers sucesfully created their own snapshot + bool MakeSnapshot(GraphDbAccessor &accessor) override { + auto workers_snapshot = + durability_rpc_clients_.MakeSnapshot(accessor.transaction_id()); + if (!workers_snapshot.get()) return false; + // This can be further optimized by creating master snapshot at the same + // time as workers snapshots but this forces us to delete the master + // snapshot if we succeed in creating it and workers somehow fail. Because + // we have an assumption that every snapshot that exists on master with some + // tx_id visibility also exists on workers + return PrivateBase::MakeSnapshot(accessor); + } + IMPL_GETTERS IMPL_DISTRIBUTED_GETTERS distributed::PlanDispatcher &plan_dispatcher() override { @@ -169,6 +200,8 @@ class Master : public PrivateBase { distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; TypemapPack typemap_pack_{server_}; database::MasterCounters counters_{server_}; + distributed::DurabilityRpcClients durability_rpc_clients_{ + rpc_worker_clients_}; distributed::DataRpcServer data_server_{*this, server_}; distributed::DataRpcClients data_clients_{rpc_worker_clients_}; distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_}; @@ -227,6 +260,7 @@ class Worker : public PrivateBase { distributed::DataManager data_manager_{storage_, data_clients_}; distributed::WorkerTransactionalCacheCleaner cache_cleaner_{ tx_engine_, server_, produce_server_, updates_server_, data_manager_}; + distributed::DurabilityRpcServer durability_rpc_server_{*this, server_}; }; #undef IMPL_GETTERS @@ -241,10 +275,6 @@ PublicBase::PublicBase(std::unique_ptr impl) durability::Recover(impl_->config_.durability_directory, *impl_); if (impl_->config_.durability_enabled) { impl_->wal().Enable(); - snapshot_creator_ = std::make_unique(); - snapshot_creator_->Run( - "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), - [this] { MakeSnapshot(); }); } // Start transaction killer. @@ -272,8 +302,13 @@ PublicBase::~PublicBase() { tx_engine().LocalForEachActiveTransaction( [](auto &t) { t.set_should_abort(); }); - snapshot_creator_ = nullptr; - if (impl_->config_.snapshot_on_exit) MakeSnapshot(); + // If we are not a worker we can do a snapshot on exit if it's enabled. Doing + // this on the master forces workers to do the same through rpcs + if (impl_->config_.snapshot_on_exit && + impl_->type() != Type::DISTRIBUTED_WORKER) { + GraphDbAccessor dba(*this); + MakeSnapshot(dba); + } } GraphDb::Type PublicBase::type() const { return impl_->type(); } @@ -326,18 +361,27 @@ distributed::DataManager &PublicBase::data_manager() { return impl_->data_manager(); } -void PublicBase::MakeSnapshot() { - const bool status = durability::MakeSnapshot( - *impl_, fs::path(impl_->config_.durability_directory), - impl_->config_.snapshot_max_retained); - if (status) { - LOG(INFO) << "Snapshot created successfully." << std::endl; - } else { - LOG(ERROR) << "Snapshot creation failed!" << std::endl; - } +bool PublicBase::MakeSnapshot(GraphDbAccessor &accessor) { + return impl_->MakeSnapshot(accessor); } } // namespace impl +MasterBase::MasterBase(std::unique_ptr impl) + : PublicBase(std::move(impl)) { + if (impl_->config_.durability_enabled) { + impl_->wal().Enable(); + snapshot_creator_ = std::make_unique(); + snapshot_creator_->Run( + "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), + [this] { + GraphDbAccessor dba(*this); + impl_->MakeSnapshot(dba); + }); + } +} + +MasterBase::~MasterBase() { snapshot_creator_ = nullptr; } + SingleNode::SingleNode(Config config) : MasterBase(std::make_unique(config)) {} diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 4f129de0f..c70441fde 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -111,6 +111,9 @@ class GraphDb { virtual distributed::ProduceRpcServer &produce_server() = 0; virtual distributed::PlanConsumer &plan_consumer() = 0; + // Makes a snapshot from the visibility of the given accessor + virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0; + GraphDb(const GraphDb &) = delete; GraphDb(GraphDb &&) = delete; GraphDb &operator=(const GraphDb &) = delete; @@ -150,6 +153,7 @@ class PublicBase : public GraphDb { distributed::DataManager &data_manager() override; bool is_accepting_transactions() const { return is_accepting_transactions_; } + bool MakeSnapshot(GraphDbAccessor &accessor) override; protected: explicit PublicBase(std::unique_ptr impl); @@ -158,19 +162,19 @@ class PublicBase : public GraphDb { std::unique_ptr impl_; private: - std::unique_ptr snapshot_creator_; - /** When this is false, no new transactions should be created. */ std::atomic is_accepting_transactions_{true}; Scheduler transaction_killer_; - - void MakeSnapshot(); }; } // namespace impl class MasterBase : public impl::PublicBase { public: - using PublicBase::PublicBase; + MasterBase(std::unique_ptr impl); + ~MasterBase(); + + private: + std::unique_ptr snapshot_creator_; }; class SingleNode : public MasterBase { diff --git a/src/distributed/durability_rpc_clients.cpp b/src/distributed/durability_rpc_clients.cpp new file mode 100644 index 000000000..7146a6e0c --- /dev/null +++ b/src/distributed/durability_rpc_clients.cpp @@ -0,0 +1,26 @@ +#include "distributed/durability_rpc_clients.hpp" + +#include "distributed/durability_rpc_messages.hpp" +#include "transactions/transaction.hpp" +#include "utils/future.hpp" + +namespace distributed { +utils::Future DurabilityRpcClients::MakeSnapshot( + tx::transaction_id_t tx) { + return std::async(std::launch::async, [this, tx] { + auto futures = clients_.ExecuteOnWorkers( + 0, [tx](communication::rpc::ClientPool &client_pool) { + auto res = client_pool.Call(tx); + if (res == nullptr) return false; + return res->member; + }); + + bool created = true; + for (auto &future : futures) { + created &= future.get(); + } + + return created; + }); +} +} // namespace distributed diff --git a/src/distributed/durability_rpc_clients.hpp b/src/distributed/durability_rpc_clients.hpp new file mode 100644 index 000000000..b6e4ef3c9 --- /dev/null +++ b/src/distributed/durability_rpc_clients.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include +#include +#include + +#include "distributed/rpc_worker_clients.hpp" +#include "storage/gid.hpp" +#include "transactions/type.hpp" + +namespace distributed { + +/// Provides an ability to trigger snapshooting on other workers. +class DurabilityRpcClients { + public: + DurabilityRpcClients(RpcWorkerClients &clients) : clients_(clients) {} + + // Sends a snapshot request to workers and returns a future which becomes true + // if all workers sucesfully completed their snapshot creation, false + // otherwise + // @param tx - transaction from which to take db snapshot + utils::Future MakeSnapshot(tx::transaction_id_t tx); + + private: + RpcWorkerClients &clients_; +}; + +} // namespace distributed diff --git a/src/distributed/durability_rpc_messages.hpp b/src/distributed/durability_rpc_messages.hpp new file mode 100644 index 000000000..4b51cb6a3 --- /dev/null +++ b/src/distributed/durability_rpc_messages.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "boost/serialization/access.hpp" +#include "boost/serialization/base_object.hpp" + +#include "communication/rpc/messages.hpp" +#include "transactions/transaction.hpp" + +namespace distributed { + +RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotRes, bool); + +using MakeSnapshotRpc = + communication::rpc::RequestResponse; + +} // namespace distributed diff --git a/src/distributed/durability_rpc_server.cpp b/src/distributed/durability_rpc_server.cpp new file mode 100644 index 000000000..801b59b16 --- /dev/null +++ b/src/distributed/durability_rpc_server.cpp @@ -0,0 +1,18 @@ +#include "distributed/durability_rpc_server.hpp" + +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "distributed/durability_rpc_messages.hpp" + +namespace distributed { + +DurabilityRpcServer::DurabilityRpcServer(database::GraphDb &db, + communication::rpc::Server &server) + : db_(db), rpc_server_(server) { + rpc_server_.Register([this](const MakeSnapshotReq &req) { + database::GraphDbAccessor dba(this->db_, req.member); + return std::make_unique(this->db_.MakeSnapshot(dba)); + }); +} + +} // namespace distributed diff --git a/src/distributed/durability_rpc_server.hpp b/src/distributed/durability_rpc_server.hpp new file mode 100644 index 000000000..1373b6aec --- /dev/null +++ b/src/distributed/durability_rpc_server.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include "communication/rpc/server.hpp" + +namespace database { +class GraphDb; +}; + +namespace distributed { + +class DurabilityRpcServer { + public: + DurabilityRpcServer(database::GraphDb &db, + communication::rpc::Server &server); + + private: + database::GraphDb &db_; + communication::rpc::Server &rpc_server_; +}; + +} // namespace distributed diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index 0327e8f8e..b9c5fc4ae 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -117,9 +117,9 @@ void RemoveOldWals(const fs::path &wal_dir, } } // namespace -bool MakeSnapshot(database::GraphDb &db, const fs::path &durability_dir, +bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba, + const fs::path &durability_dir, const int snapshot_max_retained) { - database::GraphDbAccessor dba(db); if (!EnsureDir(durability_dir / kSnapshotDir)) return false; const auto snapshot_file = MakeSnapshotPath(durability_dir, db.WorkerId(), dba.transaction_id()); @@ -127,7 +127,6 @@ bool MakeSnapshot(database::GraphDb &db, const fs::path &durability_dir, if (Encode(snapshot_file, db, dba)) { RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained); RemoveOldWals(durability_dir / kWalDir, dba.transaction()); - dba.Commit(); return true; } else { std::error_code error_code; // Just for exception suppression. diff --git a/src/durability/snapshooter.hpp b/src/durability/snapshooter.hpp index 09f5aef7a..aad943c77 100644 --- a/src/durability/snapshooter.hpp +++ b/src/durability/snapshooter.hpp @@ -9,10 +9,11 @@ namespace durability { /** * Make snapshot and save it in snapshots folder. Returns true if successful. * @param db - database for which we are creating a snapshot + * @param dba - db accessor with which we are creating a snapshot (reading data) * @param durability_dir - directory where durability data is stored. * @param snapshot_max_retained - maximum number of snapshots to retain. */ -bool MakeSnapshot(database::GraphDb &db, +bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba, const std::experimental::filesystem::path &durability_dir, int snapshot_max_retained); } // namespace durability diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index c15afd3c1..26e1a9409 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -1,3 +1,4 @@ +#include #include #include @@ -9,6 +10,8 @@ #include "storage/address_types.hpp" #include "transactions/engine_master.hpp" +namespace fs = std::experimental::filesystem; + class DistributedGraphDbTest : public ::testing::Test { const std::string kLocal = "127.0.0.1"; const int kWorkerCount = 2; @@ -30,39 +33,55 @@ class DistributedGraphDbTest : public ::testing::Test { protected: virtual int QueryExecutionTimeSec(int) { return 180; } - void SetUp() override { + void Initialize( + std::function modify_config) { const auto kInitTime = 200ms; database::Config master_config; master_config.master_endpoint = {kLocal, 0}; master_config.query_execution_time_sec = QueryExecutionTimeSec(0); - master_ = std::make_unique(master_config); + master_config.durability_directory = tmp_dir_; + master_ = std::make_unique(modify_config(master_config)); std::this_thread::sleep_for(kInitTime); auto worker_config = [this](int worker_id) { database::Config config; config.worker_id = worker_id; config.master_endpoint = master_->endpoint(); + config.durability_directory = tmp_dir_; config.worker_endpoint = {kLocal, 0}; config.query_execution_time_sec = QueryExecutionTimeSec(worker_id); return config; }; for (int i = 0; i < kWorkerCount; ++i) { - workers_.emplace_back( - std::make_unique(worker_config(i + 1))); + workers_.emplace_back(std::make_unique( + modify_config(worker_config(i + 1)))); std::this_thread::sleep_for(kInitTime); } } - void TearDown() override { + void SetUp() override { + Initialize([](database::Config config) { return config; }); + } + + void ShutDown() { // Kill master first because it will expect a shutdown response from the // workers. auto t = std::thread([this]() { master_ = nullptr; }); - for (int i = kWorkerCount - 1; i >= 0; --i) workers_[i] = nullptr; + workers_.clear(); if (t.joinable()) t.join(); } + void CleanDurability() { + if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_); + } + + void TearDown() override { + ShutDown(); + CleanDurability(); + } + database::Master &master() { return *master_; } auto &master_tx_engine() { return dynamic_cast(master_->tx_engine()); @@ -114,4 +133,7 @@ class DistributedGraphDbTest : public ::testing::Test { private: std::unique_ptr master_; std::vector> workers_; + + fs::path tmp_dir_ = fs::temp_directory_path() / + ("MG_test_unit_durability" + std::to_string(getpid())); }; diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp new file mode 100644 index 000000000..62518113a --- /dev/null +++ b/tests/unit/distributed_durability.cpp @@ -0,0 +1,75 @@ +#include "distributed_common.hpp" + +#include "database/graph_db_accessor.hpp" + +class DistributedDurability : public DistributedGraphDbTest { + public: + void write_labels() { + add_label(master(), "master"); + add_label(worker(1), "worker1"); + add_label(worker(2), "worker2"); + } + void check_labels() { + check_label(master(), "master"); + check_label(worker(1), "worker1"); + check_label(worker(2), "worker2"); + } + + private: + void add_label(database::GraphDb &db, const std::string &label) { + database::GraphDbAccessor dba(db); + auto vertex = dba.InsertVertex(); + vertex.add_label(dba.Label(label)); + dba.Commit(); + } + + void check_label(database::GraphDb &db, const std::string &label) { + database::GraphDbAccessor dba(db); + auto it = dba.Vertices(false); + ASSERT_NE(it.begin(), it.end()); + auto vertex = *it.begin(); + ASSERT_EQ(vertex.labels().size(), 1); + EXPECT_EQ(vertex.labels()[0], dba.Label(label)); + } +}; + +TEST_F(DistributedDurability, MakeSnapshot) { + // Create a graph with 3 nodes with 3 labels, one on each and make a snapshot + // of it + { + write_labels(); + database::GraphDbAccessor dba(master()); + master().MakeSnapshot(dba); + } + // Recover the graph and check if it's the same as before + { + ShutDown(); + Initialize([](database::Config config) { + config.db_recover_on_startup = true; + return config; + }); + check_labels(); + } +} + +TEST_F(DistributedDurability, SnapshotOnExit) { + { + TearDown(); + Initialize([](database::Config config) { + config.snapshot_on_exit = true; + return config; + }); + write_labels(); + } + // Recover the graph and check if it's the same as before + { + // This should force the db to make a snapshot + ShutDown(); + + Initialize([](database::Config config) { + config.db_recover_on_startup = true; + return config; + }); + check_labels(); + } +} diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index e29d5e77e..c00ba38e7 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -300,8 +300,9 @@ class Durability : public ::testing::Test { } void MakeSnapshot(database::GraphDb &db, int snapshot_max_retained = -1) { - ASSERT_TRUE( - durability::MakeSnapshot(db, durability_dir_, snapshot_max_retained)); + database::GraphDbAccessor dba(db); + ASSERT_TRUE(durability::MakeSnapshot(db, dba, durability_dir_, + snapshot_max_retained)); } void SetUp() override { @@ -820,8 +821,9 @@ TEST_F(Durability, SequentialRecovery) { return threads; }; - auto make_updates = [&run_updates, this]( - database::GraphDb &db, bool snapshot_during, bool snapshot_after) { + auto make_updates = [&run_updates, this](database::GraphDb &db, + bool snapshot_during, + bool snapshot_after) { std::atomic keep_running{true}; auto update_theads = run_updates(db, keep_running); std::this_thread::sleep_for(25ms);