Fix distributed snapshot recovery failure

Summary:
This is still in progress.

As reported in
https://app.asana.com/0/743890251333732/971034572323525/f
Memgraph distributed fails when the snapshot that it tries to recover from is
invalid. Instead of inheriting from `StorageGc`, this diff uses composition, and
we no longer try to register an RPC server after the initialization.

Reviewers: ipaljak, vkasljevic, mtomic

Reviewed By: vkasljevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1930
This commit is contained in:
Matija Santl 2019-03-21 17:20:16 +01:00
parent 63aeed0624
commit ad0d9e1f87
5 changed files with 167 additions and 87 deletions

View File

@ -281,8 +281,8 @@ class Master {
config_.rpc_num_client_workers};
tx::EngineMaster tx_engine_{&coordination_, &wal_};
std::unique_ptr<StorageGcMaster> storage_gc_ =
std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, &coordination_);
std::make_unique<StorageGcMaster>(storage_.get(), &tx_engine_,
config_.gc_cycle_sec, &coordination_);
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{&coordination_};
database::MasterCounters counters_{&coordination_};
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
@ -385,14 +385,10 @@ bool Master::MakeSnapshot(GraphDbAccessor &accessor) {
}
void Master::ReinitializeStorage() {
// Release gc scheduler to stop it from touching storage
impl_->storage_gc_->Stop();
impl_->storage_gc_ = nullptr;
impl_->storage_ = std::make_unique<Storage>(
impl_->config_.worker_id, impl_->config_.properties_on_disk);
impl_->storage_gc_ = std::make_unique<StorageGcMaster>(
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec,
&impl_->coordination_);
impl_->storage_gc_->Reinitialize(impl_->storage_.get(), &impl_->tx_engine_);
}
io::network::Endpoint Master::endpoint() const {
@ -651,8 +647,8 @@ class Worker {
tx::EngineWorker tx_engine_{&coordination_, &wal_};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
*coordination_.GetClientPool(0), config_.worker_id);
storage_.get(), &tx_engine_, config_.gc_cycle_sec,
coordination_.GetClientPool(0), config_.worker_id);
TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{
coordination_.GetClientPool(0)};
database::WorkerCounters counters_{coordination_.GetClientPool(0)};
@ -674,7 +670,7 @@ class Worker {
config_.edge_cache_size};
distributed::DurabilityRpcWorker durability_rpc_{self_, &coordination_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
&coordination_};
&coordination_};
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_};
distributed::DynamicWorkerRegistration dynamic_worker_registration_{
@ -750,14 +746,10 @@ bool Worker::MakeSnapshot(GraphDbAccessor &accessor) {
}
void Worker::ReinitializeStorage() {
// Release gc scheduler to stop it from touching storage
impl_->storage_gc_->Stop();
impl_->storage_gc_ = nullptr;
impl_->storage_ = std::make_unique<Storage>(
impl_->config_.worker_id, impl_->config_.properties_on_disk);
impl_->storage_gc_ = std::make_unique<StorageGcWorker>(
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec,
*impl_->coordination_.GetClientPool(0), impl_->config_.worker_id);
impl_->storage_gc_->Reinitialize(impl_->storage_.get(), &impl_->tx_engine_);
}
void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) {

View File

@ -1,14 +1,15 @@
/// @file
#pragma once
#include <chrono>
#include <queue>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/distributed/mvcc/version_list.hpp"
#include "storage/distributed/deferred_deleter.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/garbage_collector.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/distributed/mvcc/version_list.hpp"
#include "storage/distributed/storage.hpp"
#include "storage/distributed/vertex.hpp"
#include "transactions/distributed/engine.hpp"
@ -18,12 +19,13 @@
namespace database {
/** Garbage collection capabilities for database::Storage. Extracted into a
/**
* Garbage collection capabilities for database::Storage. Extracted into a
* separate class for better code organization, and because the GC requires a
* tx::Engine, while the Storage itself can exist without it. Even though, a
* database::Storage is always accompanied by a Gc.
*/
class StorageGc {
class StorageGc final {
template <typename TRecord>
class MvccDeleter {
using VlistT = mvcc::VersionList<TRecord>;
@ -37,14 +39,27 @@ class StorageGc {
};
public:
/** Creates a garbage collector for the given storage that uses the given
/**
* Creates a garbage collector for the given storage that uses the given
* tx::Engine. If `pause_sec` is greater than zero, then GC gets triggered
* periodically. */
StorageGc(Storage &storage, tx::Engine &tx_engine, int pause_sec)
: tx_engine_(tx_engine),
storage_(storage),
vertices_(storage.vertices_),
edges_(storage.edges_) {
* periodically. The `collect_commit_log_garbage` callback is used as an
* abstraction for the garbage collector that knows how to handle commit log
* garbage collection on master or worker implementation.
*
* @param storage - pointer to the current storage instance
* @param tx_engine - pointer to the current transaction engine instance
* @param pause_sec - garbage collector interval in seconds
* @param collect_commit_log_garbage - callback for the garbage collector
* that handles certain implementation
*/
StorageGc(Storage *storage, tx::Engine *tx_engine, int pause_sec,
std::function<void(tx::TransactionId, tx::Engine *)>
collect_commit_log_garbage)
: storage_(storage),
tx_engine_(tx_engine),
vertices_(storage->vertices_),
edges_(storage->edges_),
collect_commit_log_garbage_(collect_commit_log_garbage) {
if (pause_sec > 0)
scheduler_.Run(
"Storage GC", std::chrono::seconds(pause_sec), [this] {
@ -58,7 +73,7 @@ class StorageGc {
});
}
virtual ~StorageGc() {
~StorageGc() {
edges_.record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
vertices_.record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
edges_.version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
@ -71,19 +86,17 @@ class StorageGc {
StorageGc &operator=(const StorageGc &) = delete;
StorageGc &operator=(StorageGc &&) = delete;
virtual void CollectCommitLogGarbage(tx::TransactionId oldest_active) = 0;
void CollectGarbage() {
// main garbage collection logic
// see wiki documentation for logic explanation
VLOG(21) << "Garbage collector started";
const auto snapshot_gc = tx_engine_.GlobalGcSnapshot();
const auto snapshot_gc = tx_engine_->GlobalGcSnapshot();
{
// This can be run concurrently
utils::Timer x;
vertices_.gc_.Run(snapshot_gc, tx_engine_);
edges_.gc_.Run(snapshot_gc, tx_engine_);
vertices_.gc_.Run(snapshot_gc, *tx_engine_);
edges_.gc_.Run(snapshot_gc, *tx_engine_);
VLOG(21) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
}
@ -93,8 +106,8 @@ class StorageGc {
{
// This can be run concurrently
utils::Timer x;
storage_.labels_index_.Refresh(snapshot_gc, tx_engine_);
storage_.label_property_index_.Refresh(snapshot_gc, tx_engine_);
storage_->labels_index_.Refresh(snapshot_gc, *tx_engine_);
storage_->label_property_index_.Refresh(snapshot_gc, *tx_engine_);
VLOG(21) << "Garbage collector index phase time: " << x.Elapsed().count();
}
{
@ -105,7 +118,7 @@ class StorageGc {
// to those records. New snapshot can be used, different than one used for
// first two phases of gc.
utils::Timer x;
const auto snapshot_gc = tx_engine_.GlobalGcSnapshot();
const auto snapshot_gc = tx_engine_->GlobalGcSnapshot();
edges_.record_deleter_.FreeExpiredObjects(snapshot_gc.back());
vertices_.record_deleter_.FreeExpiredObjects(snapshot_gc.back());
edges_.version_list_deleter_.FreeExpiredObjects(snapshot_gc.back());
@ -114,8 +127,8 @@ class StorageGc {
<< x.Elapsed().count();
}
CollectCommitLogGarbage(snapshot_gc.back());
gc_txid_ranges_.emplace(snapshot_gc.back(), tx_engine_.GlobalLast());
collect_commit_log_garbage_(snapshot_gc.back(), tx_engine_);
gc_txid_ranges_.emplace(snapshot_gc.back(), tx_engine_->GlobalLast());
VLOG(21) << "gc snapshot: " << snapshot_gc;
VLOG(21) << "edge_record_deleter_ size: " << edges_.record_deleter_.Count();
@ -125,17 +138,18 @@ class StorageGc {
<< edges_.version_list_deleter_.Count();
VLOG(21) << "vertex_version_list_deleter_ size: "
<< vertices_.version_list_deleter_.Count();
VLOG(21) << "vertices_ size: " << storage_.vertices_.access().size();
VLOG(21) << "edges_ size: " << storage_.edges_.access().size();
VLOG(21) << "vertices_ size: " << storage_->vertices_.access().size();
VLOG(21) << "edges_ size: " << storage_->edges_.access().size();
VLOG(21) << "Garbage collector finished.";
}
protected:
// Find the largest transaction from which everything older is safe to
// delete, ones for which the hints have been set in the gc phase, and no
// alive transaction from the time before the hints were set is still alive
// (otherwise that transaction could still be waiting for a resolution of
// the query to the commit log about some old transaction)
/**
* Find the largest transaction from which everything older is safe to
* delete, ones for which the hints have been set in the gc phase, and no
* alive transaction from the time before the hints were set is still alive
* (otherwise that transaction could still be waiting for a resolution of
* the query to the commit log about some old transaction)
*/
std::experimental::optional<tx::TransactionId> GetClogSafeTransaction(
tx::TransactionId oldest_active) {
std::experimental::optional<tx::TransactionId> safe_to_delete;
@ -147,16 +161,22 @@ class StorageGc {
return safe_to_delete;
}
tx::Engine &tx_engine_;
utils::Scheduler scheduler_;
bool IsRunning() { return scheduler_.IsRunning(); }
void Stop() { scheduler_.Stop(); }
private:
Storage &storage_;
Storage *storage_;
tx::Engine *tx_engine_;
MvccDeleter<Vertex> vertices_;
MvccDeleter<Edge> edges_;
utils::Scheduler scheduler_;
// History of <oldest active transaction, next transaction to be ran> ranges
// that gc operated on at some previous time - used to clear commit log
std::queue<std::pair<tx::TransactionId, tx::TransactionId>> gc_txid_ranges_;
std::function<void(tx::TransactionId, tx::Engine *)>
collect_commit_log_garbage_;
};
} // namespace database

View File

@ -0,0 +1,58 @@
/// @file
#pragma once
#include <experimental/optional>
#include "storage/distributed/storage_gc.hpp"
#include "transactions/type.hpp"
namespace database {
/// A common base for wrappers around storage garbage collector in distributed.
/// This class uses composition on `StorageGc` to enable reinitialization.
/// Reinitialization is needed when we fail to recover from a snapshot and we
/// reinitialize the underlying storage object and thus we must reinitialize the
/// garbage collector.
class StorageGcDistributed {
public:
StorageGcDistributed() = delete;
StorageGcDistributed(Storage *storage, tx::Engine *tx_engine, int pause_sec)
: storage_gc_(std::make_unique<StorageGc>(
storage, tx_engine, pause_sec,
[this](tx::TransactionId oldest_active, tx::Engine *tx_engine) {
this->CollectCommitLogGarbage(oldest_active, tx_engine);
})),
pause_sec_(pause_sec) {}
virtual ~StorageGcDistributed() {
CHECK(!storage_gc_->IsRunning())
<< "You must call Stop on database::StorageGcMaster!";
}
virtual void CollectCommitLogGarbage(tx::TransactionId oldest_active,
tx::Engine *tx_engine) = 0;
virtual void Reinitialize(Storage *storage, tx::Engine *tx_engine) {
storage_gc_ = nullptr;
storage_gc_ = std::make_unique<StorageGc>(
storage, tx_engine, pause_sec_,
[this](tx::TransactionId oldest_active, tx::Engine *tx_engine) {
this->CollectCommitLogGarbage(oldest_active, tx_engine);
});
}
void Stop() { storage_gc_->Stop(); }
void CollectGarbage() { storage_gc_->CollectGarbage(); }
std::experimental::optional<tx::TransactionId> GetClogSafeTransaction(
tx::TransactionId oldest_active) {
return storage_gc_->GetClogSafeTransaction(oldest_active);
}
private:
std::unique_ptr<StorageGc> storage_gc_;
int pause_sec_;
};
} // namespace database

View File

@ -5,15 +5,25 @@
#include "distributed/coordination_master.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
#include "storage/distributed/storage_gc.hpp"
#include "storage/distributed/storage_gc_distributed.hpp"
namespace database {
class StorageGcMaster : public StorageGc {
/// Storage garbage collector specific for the master implementation.
/// On initialization, it will start a RPC server that receives information
/// about cleared transactions on workers and it'll start periodic garbage
/// collection.
class StorageGcMaster final : public StorageGcDistributed {
public:
using StorageGc::StorageGc;
StorageGcMaster(Storage &storage, tx::Engine &tx_engine, int pause_sec,
StorageGcMaster() = delete;
StorageGcMaster(const StorageGcMaster &) = delete;
StorageGcMaster(StorageGcMaster &&) = delete;
StorageGcMaster operator=(const StorageGcMaster &) = delete;
StorageGcMaster operator=(StorageGcMaster &&) = delete;
StorageGcMaster(Storage *storage, tx::Engine *tx_engine, int pause_sec,
distributed::MasterCoordination *coordination)
: StorageGc(storage, tx_engine, pause_sec),
: StorageGcDistributed(storage, tx_engine, pause_sec),
coordination_(coordination) {
coordination_->Register<distributed::RanLocalGcRpc>(
[this](const auto &req_reader, auto *res_builder) {
@ -24,19 +34,8 @@ class StorageGcMaster : public StorageGc {
});
}
~StorageGcMaster() {
// We have to stop scheduler before destroying this class because otherwise
// a task might try to utilize methods in this class which might cause pure
// virtual method called since they are not implemented for the base class.
CHECK(!scheduler_.IsRunning())
<< "You must call Stop on database::StorageGcMaster!";
}
void Stop() {
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
void CollectCommitLogGarbage(tx::TransactionId oldest_active,
tx::Engine *tx_engine) override {
// Workers are sending information when it's safe to delete every
// transaction older than oldest_active from their perspective i.e. there
// won't exist another transaction in the future with id larger than or
@ -55,12 +54,20 @@ class StorageGcMaster : public StorageGc {
}
// All workers reported back at least once
if (min_safe > 0) {
tx_engine_.GarbageCollectCommitLog(min_safe);
tx_engine->GarbageCollectCommitLog(min_safe);
LOG(INFO) << "Clearing master commit log with tx: " << min_safe;
}
}
}
void Reinitialize(Storage *storage, tx::Engine *tx_engine) override {
std::unique_lock<std::mutex> lock(worker_safe_transaction_mutex_);
worker_safe_transaction_.clear();
StorageGcDistributed::Reinitialize(storage, tx_engine);
}
private:
distributed::MasterCoordination *coordination_;
// Mapping of worker ids and oldest active transaction which is safe for
// deletion from worker perspective

View File

@ -1,31 +1,33 @@
/// @file
#pragma once
#include "communication/rpc/client_pool.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
#include "storage/distributed/storage_gc.hpp"
#include "storage/distributed/storage_gc_distributed.hpp"
#include "transactions/distributed/engine_worker.hpp"
namespace database {
class StorageGcWorker : public StorageGc {
/// Storage garbage collector specific for the worker implementation.
/// Starts a periodic garbage collection that will also send information about
/// the cleared local transactions to the master using RPC.
class StorageGcWorker final : public StorageGcDistributed {
public:
StorageGcWorker(Storage &storage, tx::Engine &tx_engine, int pause_sec,
communication::rpc::ClientPool &master_client_pool,
StorageGcWorker() = delete;
StorageGcWorker(const StorageGcWorker &) = delete;
StorageGcWorker(StorageGcWorker &&) = delete;
StorageGcWorker operator=(const StorageGcWorker &) = delete;
StorageGcWorker operator=(StorageGcWorker &&) = delete;
StorageGcWorker(Storage *storage, tx::Engine *tx_engine, int pause_sec,
communication::rpc::ClientPool *master_client_pool,
int worker_id)
: StorageGc(storage, tx_engine, pause_sec),
: StorageGcDistributed(storage, tx_engine, pause_sec),
master_client_pool_(master_client_pool),
worker_id_(worker_id) {}
~StorageGcWorker() {
// We have to stop scheduler before destroying this class because otherwise
// a task might try to utilize methods in this class which might cause pure
// virtual method called since they are not implemented for the base class.
CHECK(!scheduler_.IsRunning())
<< "You must call Stop on database::StorageGcWorker!";
}
void Stop() { scheduler_.Stop(); }
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
void CollectCommitLogGarbage(tx::TransactionId oldest_active,
tx::Engine *tx_engine) override {
// We first need to delete transactions that we can delete to be sure that
// the locks are released as well. Otherwise some new transaction might
// try to acquire a lock which hasn't been released (if the transaction
@ -37,17 +39,18 @@ class StorageGcWorker : public StorageGc {
// cleaner. That code was then moved and can now be found in the
// `tx::EngineDistributed` garbage collector. This may not be correct,
// @storage_team please investigate this.
dynamic_cast<tx::EngineWorker &>(tx_engine_)
dynamic_cast<tx::EngineWorker &>(*tx_engine)
.ClearTransactionalCache(oldest_active);
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) {
master_client_pool_.Call<distributed::RanLocalGcRpc>(*safe_to_delete,
worker_id_);
tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
master_client_pool_->Call<distributed::RanLocalGcRpc>(*safe_to_delete,
worker_id_);
tx_engine->GarbageCollectCommitLog(*safe_to_delete);
}
}
communication::rpc::ClientPool &master_client_pool_;
private:
communication::rpc::ClientPool *master_client_pool_;
int worker_id_;
};
} // namespace database