From 06810403955b177555b4a387077efbcad92cb5d8 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Tue, 17 Jul 2018 11:03:03 +0200 Subject: [PATCH] Durability distributed wal Summary: Recover only snapshot Return recovery info on worker recovery Update tests Start wal recovery Single node wal split Keep track of wal possible to recover Fix comment Wal tx intersection Merge branch 'master' into sync_wal_tx Reviewers: buda, ipaljak, dgleich, vkasljevic, teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1489 --- src/CMakeLists.txt | 4 +- src/database/distributed_graph_db.cpp | 66 +++-- src/database/distributed_graph_db.hpp | 2 + src/database/graph_db.cpp | 23 +- src/database/storage.hpp | 11 +- src/distributed/cluster_discovery_master.cpp | 7 +- src/distributed/cluster_discovery_worker.cpp | 13 +- src/distributed/cluster_discovery_worker.hpp | 11 +- src/distributed/coordination_master.cpp | 53 +++- src/distributed/coordination_master.hpp | 24 +- src/distributed/coordination_rpc_messages.lcp | 23 +- src/distributed/durability_rpc_clients.cpp | 25 -- src/distributed/durability_rpc_master.cpp | 46 ++++ ..._clients.hpp => durability_rpc_master.hpp} | 8 +- src/distributed/durability_rpc_messages.lcp | 7 + src/distributed/durability_rpc_server.cpp | 20 -- src/distributed/durability_rpc_server.hpp | 21 -- src/distributed/durability_rpc_worker.cpp | 27 ++ src/distributed/durability_rpc_worker.hpp | 20 ++ src/durability/recovery.capnp | 11 +- src/durability/recovery.cpp | 250 ++++++++++-------- src/durability/recovery.hpp | 110 ++++++-- src/lisp/lcp.lisp | 6 +- src/storage/garbage_collector.hpp | 5 +- src/utils/algorithm.hpp | 7 + tests/distributed/card_fraud/card_fraud.py | 2 +- tests/unit/distributed_coordination.cpp | 15 +- tests/unit/rpc_worker_clients.cpp | 2 +- tools/tests/mg_recovery_check.cpp | 5 +- 29 files changed, 553 insertions(+), 271 deletions(-) delete mode 100644 src/distributed/durability_rpc_clients.cpp create mode 100644 src/distributed/durability_rpc_master.cpp rename src/distributed/{durability_rpc_clients.hpp => durability_rpc_master.hpp} (72%) delete mode 100644 src/distributed/durability_rpc_server.cpp delete mode 100644 src/distributed/durability_rpc_server.hpp create mode 100644 src/distributed/durability_rpc_worker.cpp create mode 100644 src/distributed/durability_rpc_worker.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2f064ef1b..0e2010f3b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -28,8 +28,8 @@ set(memgraph_src_files distributed/data_manager.cpp distributed/data_rpc_clients.cpp distributed/data_rpc_server.cpp - distributed/durability_rpc_clients.cpp - distributed/durability_rpc_server.cpp + distributed/durability_rpc_master.cpp + distributed/durability_rpc_worker.cpp distributed/index_rpc_server.cpp distributed/plan_consumer.cpp distributed/plan_dispatcher.cpp diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index c395da11c..b52eab96e 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -11,8 +11,8 @@ #include "distributed/coordination_worker.hpp" #include "distributed/data_manager.hpp" #include "distributed/data_rpc_server.hpp" -#include "distributed/durability_rpc_clients.hpp" -#include "distributed/durability_rpc_server.hpp" +#include "distributed/durability_rpc_master.hpp" +#include "distributed/durability_rpc_worker.hpp" #include "distributed/index_rpc_server.hpp" #include "distributed/plan_dispatcher.hpp" #include "distributed/pull_rpc_clients.hpp" @@ -48,7 +48,7 @@ struct TypemapPack { class Master { public: - explicit Master(const Config &config, DistributedGraphDb *self) + explicit Master(const Config &config, database::Master *self) : config_(config), self_(self) {} Config config_; @@ -62,7 +62,7 @@ class Master { // have a lot of circular pointers among members. It would be a good idea to // clean the mess. Also, be careful of virtual calls to `self_` in // constructors of members. - DistributedGraphDb *self_{nullptr}; + database::Master *self_{nullptr}; communication::rpc::Server server_{ config_.master_endpoint, static_cast(config_.rpc_num_workers)}; tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_}; @@ -79,8 +79,7 @@ class Master { &subcursor_storage_}; distributed::BfsRpcClients bfs_subcursor_clients_{ self_, &subcursor_storage_, &rpc_worker_clients_, &data_manager_}; - distributed::DurabilityRpcClients durability_rpc_clients_{ - rpc_worker_clients_}; + distributed::DurabilityRpcMaster durability_rpc_{rpc_worker_clients_}; distributed::DataRpcServer data_server_{*self_, server_}; distributed::DataRpcClients data_clients_{rpc_worker_clients_}; distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_}; @@ -112,14 +111,21 @@ Master::Master(Config config) // What we recover. std::experimental::optional recovery_info; + durability::RecoveryData recovery_data; // Recover only if necessary. if (impl_->config_.db_recover_on_startup) { - recovery_info = durability::Recover(impl_->config_.durability_directory, - *this, std::experimental::nullopt); + recovery_info = durability::RecoverOnlySnapshot( + impl_->config_.durability_directory, this, &recovery_data, + std::experimental::nullopt); } // Post-recovery setup and checking. - impl_->coordination_.SetRecoveryInfo(recovery_info); + impl_->coordination_.SetRecoveredSnapshot( + recovery_info + ? std::experimental::make_optional(recovery_info->snapshot_tx_id) + : std::experimental::nullopt); + + // Wait till workers report back their recoverable wal txs if (recovery_info) { CHECK(impl_->config_.recovering_cluster_size > 0) << "Invalid cluster recovery size flag. Recovered cluster size " @@ -129,8 +135,19 @@ Master::Master(Config config) LOG(INFO) << "Waiting for workers to finish recovering.."; std::this_thread::sleep_for(2s); } + + // Get the intersection of recoverable transactions from wal on + // workers and on master + recovery_data.wal_tx_to_recover = + impl_->coordination_.CommonWalTransactions(*recovery_info); + durability::RecoverWalAndIndexes(impl_->config_.durability_directory, + this, &recovery_data); + auto workers_recovered_wal = + impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data); + workers_recovered_wal.get(); } } + // Start the dynamic graph partitioner inside token sharing server if (impl_->config_.dynamic_graph_partitioner_enabled) { impl_->token_sharing_server_.StartTokenSharing(); @@ -214,7 +231,7 @@ std::vector Master::GetWorkerIds() const { // written here only if workers sucesfully created their own snapshot bool Master::MakeSnapshot(GraphDbAccessor &accessor) { auto workers_snapshot = - impl_->durability_rpc_clients_.MakeSnapshot(accessor.transaction_id()); + impl_->durability_rpc_.MakeSnapshot(accessor.transaction_id()); if (!workers_snapshot.get()) return false; // This can be further optimized by creating master snapshot at the same // time as workers snapshots but this forces us to delete the master @@ -295,7 +312,7 @@ class Worker { config_.durability_directory, config_.durability_enabled}; - explicit Worker(const Config &config, DistributedGraphDb *self) + explicit Worker(const Config &config, database::Worker *self) : config_(config), self_(self) { cluster_discovery_.RegisterWorker(config.worker_id); } @@ -304,7 +321,7 @@ class Worker { // have a lot of circular pointers among members. It would be a good idea to // clean the mess. Also, be careful of virtual calls to `self_` in // constructors of members. - DistributedGraphDb *self_{nullptr}; + database::Worker *self_{nullptr}; communication::rpc::Server server_{ config_.worker_endpoint, static_cast(config_.rpc_num_workers)}; distributed::WorkerCoordination coordination_{server_, @@ -336,7 +353,7 @@ class Worker { distributed::WorkerTransactionalCacheCleaner cache_cleaner_{ tx_engine_, &wal_, server_, produce_server_, updates_server_, data_manager_}; - distributed::DurabilityRpcServer durability_rpc_server_{*self_, server_}; + distributed::DurabilityRpcWorker durability_rpc_{self_, &server_}; distributed::ClusterDiscoveryWorker cluster_discovery_{ server_, coordination_, rpc_worker_clients_.GetClientPool(0)}; distributed::TokenSharingRpcClients token_sharing_clients_{ @@ -356,23 +373,27 @@ Worker::Worker(Config config) // Durability recovery. { // What we should recover. - std::experimental::optional - required_recovery_info(impl_->cluster_discovery_.recovery_info()); + std::experimental::optional snapshot_to_recover; + snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover(); // What we recover. std::experimental::optional recovery_info; + durability::RecoveryData recovery_data; // Recover only if necessary. - if (required_recovery_info) { - recovery_info = durability::Recover(impl_->config_.durability_directory, - *this, required_recovery_info); + if (snapshot_to_recover) { + recovery_info = durability::RecoverOnlySnapshot( + impl_->config_.durability_directory, this, &recovery_data, + snapshot_to_recover); } // Post-recovery setup and checking. - if (required_recovery_info != recovery_info) + if (snapshot_to_recover && + (!recovery_info || + snapshot_to_recover != recovery_info->snapshot_tx_id)) LOG(FATAL) << "Memgraph worker failed to recover the database state " "recovered on the master"; - impl_->cluster_discovery_.NotifyWorkerRecovered(); + impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info); } if (impl_->config_.durability_enabled) { @@ -456,6 +477,11 @@ void Worker::ReinitializeStorage() { impl_->rpc_worker_clients_.GetClientPool(0), impl_->config_.worker_id); } +void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) { + durability::RecoverWalAndIndexes(impl_->config_.durability_directory, this, + recovery_data); +} + io::network::Endpoint Worker::endpoint() const { return impl_->server_.endpoint(); } diff --git a/src/database/distributed_graph_db.hpp b/src/database/distributed_graph_db.hpp index 286e21d5b..3a3aef635 100644 --- a/src/database/distributed_graph_db.hpp +++ b/src/database/distributed_graph_db.hpp @@ -1,6 +1,7 @@ #pragma once #include "database/graph_db.hpp" +#include "durability/recovery.hpp" namespace distributed { class BfsRpcServer; @@ -100,6 +101,7 @@ class Worker final : public DistributedGraphDb { std::vector GetWorkerIds() const override; bool MakeSnapshot(GraphDbAccessor &accessor) override; void ReinitializeStorage() override; + void RecoverWalAndIndexes(durability::RecoveryData *recovery_data); /** Gets this worker's endpoint. */ io::network::Endpoint endpoint() const; diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 850b921aa..7b8a714f3 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -54,10 +54,25 @@ SingleNode::SingleNode(Config config) if (impl_->config_.durability_enabled) utils::CheckDir(impl_->config_.durability_directory); - // Recover only if necessary. - if (impl_->config_.db_recover_on_startup) { - durability::Recover(impl_->config_.durability_directory, *this, - std::experimental::nullopt); + // Durability recovery. + { + // What we recover. + std::experimental::optional recovery_info; + + durability::RecoveryData recovery_data; + // Recover only if necessary. + if (impl_->config_.db_recover_on_startup) { + recovery_info = durability::RecoverOnlySnapshot( + impl_->config_.durability_directory, this, &recovery_data, + std::experimental::nullopt); + } + + // Post-recovery setup and checking. + if (recovery_info) { + recovery_data.wal_tx_to_recover = recovery_info->wal_recovered; + durability::RecoverWalAndIndexes(impl_->config_.durability_directory, + this, &recovery_data); + } } if (impl_->config_.durability_enabled) { diff --git a/src/database/storage.hpp b/src/database/storage.hpp index d3f6b850a..c5172aa5d 100644 --- a/src/database/storage.hpp +++ b/src/database/storage.hpp @@ -23,13 +23,6 @@ namespace database { class GraphDb; }; -namespace durability { -struct RecoveryInfo; -RecoveryInfo Recover(const std::experimental::filesystem::path &, - database::GraphDb &, - std::experimental::optional); -}; // namespace durability - namespace database { /** A data structure containing the main data members of a graph database. */ @@ -56,6 +49,7 @@ class Storage { gid::Generator &VertexGenerator() { return vertex_generator_; } gid::Generator &EdgeGenerator() { return edge_generator_; } + LabelPropertyIndex &label_property_index() { return label_property_index_; } /// Gets the local address for the given gid. Fails if not present. template @@ -103,9 +97,6 @@ class Storage { friend class GraphDbAccessor; friend class StorageGc; friend class distributed::IndexRpcServer; - friend durability::RecoveryInfo durability::Recover( - const std::experimental::filesystem::path &, database::GraphDb &, - std::experimental::optional); int worker_id_; gid::Generator vertex_generator_; diff --git a/src/distributed/cluster_discovery_master.cpp b/src/distributed/cluster_discovery_master.cpp index 9c03a1e6f..21f35a092 100644 --- a/src/distributed/cluster_discovery_master.cpp +++ b/src/distributed/cluster_discovery_master.cpp @@ -28,14 +28,17 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster( } RegisterWorkerRes res(registration_successful, - this->coordination_.RecoveryInfo(), + this->coordination_.RecoveredSnapshotTx(), this->coordination_.GetWorkers()); res.Save(res_builder); }); server_.Register( [this](const auto &req_reader, auto *res_builder) { - this->coordination_.WorkerRecovered(req_reader.getMember()); + NotifyWorkerRecoveredReq req; + req.Load(req_reader); + this->coordination_.WorkerRecoveredSnapshot(req.worker_id, + req.recovery_info); }); } diff --git a/src/distributed/cluster_discovery_worker.cpp b/src/distributed/cluster_discovery_worker.cpp index 85746797c..b4691fc45 100644 --- a/src/distributed/cluster_discovery_worker.cpp +++ b/src/distributed/cluster_discovery_worker.cpp @@ -20,21 +20,24 @@ void ClusterDiscoveryWorker::RegisterWorker(int worker_id) { auto result = client_pool_.Call(worker_id, server_.endpoint()); CHECK(result) << "RegisterWorkerRpc failed"; - CHECK(result->registration_successful) - << "Unable to assign requested ID (" << worker_id << ") to worker!"; + CHECK(result->registration_successful) << "Unable to assign requested ID (" + << worker_id << ") to worker!"; worker_id_ = worker_id; for (auto &kv : result->workers) { coordination_.RegisterWorker(kv.first, kv.second); } - recovery_info_ = result->recovery_info; + snapshot_to_recover_ = result->snapshot_to_recover; } -void ClusterDiscoveryWorker::NotifyWorkerRecovered() { +void ClusterDiscoveryWorker::NotifyWorkerRecovered( + const std::experimental::optional + &recovery_info) { CHECK(worker_id_ >= 0) << "Workers id is not yet assigned, preform registration before " "notifying that the recovery finished"; - auto result = client_pool_.Call(worker_id_); + auto result = + client_pool_.Call(worker_id_, recovery_info); CHECK(result) << "NotifyWorkerRecoveredRpc failed"; } diff --git a/src/distributed/cluster_discovery_worker.hpp b/src/distributed/cluster_discovery_worker.hpp index 19fb98be1..395e582a1 100644 --- a/src/distributed/cluster_discovery_worker.hpp +++ b/src/distributed/cluster_discovery_worker.hpp @@ -34,17 +34,20 @@ class ClusterDiscoveryWorker final { * Notifies the master that the worker finished recovering. Assumes that the * worker was already registered with master. */ - void NotifyWorkerRecovered(); + void NotifyWorkerRecovered( + const std::experimental::optional + &recovery_info); - /** Returns the recovery info. Valid only after registration. */ - auto recovery_info() const { return recovery_info_; } + /** Returns the snapshot that should be recovered on workers. Valid only after + * registration. */ + auto snapshot_to_recover() const { return snapshot_to_recover_; } private: int worker_id_{-1}; Server &server_; WorkerCoordination &coordination_; communication::rpc::ClientPool &client_pool_; - std::experimental::optional recovery_info_; + std::experimental::optional snapshot_to_recover_; }; } // namespace distributed diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index ef90d690b..0e2b29194 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -40,8 +40,11 @@ bool MasterCoordination::RegisterWorker(int desired_worker_id, return true; } -void MasterCoordination::WorkerRecovered(int worker_id) { - CHECK(recovered_workers_.insert(worker_id).second) +void MasterCoordination::WorkerRecoveredSnapshot( + int worker_id, const std::experimental::optional + &recovery_info) { + CHECK(recovered_workers_.insert(std::make_pair(worker_id, recovery_info)) + .second) << "Worker already notified about finishing recovery"; } @@ -71,22 +74,54 @@ MasterCoordination::~MasterCoordination() { } } -void MasterCoordination::SetRecoveryInfo( - std::experimental::optional info) { +void MasterCoordination::SetRecoveredSnapshot( + std::experimental::optional recovered_snapshot_tx) { std::lock_guard guard(lock_); recovery_done_ = true; - recovery_info_ = info; + recovered_snapshot_tx_ = recovered_snapshot_tx; } int MasterCoordination::CountRecoveredWorkers() const { return recovered_workers_.size(); } -std::experimental::optional -MasterCoordination::RecoveryInfo() const { +std::experimental::optional +MasterCoordination::RecoveredSnapshotTx() const { std::lock_guard guard(lock_); - CHECK(recovery_done_) << "RecoveryInfo requested before it's available"; - return recovery_info_; + CHECK(recovery_done_) << "Recovered snapshot requested before it's available"; + return recovered_snapshot_tx_; +} + +std::vector MasterCoordination::CommonWalTransactions( + const durability::RecoveryInfo &master_info) const { + int cluster_size; + std::unordered_map tx_cnt; + for (auto tx : master_info.wal_recovered) { + tx_cnt[tx]++; + } + + { + std::lock_guard guard(lock_); + for (auto worker : recovered_workers_) { + // If there is no recovery info we can just return an empty vector since + // we can't restore any transaction + if (!worker.second) return {}; + for (auto tx : worker.second->wal_recovered) { + tx_cnt[tx]++; + } + } + // Add one because of master + cluster_size = recovered_workers_.size() + 1; + } + + std::vector tx_intersection; + for (auto tx : tx_cnt) { + if (tx.second == cluster_size) { + tx_intersection.push_back(tx.first); + } + } + + return tx_intersection; } } // namespace distributed diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index e5c8b5895..1bd35ee0e 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -31,31 +31,37 @@ class MasterCoordination final : public Coordination { /* * Worker `worker_id` finished with recovering, adds it to the set of - * recovered workers. + * recovered workers alongside with its recovery_info. */ - void WorkerRecovered(int worker_id); + void WorkerRecoveredSnapshot( + int worker_id, const std::experimental::optional + &recovery_info); Endpoint GetEndpoint(int worker_id); /// Sets the recovery info. nullopt indicates nothing was recovered. - void SetRecoveryInfo( - std::experimental::optional info); + void SetRecoveredSnapshot( + std::experimental::optional recovered_snapshot); - std::experimental::optional RecoveryInfo() const; + std::experimental::optional RecoveredSnapshotTx() const; int CountRecoveredWorkers() const; + std::vector CommonWalTransactions( + const durability::RecoveryInfo &master_info) const; + private: // Most master functions aren't thread-safe. mutable std::mutex lock_; - /// Durabiliry recovery info. + /// Durabilility recovery info. /// Indicates if the recovery phase is done. bool recovery_done_{false}; - /// Set of workers that finished sucesfully recovering - std::set recovered_workers_; + /// Set of workers that finished sucesfully recovering snapshot + std::map> + recovered_workers_; /// If nullopt nothing was recovered. - std::experimental::optional recovery_info_; + std::experimental::optional recovered_snapshot_tx_; }; } // namespace distributed diff --git a/src/distributed/coordination_rpc_messages.lcp b/src/distributed/coordination_rpc_messages.lcp index 10deebc3c..b0437077e 100644 --- a/src/distributed/coordination_rpc_messages.lcp +++ b/src/distributed/coordination_rpc_messages.lcp @@ -24,8 +24,21 @@ cpp<# (endpoint "io::network::Endpoint" :capnp-type "Io.Endpoint"))) (:response ((registration-successful :bool) - (recovery-info "std::experimental::optional" - :capnp-type "Utils.Optional(Dur.RecoveryInfo)") + (snapshot-to-recover "std::experimental::optional" + :capnp-type "Utils.Optional(Utils.BoxUInt64)" + :capnp-save + (lambda (builder member) + #>cpp + utils::SaveOptional( + ${member}, &${builder}, + [](auto builder, const auto &v){ builder->setValue(v); }); + cpp<#) + :capnp-load + (lambda (reader member) + #>cpp + ${member} = utils::LoadOptional( + ${reader}, [](auto reader){ return reader.getValue(); }); + cpp<#)) (workers "std::unordered_map" :capnp-type "Utils.Map(Utils.BoxInt16, Io.Endpoint)" :capnp-save @@ -61,8 +74,10 @@ cpp<# (:response ())) (lcp:define-rpc notify-worker-recovered - (:request ((member :int64_t))) + (:request + ((worker-id :int16_t) + (recovery-info "std::experimental::optional" + :capnp-type "Utils.Optional(Dur.RecoveryInfo)"))) (:response ())) (lcp:pop-namespace) ;; distributed - diff --git a/src/distributed/durability_rpc_clients.cpp b/src/distributed/durability_rpc_clients.cpp deleted file mode 100644 index 660965cba..000000000 --- a/src/distributed/durability_rpc_clients.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include "distributed/durability_rpc_clients.hpp" - -#include "distributed/durability_rpc_messages.hpp" -#include "transactions/transaction.hpp" -#include "utils/future.hpp" - -namespace distributed { -utils::Future DurabilityRpcClients::MakeSnapshot(tx::TransactionId tx) { - return utils::make_future(std::async(std::launch::async, [this, tx] { - auto futures = clients_.ExecuteOnWorkers( - 0, [tx](int worker_id, communication::rpc::ClientPool &client_pool) { - auto res = client_pool.Call(tx); - if (!res) return false; - return res->member; - }); - - bool created = true; - for (auto &future : futures) { - created &= future.get(); - } - - return created; - })); -} -} // namespace distributed diff --git a/src/distributed/durability_rpc_master.cpp b/src/distributed/durability_rpc_master.cpp new file mode 100644 index 000000000..34541c2a0 --- /dev/null +++ b/src/distributed/durability_rpc_master.cpp @@ -0,0 +1,46 @@ +#include "distributed/durability_rpc_master.hpp" + +#include "distributed/durability_rpc_messages.hpp" +#include "transactions/transaction.hpp" +#include "utils/future.hpp" + +namespace distributed { +utils::Future DurabilityRpcMaster::MakeSnapshot(tx::TransactionId tx) { + return utils::make_future(std::async(std::launch::async, [this, tx] { + auto futures = clients_.ExecuteOnWorkers( + 0, [tx](int worker_id, communication::rpc::ClientPool &client_pool) { + auto res = client_pool.Call(tx); + if (!res) return false; + return res->member; + }); + + bool created = true; + for (auto &future : futures) { + created &= future.get(); + } + + return created; + })); +} + +utils::Future DurabilityRpcMaster::RecoverWalAndIndexes( + durability::RecoveryData *recovery_data) { + return utils::make_future(std::async(std::launch::async, [this, + recovery_data] { + auto futures = clients_.ExecuteOnWorkers( + 0, [recovery_data](int worker_id, + communication::rpc::ClientPool &client_pool) { + auto res = client_pool.Call(*recovery_data); + if (!res) return false; + return true; + }); + + bool recovered = true; + for (auto &future : futures) { + recovered &= future.get(); + } + + return recovered; + })); +} +} // namespace distributed diff --git a/src/distributed/durability_rpc_clients.hpp b/src/distributed/durability_rpc_master.hpp similarity index 72% rename from src/distributed/durability_rpc_clients.hpp rename to src/distributed/durability_rpc_master.hpp index 880bde3d9..6d65d01ec 100644 --- a/src/distributed/durability_rpc_clients.hpp +++ b/src/distributed/durability_rpc_master.hpp @@ -5,15 +5,16 @@ #include #include "distributed/rpc_worker_clients.hpp" +#include "durability/recovery.hpp" #include "storage/gid.hpp" #include "transactions/type.hpp" namespace distributed { /// Provides an ability to trigger snapshooting on other workers. -class DurabilityRpcClients { +class DurabilityRpcMaster { public: - DurabilityRpcClients(RpcWorkerClients &clients) : clients_(clients) {} + explicit DurabilityRpcMaster(RpcWorkerClients &clients) : clients_(clients) {} // Sends a snapshot request to workers and returns a future which becomes true // if all workers sucesfully completed their snapshot creation, false @@ -21,6 +22,9 @@ class DurabilityRpcClients { // @param tx - transaction from which to take db snapshot utils::Future MakeSnapshot(tx::TransactionId tx); + utils::Future RecoverWalAndIndexes( + durability::RecoveryData *recovery_data); + private: RpcWorkerClients &clients_; }; diff --git a/src/distributed/durability_rpc_messages.lcp b/src/distributed/durability_rpc_messages.lcp index 9027569f1..e29d28b06 100644 --- a/src/distributed/durability_rpc_messages.lcp +++ b/src/distributed/durability_rpc_messages.lcp @@ -6,15 +6,22 @@ #include "communication/rpc/messages.hpp" #include "distributed/durability_rpc_messages.capnp.h" +#include "durability/recovery.hpp" #include "transactions/transaction.hpp" cpp<# (lcp:namespace distributed) +(lcp:capnp-import 'dur "/durability/recovery.capnp") + (lcp:capnp-namespace "distributed") (lcp:define-rpc make-snapshot (:request ((member "tx::TransactionId" :capnp-type "UInt64"))) (:response ((member :bool)))) +(lcp:define-rpc recover-wal-and-indexes + (:request ((member "durability::RecoveryData" :capnp-type "Dur.RecoveryData"))) + (:response ())) + (lcp:pop-namespace) ;; distributed diff --git a/src/distributed/durability_rpc_server.cpp b/src/distributed/durability_rpc_server.cpp deleted file mode 100644 index 031dc73dc..000000000 --- a/src/distributed/durability_rpc_server.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include "distributed/durability_rpc_server.hpp" - -#include "database/graph_db.hpp" -#include "database/graph_db_accessor.hpp" -#include "distributed/durability_rpc_messages.hpp" - -namespace distributed { - -DurabilityRpcServer::DurabilityRpcServer(database::GraphDb &db, - communication::rpc::Server &server) - : db_(db), rpc_server_(server) { - rpc_server_.Register( - [this](const auto &req_reader, auto *res_builder) { - database::GraphDbAccessor dba(this->db_, req_reader.getMember()); - MakeSnapshotRes res(this->db_.MakeSnapshot(dba)); - res.Save(res_builder); - }); -} - -} // namespace distributed diff --git a/src/distributed/durability_rpc_server.hpp b/src/distributed/durability_rpc_server.hpp deleted file mode 100644 index 1373b6aec..000000000 --- a/src/distributed/durability_rpc_server.hpp +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include "communication/rpc/server.hpp" - -namespace database { -class GraphDb; -}; - -namespace distributed { - -class DurabilityRpcServer { - public: - DurabilityRpcServer(database::GraphDb &db, - communication::rpc::Server &server); - - private: - database::GraphDb &db_; - communication::rpc::Server &rpc_server_; -}; - -} // namespace distributed diff --git a/src/distributed/durability_rpc_worker.cpp b/src/distributed/durability_rpc_worker.cpp new file mode 100644 index 000000000..3b2d0a48a --- /dev/null +++ b/src/distributed/durability_rpc_worker.cpp @@ -0,0 +1,27 @@ +#include "distributed/durability_rpc_worker.hpp" + +#include "database/distributed_graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "distributed/durability_rpc_messages.hpp" + +namespace distributed { + +DurabilityRpcWorker::DurabilityRpcWorker(database::Worker *db, + communication::rpc::Server *server) + : db_(db), rpc_server_(server) { + rpc_server_->Register( + [this](const auto &req_reader, auto *res_builder) { + database::GraphDbAccessor dba(*this->db_, req_reader.getMember()); + MakeSnapshotRes res(this->db_->MakeSnapshot(dba)); + res.Save(res_builder); + }); + + rpc_server_->Register( + [this](const auto &req_reader, auto *res_builder) { + durability::RecoveryData recovery_data; + recovery_data.Load(req_reader.getMember()); + this->db_->RecoverWalAndIndexes(&recovery_data); + }); +} + +} // namespace distributed diff --git a/src/distributed/durability_rpc_worker.hpp b/src/distributed/durability_rpc_worker.hpp new file mode 100644 index 000000000..e077f0435 --- /dev/null +++ b/src/distributed/durability_rpc_worker.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include "communication/rpc/server.hpp" + +namespace database { +class Worker; +}; // namespace database + +namespace distributed { + +class DurabilityRpcWorker { + public: + DurabilityRpcWorker(database::Worker *db, communication::rpc::Server *server); + + private: + database::Worker *db_; + communication::rpc::Server *rpc_server_; +}; + +} // namespace distributed diff --git a/src/durability/recovery.capnp b/src/durability/recovery.capnp index 243b295c6..11a7dbbc2 100644 --- a/src/durability/recovery.capnp +++ b/src/durability/recovery.capnp @@ -1,9 +1,18 @@ @0xb3d70bc0576218f3; using Cxx = import "/capnp/c++.capnp"; +using Utils = import "/utils/serialization.capnp"; + $Cxx.namespace("durability::capnp"); struct RecoveryInfo { snapshotTxId @0 :UInt64; - maxWalTxId @1 :UInt64; + walRecovered @1 :List(UInt64); +} + +struct RecoveryData { + snapshooterTxId @0 :UInt64; + walTxToRecover @1 :List(UInt64); + snapshooterTxSnapshot @2 :List(UInt64); + indexes @3 :List(Utils.Pair(Text, Text)); } diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index a77de5228..21c59362e 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -37,31 +37,14 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count, namespace { using communication::bolt::DecodedValue; -// A data structure for exchanging info between main recovery function and -// snapshot and WAL recovery functions. -struct RecoveryData { - tx::TransactionId snapshooter_tx_id{0}; - tx::TransactionId wal_max_recovered_tx_id{0}; - std::vector snapshooter_tx_snapshot; - // A collection into which the indexes should be added so they - // can be rebuilt at the end of the recovery transaction. - std::vector> indexes; - - void Clear() { - snapshooter_tx_id = 0; - snapshooter_tx_snapshot.clear(); - indexes.clear(); - } -}; - #define RETURN_IF_NOT(condition) \ if (!(condition)) { \ reader.Close(); \ return false; \ } -bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, - RecoveryData &recovery_data) { +bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db, + RecoveryData *recovery_data) { HashedFileReader reader; SnapshotDecoder decoder(reader); @@ -85,25 +68,25 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, // Checks worker id was set correctly RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) && - dv.ValueInt() == db.WorkerId()); + dv.ValueInt() == db->WorkerId()); // Vertex and edge generator ids RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int)); uint64_t vertex_generator_cnt = dv.ValueInt(); - db.storage().VertexGenerator().SetId(std::max( - db.storage().VertexGenerator().LocalCount(), vertex_generator_cnt)); + db->storage().VertexGenerator().SetId(std::max( + db->storage().VertexGenerator().LocalCount(), vertex_generator_cnt)); RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int)); uint64_t edge_generator_cnt = dv.ValueInt(); - db.storage().EdgeGenerator().SetId( - std::max(db.storage().EdgeGenerator().LocalCount(), edge_generator_cnt)); + db->storage().EdgeGenerator().SetId( + std::max(db->storage().EdgeGenerator().LocalCount(), edge_generator_cnt)); RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int)); - recovery_data.snapshooter_tx_id = dv.ValueInt(); + recovery_data->snapshooter_tx_id = dv.ValueInt(); // Transaction snapshot of the transaction that created the snapshot. RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::List)); for (const auto &value : dv.ValueList()) { RETURN_IF_NOT(value.IsInt()); - recovery_data.snapshooter_tx_snapshot.emplace_back(value.ValueInt()); + recovery_data->snapshooter_tx_snapshot.emplace_back(value.ValueInt()); } // A list of label+property indexes. @@ -114,11 +97,11 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, RETURN_IF_NOT(it != index_value.end()); auto property = *it++; RETURN_IF_NOT(label.IsString() && property.IsString()); - recovery_data.indexes.emplace_back(label.ValueString(), - property.ValueString()); + recovery_data->indexes.emplace_back(label.ValueString(), + property.ValueString()); } - database::GraphDbAccessor dba(db); + database::GraphDbAccessor dba(*db); std::unordered_map> edge_gid_endpoints_mapping; @@ -154,7 +137,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, [&db, &dba](storage::VertexAddress &address) { if (address.is_local()) return; // If the worker id matches it should be a local apperance - if (address.worker_id() == db.WorkerId()) { + if (address.worker_id() == db->WorkerId()) { address = storage::VertexAddress( dba.db().storage().LocalAddress(address.gid())); CHECK(address.is_local()) << "Address should be local but isn't"; @@ -165,7 +148,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, [&db, &dba](storage::EdgeAddress &address) { if (address.is_local()) return; // If the worker id matches it should be a local apperance - if (address.worker_id() == db.WorkerId()) { + if (address.worker_id() == db->WorkerId()) { address = storage::EdgeAddress( dba.db().storage().LocalAddress(address.gid())); CHECK(address.is_local()) << "Address should be local but isn't"; @@ -245,8 +228,8 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, // than the latest one we have recovered. Do this to make sure that // subsequently created snapshots and WAL files will have transactional info // that does not interfere with that found in previous snapshots and WAL. - tx::TransactionId max_id = recovery_data.snapshooter_tx_id; - auto &snap = recovery_data.snapshooter_tx_snapshot; + tx::TransactionId max_id = recovery_data->snapshooter_tx_id; + auto &snap = recovery_data->snapshooter_tx_snapshot; if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end()); dba.db().tx_engine().EnsureNextIdGreater(max_id); dba.Commit(); @@ -255,32 +238,99 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, #undef RETURN_IF_NOT -// TODO - finer-grained recovery feedback could be useful here. -bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, - RecoveryData &recovery_data) { +std::vector GetWalFiles(const fs::path &wal_dir) { // Get paths to all the WAL files and sort them (on date). std::vector wal_files; - if (!fs::exists(wal_dir)) return true; + if (!fs::exists(wal_dir)) return {}; for (auto &wal_file : fs::directory_iterator(wal_dir)) wal_files.emplace_back(wal_file); std::sort(wal_files.begin(), wal_files.end()); + return wal_files; +} - // Track which transaction should be recovered first, and define logic for - // which transactions should be skipped in recovery. +bool ApplyOverDeltas( + const std::vector &wal_files, tx::TransactionId first_to_recover, + const std::function &f) { + for (auto &wal_file : wal_files) { + auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename()); + if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue; + + HashedFileReader wal_reader; + if (!wal_reader.Open(wal_file)) return false; + communication::bolt::Decoder decoder(wal_reader); + while (true) { + auto delta = database::StateDelta::Decode(wal_reader, decoder); + if (!delta) break; + f(*delta); + } + } + + return true; +} + +auto FirstWalTxToRecover(const RecoveryData &recovery_data) { auto &tx_sn = recovery_data.snapshooter_tx_snapshot; auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1 : *std::min(tx_sn.begin(), tx_sn.end()); - auto should_skip = [&tx_sn, &recovery_data, + return first_to_recover; +} + +std::vector ReadWalRecoverableTransactions( + const fs::path &wal_dir, database::GraphDb *db, + const RecoveryData &recovery_data) { + auto wal_files = GetWalFiles(wal_dir); + + std::unordered_set committed_set; + auto first_to_recover = FirstWalTxToRecover(recovery_data); + ApplyOverDeltas( + wal_files, first_to_recover, [&](const database::StateDelta &delta) { + if (delta.transaction_id >= first_to_recover && + delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) { + committed_set.insert(delta.transaction_id); + } + }); + + std::vector committed_tx_ids(committed_set.size()); + for (auto id : committed_set) committed_tx_ids.push_back(id); + return committed_tx_ids; +} + +// TODO - finer-grained recovery feedback could be useful here. +bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db, + RecoveryData *recovery_data) { + auto wal_files = GetWalFiles(wal_dir); + // Track which transaction should be recovered first, and define logic for + // which transactions should be skipped in recovery. + auto &tx_sn = recovery_data->snapshooter_tx_snapshot; + auto first_to_recover = FirstWalTxToRecover(*recovery_data); + + // Set of transactions which can be recovered, since not every transaction in + // wal can be recovered because it might not be present on some workers (there + // wasn't enough time for it to flush to disk or similar) + std::unordered_set common_wal_tx; + for (auto tx_id : recovery_data->wal_tx_to_recover) + common_wal_tx.insert(tx_id); + + auto should_skip = [&tx_sn, recovery_data, &common_wal_tx, first_to_recover](tx::TransactionId tx_id) { return tx_id < first_to_recover || - (tx_id < recovery_data.snapshooter_tx_id && - !utils::Contains(tx_sn, tx_id)); + (tx_id < recovery_data->snapshooter_tx_id && + !utils::Contains(tx_sn, tx_id)) || + !utils::Contains(common_wal_tx, tx_id); }; std::unordered_map accessors; auto get_accessor = - [&accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & { + [db, &accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & { auto found = accessors.find(tx_id); + + // Currently accessors are created on transaction_begin, but since workers + // don't have a transaction begin, the accessors are not created. + if (db->type() == database::GraphDb::Type::DISTRIBUTED_WORKER && + found == accessors.end()) { + std::tie(found, std::ignore) = accessors.emplace(tx_id, *db); + } + CHECK(found != accessors.end()) << "Accessor does not exist for transaction: " << tx_id; return found->second; @@ -294,59 +344,49 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, // Read all the WAL files whose max_tx_id is not smaller than // min_tx_to_recover. - for (auto &wal_file : wal_files) { - auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename()); - if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue; - - HashedFileReader wal_reader; - if (!wal_reader.Open(wal_file)) return false; - communication::bolt::Decoder decoder(wal_reader); - while (true) { - auto delta = database::StateDelta::Decode(wal_reader, decoder); - if (!delta) break; - max_observed_tx_id = std::max(max_observed_tx_id, delta->transaction_id); - if (should_skip(delta->transaction_id)) continue; - switch (delta->type) { - case database::StateDelta::Type::TRANSACTION_BEGIN: - DCHECK(accessors.find(delta->transaction_id) == accessors.end()) - << "Double transaction start"; - accessors.emplace(delta->transaction_id, db); - break; - case database::StateDelta::Type::TRANSACTION_ABORT: - get_accessor(delta->transaction_id).Abort(); - accessors.erase(accessors.find(delta->transaction_id)); - break; - case database::StateDelta::Type::TRANSACTION_COMMIT: - get_accessor(delta->transaction_id).Commit(); - recovery_data.wal_max_recovered_tx_id = delta->transaction_id; - accessors.erase(accessors.find(delta->transaction_id)); - break; - case database::StateDelta::Type::BUILD_INDEX: - // TODO index building might still be problematic in HA - recovery_data.indexes.emplace_back(delta->label_name, - delta->property_name); - break; - default: - delta->Apply(get_accessor(delta->transaction_id)); - } - } // reading all deltas in a single wal file - } // reading all wal files + // + ApplyOverDeltas( + wal_files, first_to_recover, [&](const database::StateDelta &delta) { + max_observed_tx_id = std::max(max_observed_tx_id, delta.transaction_id); + if (should_skip(delta.transaction_id)) return; + switch (delta.type) { + case database::StateDelta::Type::TRANSACTION_BEGIN: + CHECK(accessors.find(delta.transaction_id) == accessors.end()) + << "Double transaction start"; + accessors.emplace(delta.transaction_id, *db); + break; + case database::StateDelta::Type::TRANSACTION_ABORT: + get_accessor(delta.transaction_id).Abort(); + accessors.erase(accessors.find(delta.transaction_id)); + break; + case database::StateDelta::Type::TRANSACTION_COMMIT: + get_accessor(delta.transaction_id).Commit(); + accessors.erase(accessors.find(delta.transaction_id)); + break; + case database::StateDelta::Type::BUILD_INDEX: + // TODO index building might still be problematic in HA + recovery_data->indexes.emplace_back(delta.label_name, + delta.property_name); + break; + default: + delta.Apply(get_accessor(delta.transaction_id)); + } + }); // TODO when implementing proper error handling return one of the following: // - WAL fully recovered // - WAL partially recovered // - WAL recovery error - db.tx_engine().EnsureNextIdGreater(max_observed_tx_id); + db->tx_engine().EnsureNextIdGreater(max_observed_tx_id); return true; } } // anonymous namespace -RecoveryInfo Recover( - const fs::path &durability_dir, database::GraphDb &db, - std::experimental::optional required_recovery_info) { - RecoveryData recovery_data; - +RecoveryInfo RecoverOnlySnapshot( + const fs::path &durability_dir, database::GraphDb *db, + RecoveryData *recovery_data, + std::experimental::optional required_snapshot_tx_id) { // Attempt to recover from snapshot files in reverse order (from newest // backwards). const auto snapshot_dir = durability_dir / kSnapshotDir; @@ -356,21 +396,21 @@ RecoveryInfo Recover( snapshot_files.emplace_back(file); std::sort(snapshot_files.rbegin(), snapshot_files.rend()); for (auto &snapshot_file : snapshot_files) { - if (required_recovery_info) { + if (required_snapshot_tx_id) { auto snapshot_file_tx_id = TransactionIdFromSnapshotFilename(snapshot_file); - if (!snapshot_file_tx_id || snapshot_file_tx_id.value() != - required_recovery_info->snapshot_tx_id) { + if (!snapshot_file_tx_id || + snapshot_file_tx_id.value() != *required_snapshot_tx_id) { LOG(INFO) << "Skipping snapshot file '" << snapshot_file << "' because it does not match the required snapshot tx id: " - << required_recovery_info->snapshot_tx_id; + << *required_snapshot_tx_id; continue; } } LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file; if (!RecoverSnapshot(snapshot_file, db, recovery_data)) { - db.ReinitializeStorage(); - recovery_data.Clear(); + db->ReinitializeStorage(); + recovery_data->Clear(); LOG(WARNING) << "Snapshot recovery failed, trying older snapshot..."; continue; } else { @@ -379,12 +419,19 @@ RecoveryInfo Recover( } } - // If snapshot recovery is required, and we failed, don't even deal with the - // WAL recovery. - if (required_recovery_info && - recovery_data.snapshooter_tx_id != required_recovery_info->snapshot_tx_id) - return {recovery_data.snapshooter_tx_id, 0}; + // If snapshot recovery is required, and we failed, don't even deal with + // the WAL recovery. + if (required_snapshot_tx_id && + recovery_data->snapshooter_tx_id != *required_snapshot_tx_id) + return {recovery_data->snapshooter_tx_id, {}}; + return {recovery_data->snapshooter_tx_id, + ReadWalRecoverableTransactions(durability_dir / kWalDir, db, + *recovery_data)}; +} + +void RecoverWalAndIndexes(const fs::path &durability_dir, database::GraphDb *db, + RecoveryData *recovery_data) { // Write-ahead-log recovery. // WAL recovery does not have to be complete for the recovery to be // considered successful. For the time being ignore the return value, @@ -392,18 +439,15 @@ RecoveryInfo Recover( RecoverWal(durability_dir / kWalDir, db, recovery_data); // Index recovery. - database::GraphDbAccessor db_accessor_indices{db}; - for (const auto &label_prop : recovery_data.indexes) { + database::GraphDbAccessor db_accessor_indices{*db}; + for (const auto &label_prop : recovery_data->indexes) { const database::LabelPropertyIndex::Key key{ db_accessor_indices.Label(label_prop.first), db_accessor_indices.Property(label_prop.second)}; - db_accessor_indices.db().storage().label_property_index_.CreateIndex(key); + db_accessor_indices.db().storage().label_property_index().CreateIndex(key); db_accessor_indices.PopulateIndex(key); db_accessor_indices.EnableIndex(key); } db_accessor_indices.Commit(); - - return {recovery_data.snapshooter_tx_id, - recovery_data.wal_max_recovered_tx_id}; } } // namespace durability diff --git a/src/durability/recovery.hpp b/src/durability/recovery.hpp index 87cfe6c11..e7d987666 100644 --- a/src/durability/recovery.hpp +++ b/src/durability/recovery.hpp @@ -3,11 +3,15 @@ #include #include -#include "database/graph_db.hpp" #include "durability/hashed_file_reader.hpp" #include "durability/recovery.capnp.h" #include "storage/vertex_accessor.hpp" #include "transactions/type.hpp" +#include "utils/serialization.hpp" + +namespace database { +class GraphDb; +}; namespace durability { @@ -15,25 +19,27 @@ namespace durability { struct RecoveryInfo { RecoveryInfo() {} RecoveryInfo(tx::TransactionId snapshot_tx_id, - tx::TransactionId max_wal_tx_id) - : snapshot_tx_id(snapshot_tx_id), max_wal_tx_id(max_wal_tx_id) {} + const std::vector &wal_recovered) + : snapshot_tx_id(snapshot_tx_id), wal_recovered(wal_recovered) {} tx::TransactionId snapshot_tx_id; - tx::TransactionId max_wal_tx_id; + std::vector wal_recovered; bool operator==(const RecoveryInfo &other) const { return snapshot_tx_id == other.snapshot_tx_id && - max_wal_tx_id == other.max_wal_tx_id; + wal_recovered == other.wal_recovered; } bool operator!=(const RecoveryInfo &other) const { return !(*this == other); } void Save(capnp::RecoveryInfo::Builder *builder) const { builder->setSnapshotTxId(snapshot_tx_id); - builder->setMaxWalTxId(max_wal_tx_id); + auto list_builder = builder->initWalRecovered(wal_recovered.size()); + utils::SaveVector(wal_recovered, &list_builder); } void Load(const capnp::RecoveryInfo::Reader &reader) { snapshot_tx_id = reader.getSnapshotTxId(); - max_wal_tx_id = reader.getMaxWalTxId(); + auto list_reader = reader.getWalRecovered(); + utils::LoadVector(&wal_recovered, list_reader); } private: @@ -42,7 +48,77 @@ struct RecoveryInfo { template void serialize(TArchive &ar, unsigned int) { ar &snapshot_tx_id; - ar &max_wal_tx_id; + ar &wal_recovered; + } +}; + +// A data structure for exchanging info between main recovery function and +// snapshot and WAL recovery functions. +struct RecoveryData { + tx::TransactionId snapshooter_tx_id{0}; + std::vector wal_tx_to_recover{}; + std::vector snapshooter_tx_snapshot; + // A collection into which the indexes should be added so they + // can be rebuilt at the end of the recovery transaction. + std::vector> indexes; + + void Clear() { + snapshooter_tx_id = 0; + snapshooter_tx_snapshot.clear(); + indexes.clear(); + } + + void Save(capnp::RecoveryData::Builder *builder) const { + builder->setSnapshooterTxId(snapshooter_tx_id); + { + auto list_builder = builder->initWalTxToRecover(wal_tx_to_recover.size()); + utils::SaveVector(wal_tx_to_recover, &list_builder); + } + { + auto list_builder = + builder->initSnapshooterTxSnapshot(snapshooter_tx_snapshot.size()); + utils::SaveVector(snapshooter_tx_snapshot, &list_builder); + } + { + auto list_builder = builder->initIndexes(indexes.size()); + utils::SaveVector, + std::pair>( + indexes, &list_builder, [](auto *builder, const auto value) { + builder->setFirst(value.first); + builder->setSecond(value.second); + }); + } + } + + void Load(const capnp::RecoveryData::Reader &reader) { + snapshooter_tx_id = reader.getSnapshooterTxId(); + { + auto list_reader = reader.getWalTxToRecover(); + utils::LoadVector(&wal_tx_to_recover, list_reader); + } + { + auto list_reader = reader.getSnapshooterTxSnapshot(); + utils::LoadVector(&snapshooter_tx_snapshot, list_reader); + } + { + auto list_reader = reader.getIndexes(); + utils::LoadVector, + std::pair>( + &indexes, list_reader, [](const auto &reader) { + return std::make_pair(reader.getFirst(), reader.getSecond()); + }); + } + } + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &snapshooter_tx_id; + ar &wal_tx_to_recover; + ar &snapshooter_tx_snapshot; + ar &indexes; } }; @@ -52,20 +128,22 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count, int64_t &edge_count, uint64_t &hash); /** - * Recovers database from durability. If recovering fails, false is returned - * and db_accessor aborts transaction, else true is returned and transaction is - * commited. + * Recovers database from the latest possible snapshot. If recovering fails, + * false is returned and db_accessor aborts transaction, else true is returned + * and transaction is commited. * * @param durability_dir - Path to durability directory. * @param db - The database to recover into. - * @param required_recovery_info - Only used on distributed worker. Indicates - * what the master recovered. The same transactions must be recovered on the + * @param required_snapshot_tx_id - Only used on distributed worker. Indicates + * what the master recovered. The same snapshot must be recovered on the * worker. * @return - recovery info */ -RecoveryInfo Recover( +RecoveryInfo RecoverOnlySnapshot( const std::experimental::filesystem::path &durability_dir, - database::GraphDb &db, - std::experimental::optional required_recovery_info); + database::GraphDb *db, durability::RecoveryData *recovery_data, + std::experimental::optional required_snapshot_tx_id); +void RecoverWalAndIndexes(const std::experimental::filesystem::path &dir, + database::GraphDb *db, RecoveryData *recovery_data); } // namespace durability diff --git a/src/lisp/lcp.lisp b/src/lisp/lcp.lisp index f6b47d0fe..6e5526a4a 100644 --- a/src/lisp/lcp.lisp +++ b/src/lisp/lcp.lisp @@ -982,7 +982,8 @@ used for outside definition." (member-builder (format nil "~A_builder" (cpp-member-name member :struct t))) (capnp-name (cpp-type-name (cpp-member-symbol member)))) (cond - ((capnp-primitive-type-p (capnp-type-of-member member)) + ((and (not (cpp-member-capnp-save member)) + (capnp-primitive-type-p (capnp-type-of-member member))) (format s " builder->set~A(~A);~%" capnp-name member-name)) (t (write-line "{" s) ;; Enclose larger save code in new scope @@ -1166,7 +1167,8 @@ used for outside definition." (member-reader (format nil "~A_reader" (cpp-member-name member :struct t))) (capnp-name (cpp-type-name (cpp-member-symbol member)))) (cond - ((capnp-primitive-type-p (capnp-type-of-member member)) + ((and (not (cpp-member-capnp-load member)) + (capnp-primitive-type-p (capnp-type-of-member member))) (format s " ~A = reader.get~A();~%" member-name capnp-name)) (t (write-line "{" s) ;; Enclose larger load code in new scope diff --git a/src/storage/garbage_collector.hpp b/src/storage/garbage_collector.hpp index 5aa6c6f3e..3dd21f2bf 100644 --- a/src/storage/garbage_collector.hpp +++ b/src/storage/garbage_collector.hpp @@ -5,7 +5,6 @@ #include "data_structures/concurrent/skiplist.hpp" #include "mvcc/version_list.hpp" #include "storage/deferred_deleter.hpp" -#include "storage/deferred_deleter.hpp" #include "transactions/engine.hpp" /** @@ -52,8 +51,8 @@ class GarbageCollector { if (ret.second != nullptr) deleted_records.emplace_back(ret.second, engine.LocalLast()); } - DLOG_IF(INFO, count > 0) << "GC started cleaning with snapshot: " - << snapshot; + DLOG_IF(INFO, count > 0) + << "GC started cleaning with snapshot: " << snapshot; DLOG_IF(INFO, count > 0) << "Destroyed: " << count; // Add records to deleter, with the id larger or equal than the last active diff --git a/src/utils/algorithm.hpp b/src/utils/algorithm.hpp index 14e69f3e2..1c1910236 100644 --- a/src/utils/algorithm.hpp +++ b/src/utils/algorithm.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "utils/exceptions.hpp" @@ -91,6 +92,12 @@ inline TVal First(TIterable &&iterable, TVal &&empty_value) { return empty_value; } +template +inline bool Contains(const std::unordered_set &iterable, + const TElement &element) { + return iterable.find(element) != iterable.end(); +} + /** * Returns `true` if the given iterable contains the given element. * diff --git a/tests/distributed/card_fraud/card_fraud.py b/tests/distributed/card_fraud/card_fraud.py index ba657bd17..7113c7804 100644 --- a/tests/distributed/card_fraud/card_fraud.py +++ b/tests/distributed/card_fraud/card_fraud.py @@ -78,7 +78,7 @@ class MgCluster: ]) # sleep to allow the workers to startup - time.sleep(5) + time.sleep(15) # store initial usage self._usage_start = [self._master.get_usage()] diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index ff37cccad..ccb23de00 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -60,7 +60,10 @@ class WorkerCoordinationInThread { } auto worker_ids() { return worker->coord.GetWorkerIds(); } void join() { worker_thread_.join(); } - void NotifyWorkerRecovered() { worker->discovery.NotifyWorkerRecovered(); } + void NotifyWorkerRecovered() { + std::experimental::optional no_recovery_info; + worker->discovery.NotifyWorkerRecovered(no_recovery_info); + } private: std::thread worker_thread_; @@ -72,7 +75,7 @@ TEST(Distributed, Coordination) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); - master_coord.SetRecoveryInfo(std::experimental::nullopt); + master_coord.SetRecoveredSnapshot(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -102,7 +105,7 @@ TEST(Distributed, DesiredAndUniqueId) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); - master_coord.SetRecoveryInfo(std::experimental::nullopt); + master_coord.SetRecoveredSnapshot(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -125,7 +128,7 @@ TEST(Distributed, CoordinationWorkersId) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); - master_coord.SetRecoveryInfo(std::experimental::nullopt); + master_coord.SetRecoveredSnapshot(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -151,7 +154,7 @@ TEST(Distributed, ClusterDiscovery) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); - master_coord.SetRecoveryInfo(std::experimental::nullopt); + master_coord.SetRecoveredSnapshot(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -182,7 +185,7 @@ TEST(Distributed, KeepsTrackOfRecovered) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); - master_coord.SetRecoveryInfo(std::experimental::nullopt); + master_coord.SetRecoveredSnapshot(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); diff --git a/tests/unit/rpc_worker_clients.cpp b/tests/unit/rpc_worker_clients.cpp index 1153a4fb0..c220b48ee 100644 --- a/tests/unit/rpc_worker_clients.cpp +++ b/tests/unit/rpc_worker_clients.cpp @@ -52,7 +52,7 @@ class RpcWorkerClientsTest : public ::testing::Test { const io::network::Endpoint kLocalHost{"127.0.0.1", 0}; const int kWorkerCount = 2; void SetUp() override { - master_coord_->SetRecoveryInfo(std::experimental::nullopt); + master_coord_->SetRecoveredSnapshot(std::experimental::nullopt); for (int i = 1; i <= kWorkerCount; ++i) { workers_server_.emplace_back( std::make_unique(kLocalHost)); diff --git a/tools/tests/mg_recovery_check.cpp b/tools/tests/mg_recovery_check.cpp index 962f5e73d..1ae6959e1 100644 --- a/tools/tests/mg_recovery_check.cpp +++ b/tools/tests/mg_recovery_check.cpp @@ -21,7 +21,10 @@ class RecoveryTest : public ::testing::Test { protected: void SetUp() override { std::string durability_dir(FLAGS_durability_dir); - durability::Recover(durability_dir, db_, std::experimental::nullopt); + durability::RecoveryData recovery_data; + durability::RecoverOnlySnapshot(durability_dir, &db_, &recovery_data, + std::experimental::nullopt); + durability::RecoverWalAndIndexes(durability_dir, &db_, &recovery_data); } database::SingleNode db_;