Remove dependency on distributed from durability

Reviewers: msantl, vkasljevic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1521
This commit is contained in:
Teon Banek 2018-08-01 09:48:38 +02:00
parent 09bc9cb164
commit eeb03b132f
10 changed files with 204 additions and 101 deletions

View File

@ -425,6 +425,77 @@ class WorkerAccessor final : public DistributedAccessor {
}
};
//////////////////////////////////////////////////////////////////////
// RecoveryTransactions implementations
//////////////////////////////////////////////////////////////////////
class DistributedRecoveryTransanctions
: public durability::RecoveryTransactions {
public:
explicit DistributedRecoveryTransanctions(DistributedGraphDb *db) : db_(db) {}
void Begin(const tx::TransactionId &tx_id) override {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
void Abort(const tx::TransactionId &tx_id) final {
GetAccessor(tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
void Commit(const tx::TransactionId &tx_id) final {
GetAccessor(tx_id)->Commit();
accessors_.erase(accessors_.find(tx_id));
}
void Apply(const database::StateDelta &delta) final {
delta.Apply(*GetAccessor(delta.transaction_id));
}
protected:
virtual GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id) {
auto found = accessors_.find(tx_id);
// Currently accessors are created on transaction_begin, but since workers
// don't have a transaction begin, the accessors are not created.
if (db_->type() == database::GraphDb::Type::DISTRIBUTED_WORKER &&
found == accessors_.end()) {
std::tie(found, std::ignore) = accessors_.emplace(tx_id, db_->Access());
}
CHECK(found != accessors_.end())
<< "Accessor does not exist for transaction: " << tx_id;
return found->second.get();
}
DistributedGraphDb *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
class WorkerRecoveryTransactions final
: public DistributedRecoveryTransanctions {
public:
explicit WorkerRecoveryTransactions(Worker *db)
: DistributedRecoveryTransanctions(db) {}
void Begin(const tx::TransactionId &tx_id) override {
LOG(FATAL) << "Unexpected transaction begin on worker recovery.";
}
protected:
GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id) override {
auto found = accessors_.find(tx_id);
// Currently accessors are created on transaction_begin, but since workers
// don't have a transaction begin, the accessors are not created.
if (found == accessors_.end()) {
std::tie(found, std::ignore) = accessors_.emplace(tx_id, db_->Access());
}
return found->second.get();
}
};
} // namespace
//////////////////////////////////////////////////////////////////////
@ -524,7 +595,7 @@ Master::Master(Config config)
if (impl_->config_.db_recover_on_startup) {
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt);
std::experimental::nullopt, config.worker_id);
}
// Post-recovery setup and checking.
@ -548,8 +619,9 @@ Master::Master(Config config)
// workers and on master
recovery_data.wal_tx_to_recover =
impl_->coordination_.CommonWalTransactions(*recovery_info);
DistributedRecoveryTransanctions recovery_transactions(this);
durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
this, &recovery_data);
this, &recovery_data, &recovery_transactions);
auto workers_recovered_wal =
impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
workers_recovered_wal.get();
@ -658,9 +730,10 @@ bool Master::MakeSnapshot(GraphDbAccessor &accessor) {
// snapshot if we succeed in creating it and workers somehow fail. Because
// we have an assumption that every snapshot that exists on master with
// some tx_id visibility also exists on workers
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
const bool status =
durability::MakeSnapshot(*this, accessor, impl_->config_.worker_id,
fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
} else {
@ -834,7 +907,7 @@ Worker::Worker(Config config)
if (snapshot_to_recover) {
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
snapshot_to_recover);
snapshot_to_recover, config.worker_id);
}
// Post-recovery setup and checking.
@ -916,9 +989,10 @@ std::vector<int> Worker::GetWorkerIds() const {
bool Worker::MakeSnapshot(GraphDbAccessor &accessor) {
// Makes a local snapshot from the visibility of accessor
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
const bool status =
durability::MakeSnapshot(*this, accessor, impl_->config_.worker_id,
fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
} else {
@ -938,8 +1012,9 @@ void Worker::ReinitializeStorage() {
}
void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) {
WorkerRecoveryTransactions recovery_transactions(this);
durability::RecoverWalAndIndexes(impl_->config_.durability_directory, this,
recovery_data);
recovery_data, &recovery_transactions);
}
io::network::Endpoint Worker::endpoint() const {

View File

@ -197,6 +197,8 @@ class SingleNode {
SingleNode::SingleNode(Config config)
: impl_(std::make_unique<impl::SingleNode>(config)) {
CHECK(config.worker_id == 0)
<< "Worker ID should only be set in distributed GraphDb";
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
@ -210,14 +212,16 @@ SingleNode::SingleNode(Config config)
if (impl_->config_.db_recover_on_startup) {
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt);
std::experimental::nullopt, 0);
}
// Post-recovery setup and checking.
if (recovery_info) {
recovery_data.wal_tx_to_recover = recovery_info->wal_recovered;
SingleNodeRecoveryTransanctions recovery_transactions(this);
durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
this, &recovery_data);
this, &recovery_data,
&recovery_transactions);
}
}
@ -301,7 +305,7 @@ void SingleNode::CollectGarbage() { impl_->storage_gc_->CollectGarbage(); }
bool SingleNode::MakeSnapshot(GraphDbAccessor &accessor) {
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(impl_->config_.durability_directory),
*this, accessor, 0, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
@ -320,4 +324,39 @@ void SingleNode::ReinitializeStorage() {
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec);
}
SingleNodeRecoveryTransanctions::SingleNodeRecoveryTransanctions(SingleNode *db)
: db_(db) {}
SingleNodeRecoveryTransanctions::~SingleNodeRecoveryTransanctions() {}
void SingleNodeRecoveryTransanctions::Begin(const tx::TransactionId &tx_id) {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
GraphDbAccessor *GetAccessor(
const std::unordered_map<tx::TransactionId,
std::unique_ptr<GraphDbAccessor>> &accessors,
const tx::TransactionId &tx_id) {
auto found = accessors.find(tx_id);
CHECK(found != accessors.end())
<< "Accessor does not exist for transaction: " << tx_id;
return found->second.get();
}
void SingleNodeRecoveryTransanctions::Abort(const tx::TransactionId &tx_id) {
GetAccessor(accessors_, tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
void SingleNodeRecoveryTransanctions::Commit(const tx::TransactionId &tx_id) {
GetAccessor(accessors_, tx_id)->Commit();
accessors_.erase(accessors_.find(tx_id));
}
void SingleNodeRecoveryTransanctions::Apply(const database::StateDelta &delta) {
delta.Apply(*GetAccessor(accessors_, delta.transaction_id));
}
} // namespace database

View File

@ -7,6 +7,7 @@
#include "database/counters.hpp"
#include "database/storage.hpp"
#include "database/storage_gc.hpp"
#include "durability/recovery.hpp"
#include "durability/wal.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
@ -145,4 +146,21 @@ class SingleNode final : public GraphDb {
utils::Scheduler transaction_killer_;
};
class SingleNodeRecoveryTransanctions final
: public durability::RecoveryTransactions {
public:
explicit SingleNodeRecoveryTransanctions(SingleNode *db);
~SingleNodeRecoveryTransanctions();
void Begin(const tx::TransactionId &tx_id) override;
void Abort(const tx::TransactionId &tx_id) override;
void Commit(const tx::TransactionId &tx_id) override;
void Apply(const database::StateDelta &delta) override;
private:
SingleNode *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -4,7 +4,6 @@
#include <limits>
#include <unordered_map>
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/indexes/label_property_index.hpp"
#include "durability/hashed_file_reader.hpp"
@ -45,7 +44,7 @@ using communication::bolt::Value;
}
bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
RecoveryData *recovery_data) {
RecoveryData *recovery_data, int worker_id) {
HashedFileReader reader;
SnapshotDecoder<HashedFileReader> decoder(reader);
@ -67,15 +66,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == durability::kVersion);
int worker_id = 0;
// TODO: Figure out a better solution for SingleNode recovery vs
// DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(db)) {
worker_id = distributed_db->WorkerId();
} else {
CHECK(dynamic_cast<database::SingleNode *>(db));
}
// Checks worker id was set correctly
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == worker_id);
@ -307,7 +297,8 @@ std::vector<tx::TransactionId> ReadWalRecoverableTransactions(
// TODO - finer-grained recovery feedback could be useful here.
bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
RecoveryData *recovery_data) {
RecoveryData *recovery_data,
RecoveryTransactions *transactions) {
auto wal_files = GetWalFiles(wal_dir);
// Track which transaction should be recovered first, and define logic for
// which transactions should be skipped in recovery.
@ -326,27 +317,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
return tx_id < first_to_recover ||
(tx_id < recovery_data->snapshooter_tx_id &&
!utils::Contains(tx_sn, tx_id)) ||
!utils::Contains(common_wal_tx, tx_id);
};
std::unordered_map<tx::TransactionId,
std::unique_ptr<database::GraphDbAccessor>>
accessors;
auto get_accessor =
[db, &accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & {
auto found = accessors.find(tx_id);
// Currently accessors are created on transaction_begin, but since workers
// don't have a transaction begin, the accessors are not created.
if (db->type() == database::GraphDb::Type::DISTRIBUTED_WORKER &&
found == accessors.end()) {
// TODO: Do we want to call db->Access with tx_id?
std::tie(found, std::ignore) = accessors.emplace(tx_id, db->Access());
}
CHECK(found != accessors.end())
<< "Accessor does not exist for transaction: " << tx_id;
return *found->second;
!utils::Contains(common_wal_tx, tx_id);
};
// Ensure that the next transaction ID in the recovered DB will be greater
@ -364,17 +335,13 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
if (should_skip(delta.transaction_id)) return;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
CHECK(accessors.find(delta.transaction_id) == accessors.end())
<< "Double transaction start";
accessors.emplace(delta.transaction_id, db->Access());
transactions->Begin(delta.transaction_id);
break;
case database::StateDelta::Type::TRANSACTION_ABORT:
get_accessor(delta.transaction_id).Abort();
accessors.erase(accessors.find(delta.transaction_id));
transactions->Abort(delta.transaction_id);
break;
case database::StateDelta::Type::TRANSACTION_COMMIT:
get_accessor(delta.transaction_id).Commit();
accessors.erase(accessors.find(delta.transaction_id));
transactions->Commit(delta.transaction_id);
break;
case database::StateDelta::Type::BUILD_INDEX:
// TODO index building might still be problematic in HA
@ -382,7 +349,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
delta.property_name);
break;
default:
delta.Apply(get_accessor(delta.transaction_id));
transactions->Apply(delta);
}
});
@ -400,7 +367,8 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
RecoveryInfo RecoverOnlySnapshot(
const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id) {
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
int worker_id) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
@ -422,7 +390,7 @@ RecoveryInfo RecoverOnlySnapshot(
}
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) {
db->ReinitializeStorage();
recovery_data->Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
@ -445,12 +413,13 @@ RecoveryInfo RecoverOnlySnapshot(
}
void RecoverWalAndIndexes(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data) {
RecoveryData *recovery_data,
RecoveryTransactions *transactions) {
// Write-ahead-log recovery.
// WAL recovery does not have to be complete for the recovery to be
// considered successful. For the time being ignore the return value,
// consider a better system.
RecoverWal(durability_dir / kWalDir, db, recovery_data);
RecoverWal(durability_dir / kWalDir, db, recovery_data, transactions);
// Index recovery.
auto db_accessor_indices = db->Access();

View File

@ -122,8 +122,22 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
RecoveryInfo RecoverOnlySnapshot(
const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, durability::RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id);
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
int worker_id);
/** Interface for accessing transactions during WAL recovery. */
class RecoveryTransactions {
public:
virtual ~RecoveryTransactions() {}
virtual void Begin(const tx::TransactionId &) = 0;
virtual void Abort(const tx::TransactionId &) = 0;
virtual void Commit(const tx::TransactionId &) = 0;
virtual void Apply(const database::StateDelta &) = 0;
};
void RecoverWalAndIndexes(const std::experimental::filesystem::path &dir,
database::GraphDb *db, RecoveryData *recovery_data);
database::GraphDb *db, RecoveryData *recovery_data,
RecoveryTransactions *transactions);
} // namespace durability

