Destroy and create storage object after failed recovery

Reviewers: buda

Reviewed By: buda

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1370

Author: Dominik Gleich, 2018-04-20 14:16:54 +02:00
Parent: 5c7d3a908f
Commit: 261d50a02e
7 changed files with 81 additions and 25 deletions
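
For orientation before the diff: on a failed snapshot recovery, the half-populated storage object is now destroyed and a fresh one is created before an older snapshot is tried. Below is a hedged, self-contained toy model of that behavior; `Storage`, `Db`, and `RecoverSnapshot` here are invented stand-ins, not the real Memgraph durability types.

```cpp
// Toy model (invented names) of the fix: a failed snapshot recovery leaves
// partially loaded data behind, so storage is destroyed and recreated before
// the next, older snapshot is attempted.
#include <iostream>
#include <memory>
#include <vector>

struct Storage {
  std::vector<int> vertices;  // stands in for recovered records
};

struct Db {
  std::unique_ptr<Storage> storage = std::make_unique<Storage>();
  void ReinitializeStorage() { storage = std::make_unique<Storage>(); }
};

// Pretend recovery: it loads some data before noticing the snapshot is corrupt.
bool RecoverSnapshot(bool snapshot_ok, Db &db) {
  db.storage->vertices.push_back(42);
  return snapshot_ok;
}

int main() {
  Db db;
  // The newest snapshot is corrupt, the older one is fine.
  const std::vector<bool> snapshots_newest_first{false, true};
  for (bool ok : snapshots_newest_first) {
    if (!RecoverSnapshot(ok, db)) {
      db.ReinitializeStorage();  // without this, the retry recovers into polluted storage
      continue;
    }
    break;
  }
  std::cout << db.storage->vertices.size() << "\n";  // prints 1, not 2
}
```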

View File

@@ -58,6 +58,20 @@ class Server {
}
}
template <typename TRequestResponse>
void UnRegister() {
static_assert(
std::is_base_of<Message, typename TRequestResponse::Request>::value,
"TRequestResponse::Request must be derived from Message");
static_assert(
std::is_base_of<Message, typename TRequestResponse::Response>::value,
"TRequestResponse::Response must be derived from Message");
auto callbacks_accessor = callbacks_.access();
auto deleted =
callbacks_accessor.remove(typeid(typename TRequestResponse::Request));
CHECK(deleted) << "Trying to remove unknown message type callback";
}
private:
friend class Session;
@@ -67,6 +81,6 @@ class Server {
std::mutex mutex_;
communication::Server<Session, Server> server_;
};
}; // namespace communication::rpc
} // namespace communication::rpc

View File

@@ -48,7 +48,7 @@ class PrivateBase : public GraphDb {
const Config config_;
Storage &storage() override { return storage_; }
Storage &storage() override { return *storage_; }
durability::WriteAheadLog &wal() override { return wal_; }
int WorkerId() const override { return config_.worker_id; }
@@ -65,6 +65,10 @@ class PrivateBase : public GraphDb {
return status;
}
void ReinitializeStorage() override {
storage_ = std::make_unique<Storage>(WorkerId());
}
distributed::PullRpcClients &pull_clients() override {
LOG(FATAL) << "Remote pull clients only available in master.";
}
@@ -82,7 +86,8 @@ class PrivateBase : public GraphDb {
}
protected:
Storage storage_{config_.worker_id};
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
@@ -111,7 +116,7 @@ struct TypemapPack {
return typemap_pack_.property; \
} \
database::Counters &counters() override { return counters_; } \
void CollectGarbage() override { storage_gc_.CollectGarbage(); }
void CollectGarbage() override { storage_gc_->CollectGarbage(); }
class SingleNode : public PrivateBase {
public:
@@ -120,7 +125,9 @@ class SingleNode : public PrivateBase {
IMPL_GETTERS
tx::SingleNodeEngine tx_engine_{&wal_};
StorageGcSingleNode storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
std::unique_ptr<StorageGcSingleNode> storage_gc_ =
std::make_unique<StorageGcSingleNode>(*storage_, tx_engine_,
config_.gc_cycle_sec);
TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_;
database::SingleNodeCounters counters_;
std::vector<int> GetWorkerIds() const override { return {0}; }
@@ -145,6 +152,13 @@ class SingleNode : public PrivateBase {
distributed::DataManager &data_manager() override {
LOG(FATAL) << "Remote data manager not available in single-node.";
}
void ReinitializeStorage() override {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
PrivateBase::ReinitializeStorage();
storage_gc_ = std::make_unique<StorageGcSingleNode>(*storage_, tx_engine_,
config_.gc_cycle_sec);
}
};
#define IMPL_DISTRIBUTED_GETTERS \
@@ -195,12 +209,21 @@ class Master : public PrivateBase {
return index_rpc_clients_;
}
void ReinitializeStorage() override {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
PrivateBase::ReinitializeStorage();
storage_gc_ = std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
}
communication::rpc::Server server_{
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
distributed::MasterCoordination coordination_{server_.endpoint()};
StorageGcMaster storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec,
server_, coordination_};
std::unique_ptr<StorageGcMaster> storage_gc_ =
std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
@@ -213,7 +236,7 @@ class Master : public PrivateBase {
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{storage_, data_clients_};
distributed::DataManager data_manager_{*this, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{
tx_engine_, updates_server_, data_manager_};
distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_,
@@ -236,15 +259,25 @@ class Worker : public PrivateBase {
return produce_server_;
}
void ReinitializeStorage() override {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
PrivateBase::ReinitializeStorage();
storage_gc_ = std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0), config_.worker_id);
}
communication::rpc::Server server_{
config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
StorageGcWorker storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0),
config_.worker_id};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0), config_.worker_id);
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};
@@ -256,7 +289,7 @@ class Worker : public PrivateBase {
distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{storage_, data_clients_};
distributed::DataManager data_manager_{*this, data_clients_};
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
tx_engine_, server_, produce_server_, updates_server_, data_manager_};
distributed::DurabilityRpcServer durability_rpc_server_{*this, server_};
@@ -401,6 +434,9 @@ distributed::DataManager &PublicBase::data_manager() {
bool PublicBase::MakeSnapshot(GraphDbAccessor &accessor) {
return impl_->MakeSnapshot(accessor);
}
void PublicBase::ReinitializeStorage() { impl_->ReinitializeStorage(); }
} // namespace impl
MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
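
Taken together, the SingleNode, Master, and Worker overrides above all follow the same order of operations. A simplified, self-contained sketch of that pattern (the `Storage`, `StorageGc`, and `Db` types below are invented for illustration; the real code splits this across PrivateBase and its subclasses):

```cpp
// Simplified model of the reinitialization order used above: the GC object,
// which holds a reference into storage, is destroyed first, then storage is
// replaced, and only then is a new GC created against the new storage.
#include <memory>

struct Storage {};

struct StorageGc {
  explicit StorageGc(Storage &storage) : storage_(&storage) {}
  Storage *storage_;
};

struct Db {
  std::unique_ptr<Storage> storage = std::make_unique<Storage>();
  std::unique_ptr<StorageGc> gc = std::make_unique<StorageGc>(*storage);

  void ReinitializeStorage() {
    gc = nullptr;                                // stop the GC from touching the old storage
    storage = std::make_unique<Storage>();       // destroy the old storage, create a fresh one
    gc = std::make_unique<StorageGc>(*storage);  // re-point the GC at the new storage
  }
};

int main() {
  Db db;
  db.ReinitializeStorage();
}
```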

View File

@@ -114,6 +114,12 @@ class GraphDb {
// Makes a snapshot from the visibility of the given accessor
virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0;
// Releases the storage object safely and creates a new object.
// This is needed because of recovery; otherwise we might try to recover into
// a storage which has already been polluted by a failed previous recovery.
virtual void ReinitializeStorage() = 0;
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
@@ -154,6 +160,7 @@ class PublicBase : public GraphDb {
bool is_accepting_transactions() const { return is_accepting_transactions_; }
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
protected:
explicit PublicBase(std::unique_ptr<PrivateBase> impl);
@@ -170,7 +177,7 @@ class PublicBase : public GraphDb {
class MasterBase : public impl::PublicBase {
public:
MasterBase(std::unique_ptr<impl::PrivateBase> impl);
explicit MasterBase(std::unique_ptr<impl::PrivateBase> impl);
~MasterBase();
private:

View File

@@ -29,6 +29,7 @@ class StorageGcMaster : public StorageGc {
// a task might try to utilize methods in this class, which might cause a pure
// virtual method call since they are not implemented for the base class.
scheduler_.Stop();
rpc_server_.UnRegister<distributed::RanLocalGcRpc>();
}
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
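
The added UnRegister call is what lets a fresh StorageGcMaster be created after ReinitializeStorage() without leaving the old object's callback behind. A self-contained toy of that lifecycle follows; `RpcServer`, `GcLike`, and the duplicate-registration assert are assumptions made purely for illustration, not the real Server semantics:

```cpp
// Toy model (invented types): the server keeps one callback per request type,
// so the outgoing GC object removes its registration in the destructor, and a
// new GC object created after ReinitializeStorage() can register it again.
#include <cassert>
#include <functional>
#include <map>
#include <string>

struct RpcServer {
  std::map<std::string, std::function<void()>> callbacks;
  void Register(const std::string &name, std::function<void()> cb) {
    // Mimics a check against double registration of the same request type.
    bool inserted = callbacks.emplace(name, std::move(cb)).second;
    assert(inserted && "duplicate registration");
    (void)inserted;
  }
  void UnRegister(const std::string &name) { callbacks.erase(name); }
};

struct GcLike {
  RpcServer &server;
  explicit GcLike(RpcServer &s) : server(s) {
    server.Register("RanLocalGcRpc", [] { /* collect garbage */ });
  }
  ~GcLike() { server.UnRegister("RanLocalGcRpc"); }
};

int main() {
  RpcServer server;
  {
    GcLike first(server);
  }                         // destructor unregisters, so...
  GcLike second(server);    // ...recreating the GC after reinitialization works
}
```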

View File

@@ -1,5 +1,5 @@
#include "distributed/data_manager.hpp"
#include "database/storage.hpp"
#include "distributed/data_manager.hpp"
namespace distributed {
@@ -11,8 +11,9 @@ Cache<TRecord> &DataManager::GetCache(CacheT<TRecord> &collection,
if (found != access.end()) return found->second;
return access
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(storage_), std::ref(data_clients_)))
.emplace(
tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(db_.storage()), std::ref(data_clients_)))
.first->second;
}
@@ -26,9 +27,9 @@ Cache<Edge> &DataManager::Elements<Edge>(tx::TransactionId tx_id) {
return GetCache(edges_caches_, tx_id);
}
DataManager::DataManager(database::Storage &storage,
DataManager::DataManager(database::GraphDb &db,
distributed::DataRpcClients &data_clients)
: storage_(storage), data_clients_(data_clients) {}
: db_(db), data_clients_(data_clients) {}
void DataManager::ClearCacheForSingleTransaction(tx::TransactionId tx_id) {
Elements<Vertex>(tx_id).ClearCache();

View File

@@ -1,6 +1,7 @@
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "distributed/cache.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "transactions/type.hpp"
@@ -8,10 +9,6 @@
class Vertex;
class Edge;
namespace database {
class Storage;
}
namespace distributed {
/// Handles remote data caches for edges and vertices, per transaction.
@@ -25,8 +22,7 @@ class DataManager {
tx::TransactionId tx_id);
public:
DataManager(database::Storage &storage,
distributed::DataRpcClients &data_clients);
DataManager(database::GraphDb &db, distributed::DataRpcClients &data_clients);
/// Gets or creates the remote vertex/edge cache for the given transaction.
template <typename TRecord>
@@ -40,7 +36,7 @@ class DataManager {
void ClearTransactionalCache(tx::TransactionId oldest_active);
private:
database::Storage &storage_;
database::GraphDb &db_;
DataRpcClients &data_clients_;
CacheT<Vertex> vertices_caches_;
CacheT<Edge> edges_caches_;
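
The switch from holding a Storage& to holding a GraphDb& matters once storage can be swapped out at runtime: a cache built later must see the current storage object, not a reference captured at construction time. A self-contained toy of the difference (the types below are invented stand-ins, not the real DataManager or Cache):

```cpp
// Toy model (invented types) of why DataManager now keeps a GraphDb& and asks
// for db.storage() when it builds a cache: after ReinitializeStorage() the old
// Storage object is gone, so a Storage& captured in the constructor would dangle.
#include <cassert>
#include <memory>

struct Storage {
  explicit Storage(int generation) : generation(generation) {}
  int generation;
};

struct GraphDb {
  std::unique_ptr<Storage> storage_ = std::make_unique<Storage>(1);
  Storage &storage() { return *storage_; }
  void ReinitializeStorage() { storage_ = std::make_unique<Storage>(2); }
};

// Holds the database and looks storage up lazily, like the new DataManager.
struct LazyConsumer {
  GraphDb &db;
  int CurrentGeneration() { return db.storage().generation; }
};

int main() {
  GraphDb db;
  LazyConsumer consumer{db};
  db.ReinitializeStorage();                   // old Storage destroyed, new one created
  assert(consumer.CurrentGeneration() == 2);  // consumer sees the current storage
}
```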

View File

@@ -355,6 +355,7 @@ RecoveryInfo Recover(
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
db.ReinitializeStorage();
recovery_data.Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
continue;