From 261d50a02eb4b1573c56f195aedd52339e3dc2d5 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Fri, 20 Apr 2018 14:16:54 +0200 Subject: [PATCH] Destroy and create storage object after failed recovery Reviewers: buda Reviewed By: buda Subscribers: mferencevic, pullbot Differential Revision: https://phabricator.memgraph.io/D1370 --- src/communication/rpc/server.hpp | 16 ++++++++- src/database/graph_db.cpp | 58 ++++++++++++++++++++++++------ src/database/graph_db.hpp | 9 ++++- src/database/storage_gc_master.hpp | 1 + src/distributed/data_manager.cpp | 11 +++--- src/distributed/data_manager.hpp | 10 ++---- src/durability/recovery.cpp | 1 + 7 files changed, 81 insertions(+), 25 deletions(-) diff --git a/src/communication/rpc/server.hpp b/src/communication/rpc/server.hpp index b663b6e92..100a0f0eb 100644 --- a/src/communication/rpc/server.hpp +++ b/src/communication/rpc/server.hpp @@ -58,6 +58,20 @@ class Server { } } + template + void UnRegister() { + static_assert( + std::is_base_of::value, + "TRequestResponse::Request must be derived from Message"); + static_assert( + std::is_base_of::value, + "TRequestResponse::Response must be derived from Message"); + auto callbacks_accessor = callbacks_.access(); + auto deleted = + callbacks_accessor.remove(typeid(typename TRequestResponse::Request)); + CHECK(deleted) << "Trying to remove unknown message type callback"; + } + private: friend class Session; @@ -67,6 +81,6 @@ class Server { std::mutex mutex_; communication::Server server_; -}; +}; // namespace communication::rpc } // namespace communication::rpc diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 20f5dd149..479f5b7be 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -48,7 +48,7 @@ class PrivateBase : public GraphDb { const Config config_; - Storage &storage() override { return storage_; } + Storage &storage() override { return *storage_; } durability::WriteAheadLog &wal() override { return wal_; } int WorkerId() const override { return config_.worker_id; } @@ -65,6 +65,10 @@ class PrivateBase : public GraphDb { return status; } + void ReinitializeStorage() override { + storage_ = std::make_unique(WorkerId()); + } + distributed::PullRpcClients &pull_clients() override { LOG(FATAL) << "Remote pull clients only available in master."; } @@ -82,7 +86,8 @@ class PrivateBase : public GraphDb { } protected: - Storage storage_{config_.worker_id}; + std::unique_ptr storage_ = + std::make_unique(config_.worker_id); durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, config_.durability_enabled}; @@ -111,7 +116,7 @@ struct TypemapPack { return typemap_pack_.property; \ } \ database::Counters &counters() override { return counters_; } \ - void CollectGarbage() override { storage_gc_.CollectGarbage(); } + void CollectGarbage() override { storage_gc_->CollectGarbage(); } class SingleNode : public PrivateBase { public: @@ -120,7 +125,9 @@ class SingleNode : public PrivateBase { IMPL_GETTERS tx::SingleNodeEngine tx_engine_{&wal_}; - StorageGcSingleNode storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; + std::unique_ptr storage_gc_ = + std::make_unique(*storage_, tx_engine_, + config_.gc_cycle_sec); TypemapPack typemap_pack_; database::SingleNodeCounters counters_; std::vector GetWorkerIds() const override { return {0}; } @@ -145,6 +152,13 @@ class SingleNode : public PrivateBase { distributed::DataManager &data_manager() override { LOG(FATAL) << "Remote data manager not available in single-node."; } + void ReinitializeStorage() override { + // Release gc scheduler to stop it from touching storage + storage_gc_ = nullptr; + PrivateBase::ReinitializeStorage(); + storage_gc_ = std::make_unique(*storage_, tx_engine_, + config_.gc_cycle_sec); + } }; #define IMPL_DISTRIBUTED_GETTERS \ @@ -195,12 +209,21 @@ class Master : public PrivateBase { return index_rpc_clients_; } + void ReinitializeStorage() override { + // Release gc scheduler to stop it from touching storage + storage_gc_ = nullptr; + PrivateBase::ReinitializeStorage(); + storage_gc_ = std::make_unique( + *storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_); + } + communication::rpc::Server server_{ config_.master_endpoint, static_cast(config_.rpc_num_workers)}; tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_}; distributed::MasterCoordination coordination_{server_.endpoint()}; - StorageGcMaster storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec, - server_, coordination_}; + std::unique_ptr storage_gc_ = + std::make_unique( + *storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_); distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; TypemapPack typemap_pack_{server_}; database::MasterCounters counters_{server_}; @@ -213,7 +236,7 @@ class Master : public PrivateBase { distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_}; distributed::UpdatesRpcServer updates_server_{*this, server_}; distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_}; - distributed::DataManager data_manager_{storage_, data_clients_}; + distributed::DataManager data_manager_{*this, data_clients_}; distributed::TransactionalCacheCleaner cache_cleaner_{ tx_engine_, updates_server_, data_manager_}; distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_, @@ -236,15 +259,25 @@ class Worker : public PrivateBase { return produce_server_; } + void ReinitializeStorage() override { + // Release gc scheduler to stop it from touching storage + storage_gc_ = nullptr; + PrivateBase::ReinitializeStorage(); + storage_gc_ = std::make_unique( + *storage_, tx_engine_, config_.gc_cycle_sec, + rpc_worker_clients_.GetClientPool(0), config_.worker_id); + } + communication::rpc::Server server_{ config_.worker_endpoint, static_cast(config_.rpc_num_workers)}; distributed::WorkerCoordination coordination_{server_, config_.master_endpoint}; distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)}; - StorageGcWorker storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec, - rpc_worker_clients_.GetClientPool(0), - config_.worker_id}; + std::unique_ptr storage_gc_ = + std::make_unique( + *storage_, tx_engine_, config_.gc_cycle_sec, + rpc_worker_clients_.GetClientPool(0), config_.worker_id); TypemapPack typemap_pack_{ rpc_worker_clients_.GetClientPool(0)}; database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)}; @@ -256,7 +289,7 @@ class Worker : public PrivateBase { distributed::IndexRpcServer index_rpc_server_{*this, server_}; distributed::UpdatesRpcServer updates_server_{*this, server_}; distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_}; - distributed::DataManager data_manager_{storage_, data_clients_}; + distributed::DataManager data_manager_{*this, data_clients_}; distributed::WorkerTransactionalCacheCleaner cache_cleaner_{ tx_engine_, server_, produce_server_, updates_server_, data_manager_}; distributed::DurabilityRpcServer durability_rpc_server_{*this, server_}; @@ -401,6 +434,9 @@ distributed::DataManager &PublicBase::data_manager() { bool PublicBase::MakeSnapshot(GraphDbAccessor &accessor) { return impl_->MakeSnapshot(accessor); } + +void PublicBase::ReinitializeStorage() { impl_->ReinitializeStorage(); } + } // namespace impl MasterBase::MasterBase(std::unique_ptr impl) diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index c70441fde..2fd1d45a0 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -114,6 +114,12 @@ class GraphDb { // Makes a snapshot from the visibility of the given accessor virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0; + // Releases the storage object safely and creates a new object. + // This is needed because of recovery, otherwise we might try to recover into + // a storage which has already been polluted because of a failed previous + // recovery + virtual void ReinitializeStorage() = 0; + GraphDb(const GraphDb &) = delete; GraphDb(GraphDb &&) = delete; GraphDb &operator=(const GraphDb &) = delete; @@ -154,6 +160,7 @@ class PublicBase : public GraphDb { bool is_accepting_transactions() const { return is_accepting_transactions_; } bool MakeSnapshot(GraphDbAccessor &accessor) override; + void ReinitializeStorage() override; protected: explicit PublicBase(std::unique_ptr impl); @@ -170,7 +177,7 @@ class PublicBase : public GraphDb { class MasterBase : public impl::PublicBase { public: - MasterBase(std::unique_ptr impl); + explicit MasterBase(std::unique_ptr impl); ~MasterBase(); private: diff --git a/src/database/storage_gc_master.hpp b/src/database/storage_gc_master.hpp index 70015b4a1..02b9d513b 100644 --- a/src/database/storage_gc_master.hpp +++ b/src/database/storage_gc_master.hpp @@ -29,6 +29,7 @@ class StorageGcMaster : public StorageGc { // a task might try to utilize methods in this class which might cause pure // virtual method called since they are not implemented for the base class. scheduler_.Stop(); + rpc_server_.UnRegister(); } void CollectCommitLogGarbage(tx::TransactionId oldest_active) final { diff --git a/src/distributed/data_manager.cpp b/src/distributed/data_manager.cpp index e94df319b..9a619d692 100644 --- a/src/distributed/data_manager.cpp +++ b/src/distributed/data_manager.cpp @@ -1,5 +1,5 @@ -#include "distributed/data_manager.hpp" #include "database/storage.hpp" +#include "distributed/data_manager.hpp" namespace distributed { @@ -11,8 +11,9 @@ Cache &DataManager::GetCache(CacheT &collection, if (found != access.end()) return found->second; return access - .emplace(tx_id, std::make_tuple(tx_id), - std::make_tuple(std::ref(storage_), std::ref(data_clients_))) + .emplace( + tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(db_.storage()), std::ref(data_clients_))) .first->second; } @@ -26,9 +27,9 @@ Cache &DataManager::Elements(tx::TransactionId tx_id) { return GetCache(edges_caches_, tx_id); } -DataManager::DataManager(database::Storage &storage, +DataManager::DataManager(database::GraphDb &db, distributed::DataRpcClients &data_clients) - : storage_(storage), data_clients_(data_clients) {} + : db_(db), data_clients_(data_clients) {} void DataManager::ClearCacheForSingleTransaction(tx::TransactionId tx_id) { Elements(tx_id).ClearCache(); diff --git a/src/distributed/data_manager.hpp b/src/distributed/data_manager.hpp index 0c946549e..4f2888ac2 100644 --- a/src/distributed/data_manager.hpp +++ b/src/distributed/data_manager.hpp @@ -1,6 +1,7 @@ #pragma once #include "data_structures/concurrent/concurrent_map.hpp" +#include "database/graph_db.hpp" #include "distributed/cache.hpp" #include "distributed/data_rpc_clients.hpp" #include "transactions/type.hpp" @@ -8,10 +9,6 @@ class Vertex; class Edge; -namespace database { -class Storage; -} - namespace distributed { /// Handles remote data caches for edges and vertices, per transaction. @@ -25,8 +22,7 @@ class DataManager { tx::TransactionId tx_id); public: - DataManager(database::Storage &storage, - distributed::DataRpcClients &data_clients); + DataManager(database::GraphDb &db, distributed::DataRpcClients &data_clients); /// Gets or creates the remote vertex/edge cache for the given transaction. template @@ -40,7 +36,7 @@ class DataManager { void ClearTransactionalCache(tx::TransactionId oldest_active); private: - database::Storage &storage_; + database::GraphDb &db_; DataRpcClients &data_clients_; CacheT vertices_caches_; CacheT edges_caches_; diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 094c2bed7..3e0b5e930 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -355,6 +355,7 @@ RecoveryInfo Recover( } LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file; if (!RecoverSnapshot(snapshot_file, db, recovery_data)) { + db.ReinitializeStorage(); recovery_data.Clear(); LOG(WARNING) << "Snapshot recovery failed, trying older snapshot..."; continue;