View File

@ -4,7 +4,6 @@
#include <glog/logging.h>
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
@ -22,7 +21,7 @@ static_assert(durability::kVersion == 6,
namespace {
bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
database::GraphDbAccessor &dba) {
database::GraphDbAccessor &dba, int worker_id) {
try {
HashedFileWriter buffer(snapshot_file);
SnapshotEncoder<HashedFileWriter> encoder(buffer);
@ -32,15 +31,6 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
durability::kMagicNumber.size());
encoder.WriteInt(durability::kVersion);
int worker_id = 0;
// TODO: Figure out a better solution for SingleNode recovery vs
// DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
worker_id = distributed_db->WorkerId();
} else {
CHECK(dynamic_cast<database::SingleNode *>(&dba.db()));
}
// Writes the worker id to snapshot, used to guarantee consistent cluster
// state after recovery
encoder.WriteInt(worker_id);
@ -132,22 +122,13 @@ void RemoveOldWals(const fs::path &wal_dir,
} // namespace
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const fs::path &durability_dir,
const int snapshot_max_retained) {
int worker_id, const fs::path &durability_dir,
int snapshot_max_retained) {
if (!utils::EnsureDir(durability_dir / kSnapshotDir)) return false;
int worker_id = 0;
// TODO: Figure out a better solution for SingleNode recovery vs
// DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db)) {
worker_id = distributed_db->WorkerId();
} else {
CHECK(dynamic_cast<database::SingleNode *>(&db));
}
const auto snapshot_file =
MakeSnapshotPath(durability_dir, worker_id, dba.transaction_id());
if (fs::exists(snapshot_file)) return false;
if (Encode(snapshot_file, db, dba)) {
if (Encode(snapshot_file, db, dba, worker_id)) {
RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained);
RemoveOldWals(durability_dir / kWalDir, dba.transaction());
return true;
@ -157,4 +138,5 @@ bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
return false;
}
}
} // namespace durability

View File

@ -14,6 +14,8 @@ namespace durability {
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
int worker_id,
const std::experimental::filesystem::path &durability_dir,
int snapshot_max_retained);
} // namespace durability

View File

@ -131,7 +131,8 @@ TEST_F(DistributedDurability, RecoveryFailure) {
AddVertices();
// Make a snapshot on the master without the right snapshots on workers.
auto dba = master().Access();
bool status = durability::MakeSnapshot(master(), *dba, tmp_dir_, 100);
bool status = durability::MakeSnapshot(master(), *dba, master().WorkerId(),
tmp_dir_, 100);
ASSERT_TRUE(status);
}
::testing::FLAGS_gtest_death_test_style = "threadsafe";

View File

@ -303,9 +303,10 @@ class Durability : public ::testing::Test {
return config;
}
void MakeSnapshot(database::GraphDb &db, int snapshot_max_retained = -1) {
void MakeSnapshot(int worker_id, database::GraphDb &db,
int snapshot_max_retained = -1) {
auto dba = db.Access();
ASSERT_TRUE(durability::MakeSnapshot(db, *dba, durability_dir_,
ASSERT_TRUE(durability::MakeSnapshot(db, *dba, worker_id, durability_dir_,
snapshot_max_retained));
}
@ -415,7 +416,7 @@ TEST_F(Durability, SnapshotEncoding) {
ASSERT_EQ(e1.gid(), gid1);
dba->BuildIndex(dba->Label("l1"), dba->Property("p1"));
dba->Commit();
MakeSnapshot(db);
MakeSnapshot(0, db);
}
auto snapshot = GetLastFile(snapshot_dir_);
@ -512,7 +513,7 @@ TEST_F(Durability, SnapshotRecovery) {
MakeDb(db, 300, {0, 1, 2});
MakeDb(db, 300);
MakeDb(db, 300, {3, 4});
MakeSnapshot(db);
MakeSnapshot(0, db);
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
@ -534,7 +535,7 @@ TEST_F(Durability, SnapshotNoVerticesIdRecovery) {
dba->Commit();
}
MakeSnapshot(db);
MakeSnapshot(0, db);
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
@ -551,7 +552,7 @@ TEST_F(Durability, SnapshotAndWalIdRecovery) {
config.durability_enabled = true;
database::SingleNode db{config};
MakeDb(db, 300);
MakeSnapshot(db);
MakeSnapshot(0, db);
MakeDb(db, 300);
db.wal().Flush();
ASSERT_EQ(DirFiles(snapshot_dir_).size(), 1);
@ -612,7 +613,7 @@ TEST_F(Durability, SnapshotAndWalRecovery) {
database::SingleNode db{config};
MakeDb(db, 300, {0, 1, 2});
MakeDb(db, 300);
MakeSnapshot(db);
MakeSnapshot(0, db);
MakeDb(db, 300, {3, 4});
MakeDb(db, 300);
MakeDb(db, 300, {5});
@ -648,7 +649,7 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) {
MakeDb(db.wal(), *dba_3, 100);
dba_3->Commit();
MakeSnapshot(db); // Snapshooter takes the fourth transaction.
MakeSnapshot(0, db); // Snapshooter takes the fourth transaction.
dba_2->Commit();
// The fifth transaction starts and commits after snapshot.
@ -710,7 +711,7 @@ TEST_F(Durability, SnapshotRetention) {
// Track the added snapshots to ensure the correct ones are pruned.
std::unordered_set<std::string> snapshots;
for (int i = 0; i < count; ++i) {
MakeSnapshot(db, retain);
MakeSnapshot(0, db, retain);
auto latest = GetLastFile(snapshot_dir_);
snapshots.emplace(GetLastFile(snapshot_dir_));
// Ensures that the latest snapshot was not in the snapshots collection
@ -730,13 +731,13 @@ TEST_F(Durability, WalRetention) {
config.durability_enabled = true;
database::SingleNode db{config};
MakeDb(db, 100);
MakeSnapshot(db);
MakeSnapshot(0, db);
MakeDb(db, 100);
EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1);
db.wal().Flush();
// 1 current WAL file, plus retained ones
EXPECT_GT(DirFiles(wal_dir_).size(), 1);
MakeSnapshot(db);
MakeSnapshot(0, db);
db.wal().Flush();
}
@ -762,7 +763,7 @@ TEST_F(Durability, WorkerIdRecovery) {
config.recovering_cluster_size = 1;
database::Master db{config};
MakeDb(db, 100);
MakeSnapshot(db);
MakeSnapshot(db.WorkerId(), db);
EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1);
// WorkerIds are equal and recovery should be sucessful
@ -830,20 +831,20 @@ TEST_F(Durability, SequentialRecovery) {
return threads;
};
auto make_updates = [&run_updates, this](database::GraphDb &db,
auto make_updates = [&run_updates, this](database::SingleNode &db,
bool snapshot_during,
bool snapshot_after) {
std::atomic<bool> keep_running{true};
auto update_theads = run_updates(db, keep_running);
std::this_thread::sleep_for(25ms);
if (snapshot_during) {
MakeSnapshot(db);
MakeSnapshot(0, db);
}
std::this_thread::sleep_for(25ms);
keep_running = false;
for (auto &t : update_theads) t.join();
if (snapshot_after) {
MakeSnapshot(db);
MakeSnapshot(0, db);
}
db.wal().Flush();

View File

@ -23,8 +23,10 @@ class RecoveryTest : public ::testing::Test {
std::string durability_dir(FLAGS_durability_dir);
durability::RecoveryData recovery_data;
durability::RecoverOnlySnapshot(durability_dir, &db_, &recovery_data,
std::experimental::nullopt);
durability::RecoverWalAndIndexes(durability_dir, &db_, &recovery_data);
std::experimental::nullopt, 0);
database::SingleNodeRecoveryTransanctions recovery_transactions(&db_);
durability::RecoverWalAndIndexes(durability_dir, &db_, &recovery_data,
&recovery_transactions);
}
database::SingleNode db_;