Fix recovery bug (transaction ID bumping)

Summary:
When performing recovery, ensure that the transaction ID counter in the engine
is bumped so that the next transaction ID is greater than the max tx id seen
in recovery.

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1312
This commit is contained in:
florijan 2018-03-22 10:44:38 +01:00
parent b9c5af2568
commit c826fa6640
13 changed files with 158 additions and 10 deletions

View File

@ -45,6 +45,8 @@ BOOST_CLASS_EXPORT(tx::GcSnapshotReq);
BOOST_CLASS_EXPORT(tx::ClogInfoReq); BOOST_CLASS_EXPORT(tx::ClogInfoReq);
BOOST_CLASS_EXPORT(tx::ClogInfoRes); BOOST_CLASS_EXPORT(tx::ClogInfoRes);
BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq); BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq);
BOOST_CLASS_EXPORT(tx::EnsureNextIdGreaterReq);
BOOST_CLASS_EXPORT(tx::EnsureNextIdGreaterRes);
// Distributed coordination. // Distributed coordination.
BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq); BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);

View File

@ -226,6 +226,14 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
vertex->out_ = iterate_and_transform(vertex->out_); vertex->out_ = iterate_and_transform(vertex->out_);
} }
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::transaction_id_t max_id = recovery_data.snapshooter_tx_id;
auto &snap = recovery_data.snapshooter_tx_snapshot;
if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end());
dba.db().tx_engine().EnsureNextIdGreater(max_id);
dba.Commit(); dba.Commit();
return true; return true;
} }
@ -259,15 +267,21 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
[&accessors](tx::transaction_id_t tx_id) -> database::GraphDbAccessor & { [&accessors](tx::transaction_id_t tx_id) -> database::GraphDbAccessor & {
auto found = accessors.find(tx_id); auto found = accessors.find(tx_id);
CHECK(found != accessors.end()) CHECK(found != accessors.end())
<< "Accessor does not exist for transaction"; << "Accessor does not exist for transaction: " << tx_id;
return found->second; return found->second;
}; };
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::transaction_id_t max_observed_tx_id{0};
// Read all the WAL files whose max_tx_id is not smaller than // Read all the WAL files whose max_tx_id is not smaller than
// min_tx_to_recover. // min_tx_to_recover.
for (auto &wal_file : wal_files) { for (auto &wal_file : wal_files) {
auto wal_file_tx_id = TransactionIdFromWalFilename(wal_file.filename()); auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename());
if (!wal_file_tx_id || *wal_file_tx_id < first_to_recover) continue; if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue;
HashedFileReader wal_reader; HashedFileReader wal_reader;
if (!wal_reader.Open(wal_file)) return false; if (!wal_reader.Open(wal_file)) return false;
@ -275,6 +289,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
while (true) { while (true) {
auto delta = database::StateDelta::Decode(wal_reader, decoder); auto delta = database::StateDelta::Decode(wal_reader, decoder);
if (!delta) break; if (!delta) break;
max_observed_tx_id = std::max(max_observed_tx_id, delta->transaction_id);
if (should_skip(delta->transaction_id)) continue; if (should_skip(delta->transaction_id)) continue;
switch (delta->type) { switch (delta->type) {
case database::StateDelta::Type::TRANSACTION_BEGIN: case database::StateDelta::Type::TRANSACTION_BEGIN:
@ -305,6 +320,8 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
// - WAL fully recovered // - WAL fully recovered
// - WAL partially recovered // - WAL partially recovered
// - WAL recovery error // - WAL recovery error
db.tx_engine().EnsureNextIdGreater(max_observed_tx_id);
return true; return true;
} }
} // anonymous namespace } // anonymous namespace

View File

@ -83,6 +83,10 @@ class Engine {
/** Gets a transaction object for a running transaction. */ /** Gets a transaction object for a running transaction. */
virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0; virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0;
/** Ensures the next transaction that starts will have the ID greater than
* the given id.
*
* Snapshot and WAL recovery call this with the largest transaction ID they
* have observed, so that transactional info in subsequently created snapshots
* and WAL files does not interfere with that found in previous ones.
*
* @param tx_id - ID that the ID of the next started transaction must exceed.
*/
virtual void EnsureNextIdGreater(transaction_id_t tx_id) = 0;
auto &local_lock_graph() { return local_lock_graph_; } auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; }

View File

@ -57,5 +57,11 @@ MasterEngine::MasterEngine(communication::rpc::Server &server,
[this](const communication::rpc::Message &) { [this](const communication::rpc::Message &) {
return std::make_unique<SnapshotRes>(GlobalActiveTransactions()); return std::make_unique<SnapshotRes>(GlobalActiveTransactions());
}); });
rpc_server_.Register<EnsureNextIdGreaterRpc>(
[this](const EnsureNextIdGreaterReq &req) {
EnsureNextIdGreater(req.member);
return std::make_unique<EnsureNextIdGreaterRes>();
});
} }
} // namespace tx } // namespace tx

View File

@ -57,4 +57,10 @@ RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq)
using ActiveTransactionsRpc = using ActiveTransactionsRpc =
communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>; communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>;
// RPC used to ask the master engine to make sure that the next transaction ID
// is greater than the transaction ID carried in the request. The request has
// a single `transaction_id_t` member; the response is a no-member message
// that only acknowledges the call.
RPC_SINGLE_MEMBER_MESSAGE(EnsureNextIdGreaterReq, transaction_id_t);
RPC_NO_MEMBER_MESSAGE(EnsureNextIdGreaterRes);
using EnsureNextIdGreaterRpc =
communication::rpc::RequestResponse<EnsureNextIdGreaterReq,
EnsureNextIdGreaterRes>;
} // namespace tx } // namespace tx

View File

@ -95,7 +95,10 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() {
return active_transactions; return active_transactions;
} }
transaction_id_t SingleNodeEngine::LocalLast() const { return counter_.load(); } transaction_id_t SingleNodeEngine::LocalLast() const {
std::lock_guard<SpinLock> guard(lock_);
return counter_;
}
transaction_id_t SingleNodeEngine::LocalOldestActive() const { transaction_id_t SingleNodeEngine::LocalOldestActive() const {
std::lock_guard<SpinLock> guard(lock_); std::lock_guard<SpinLock> guard(lock_);
@ -117,4 +120,10 @@ Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) {
<< "Can't return snapshot for an inactive transaction"; << "Can't return snapshot for an inactive transaction";
return found->second.get(); return found->second.get();
} }
/// Raises the transaction counter to at least `tx_id`, so that transactions
/// started afterwards get an ID greater than `tx_id`. The counter is never
/// lowered: if it is already at or above `tx_id` nothing changes.
void SingleNodeEngine::EnsureNextIdGreater(transaction_id_t tx_id) {
  // counter_ is a plain (non-atomic) member, so it is only touched while
  // holding the engine lock.
  std::lock_guard<SpinLock> guard(lock_);
  if (counter_ < tx_id) {
    counter_ = tx_id;
  }
}
} // namespace tx } // namespace tx

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <atomic>
#include <experimental/optional> #include <experimental/optional>
#include <unordered_map> #include <unordered_map>
@ -37,14 +36,15 @@ class SingleNodeEngine : public Engine {
CommitLog::Info Info(transaction_id_t tx) const override; CommitLog::Info Info(transaction_id_t tx) const override;
Snapshot GlobalGcSnapshot() override; Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override; Snapshot GlobalActiveTransactions() override;
tx::transaction_id_t LocalLast() const override; transaction_id_t LocalLast() const override;
transaction_id_t LocalOldestActive() const override; transaction_id_t LocalOldestActive() const override;
void LocalForEachActiveTransaction( void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override; std::function<void(Transaction &)> f) override;
tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override; Transaction *RunningTransaction(transaction_id_t tx_id) override;
void EnsureNextIdGreater(transaction_id_t tx_id) override;
private: private:
std::atomic<transaction_id_t> counter_{0}; transaction_id_t counter_{0};
CommitLog clog_; CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_; std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
Snapshot active_; Snapshot active_;

View File

@ -175,4 +175,8 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
oldest_active_.store(snapshot.front()); oldest_active_.store(snapshot.front());
} }
} }
// Forwards the request to the master through the master client pool using
// EnsureNextIdGreaterRpc; the worker does not adjust any local counter here.
void WorkerEngine::EnsureNextIdGreater(transaction_id_t tx_id) {
// NOTE(review): the result of the RPC call is discarded, so a failed call
// would go unnoticed here - confirm whether that is intended.
master_client_pool_.Call<EnsureNextIdGreaterRpc>(tx_id);
}
} // namespace tx } // namespace tx

View File

@ -20,7 +20,7 @@ class WorkerEngine : public Engine {
/// expired on the master. /// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1}; static constexpr std::chrono::seconds kCacheReleasePeriod{1};
WorkerEngine(communication::rpc::ClientPool &master_client_pool); explicit WorkerEngine(communication::rpc::ClientPool &master_client_pool);
~WorkerEngine(); ~WorkerEngine();
Transaction *Begin() override; Transaction *Begin() override;
@ -41,6 +41,8 @@ class WorkerEngine : public Engine {
Transaction *RunningTransaction(transaction_id_t tx_id, Transaction *RunningTransaction(transaction_id_t tx_id,
const Snapshot &snapshot); const Snapshot &snapshot);
void EnsureNextIdGreater(transaction_id_t tx_id) override;
/// Clears the cache of local transactions that have expired. The signature of /// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`. /// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(transaction_id_t oldest_active) const; void ClearTransactionalCache(transaction_id_t oldest_active) const;

View File

@ -17,7 +17,7 @@ void EnsureAtomicGe(std::atomic<TValue> &atomic, TValue value) {
while (true) { while (true) {
auto current = atomic.load(); auto current = atomic.load();
if (current >= value) break; if (current >= value) break;
if (atomic.compare_exchange_strong(current, value)) break; if (atomic.compare_exchange_weak(current, value)) break;
} }
} }
} // namespace utils } // namespace utils

View File

@ -2,8 +2,10 @@
#include <experimental/optional> #include <experimental/optional>
#include <functional> #include <functional>
#include <random> #include <random>
#include <thread>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <vector>
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "glog/logging.h" #include "glog/logging.h"
@ -780,3 +782,85 @@ TEST_F(Durability, WorkerIdRecovery) {
EXPECT_EQ(dba.EdgesCount(), 0); EXPECT_EQ(dba.EdgesCount(), 0);
} }
} }
// Verifies that a database recovered from durability files can itself be
// modified concurrently, optionally snapshotted, and then recovered again,
// and that every recovered instance matches the database it was recovered
// from. Each round is run for three snapshotting scenarios: no snapshot, a
// snapshot taken while updates are running, and a snapshot taken after them.
TEST_F(Durability, SequentialRecovery) {
  const int kNumWorkers = 6;
  const int kNumVertices = 1000;

  // Returns a pseudo-random int in [0, upper_exclusive). The generator and the
  // distribution are thread-local, so worker threads do not share state.
  auto random_int = [](int upper_exclusive) {
    static thread_local std::mt19937 pseudo_rand_gen{std::random_device{}()};
    static thread_local std::uniform_int_distribution<int> rand_dist;
    const int raw = rand_dist(pseudo_rand_gen);
    return raw % upper_exclusive;
  };

  // Inserts kNumVertices vertices (ids 0..kNumVertices-1) in one transaction.
  auto init_db = [](database::GraphDb &db) {
    database::GraphDbAccessor dba{db};
    for (int vertex_id = 0; vertex_id < kNumVertices; ++vertex_id) {
      dba.InsertVertex(vertex_id);
    }
    dba.Commit();
  };

  // Starts kNumWorkers threads that keep modifying `db` until `keep_running`
  // becomes false. The caller is responsible for joining the returned threads.
  auto run_updates = [&random_int](database::GraphDb &db,
                                   std::atomic<bool> &keep_running) {
    // One transaction per iteration: try to set a property on a random
    // existing vertex, then insert a new vertex and commit.
    auto worker = [&random_int, &db, &keep_running]() {
      while (keep_running.load()) {
        database::GraphDbAccessor dba{db};
        auto vertex = dba.FindVertex(random_int(kNumVertices), false);
        try {
          vertex.PropsSet(dba.Property("prop"), random_int(100));
        } catch (LockTimeoutException &) {
          // Conflicts between workers are expected; the update is skipped.
        } catch (mvcc::SerializationError &) {
          // Same as above.
        }
        dba.InsertVertex();
        dba.Commit();
      }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < kNumWorkers; ++i) {
      threads.emplace_back(worker);
    }
    return threads;
  };

  // Runs concurrent updates on `db` for roughly 50ms, optionally taking a
  // snapshot in the middle of the updates and/or after they have finished.
  auto make_updates = [&run_updates, this](database::GraphDb &db,
                                           bool snapshot_during,
                                           bool snapshot_after) {
    std::atomic<bool> keep_running{true};
    auto workers = run_updates(db, keep_running);

    std::this_thread::sleep_for(25ms);
    if (snapshot_during) MakeSnapshot(db);
    std::this_thread::sleep_for(25ms);

    keep_running.store(false);
    for (auto &worker : workers) worker.join();

    if (snapshot_after) MakeSnapshot(db);

    // Sleep to ensure the WAL gets flushed.
    std::this_thread::sleep_for(25ms);
  };

  // (snapshot_during, snapshot_after) scenarios.
  const std::vector<std::pair<bool, bool>> combinations{
      {false, false}, {true, false}, {false, true}};

  for (const auto &first_round : combinations) {
    CleanDurability();
    auto config = DbConfig();
    config.durability_enabled = true;
    database::SingleNode db{config};
    init_db(db);
    make_updates(db, first_round.first, first_round.second);

    {
      auto recovered_config = DbConfig();
      recovered_config.db_recover_on_startup = true;
      recovered_config.durability_enabled = true;
      database::SingleNode recovered{recovered_config};
      CompareDbs(db, recovered);

      // Keep modifying the recovered database and recover from it again.
      for (const auto &second_round : combinations) {
        make_updates(recovered, second_round.first, second_round.second);
        database::SingleNode recovered_2{recovered_config};
        CompareDbs(recovered, recovered_2);
      }
    }
  }
}

View File

@ -126,3 +126,10 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
[&local](Transaction &t) { local.insert(t.id_); }); [&local](Transaction &t) { local.insert(t.id_); });
EXPECT_EQ(local, std::unordered_set<tx::transaction_id_t>({1, 4})); EXPECT_EQ(local, std::unordered_set<tx::transaction_id_t>({1, 4}));
} }
// A worker's EnsureNextIdGreater request must take effect on the master:
// after asking for IDs greater than 42, the next transactions started on the
// master and on the worker get IDs 43 and 44 respectively.
TEST_F(WorkerEngineTest, EnsureTxIdGreater) {
  // Precondition: the engine has not yet handed out IDs near 42.
  const auto initial_id = master_.Begin()->id_;
  ASSERT_LE(initial_id, 40);

  worker_.EnsureNextIdGreater(42);

  const auto master_next_id = master_.Begin()->id_;
  EXPECT_EQ(master_next_id, 43);
  const auto worker_next_id = worker_.Begin()->id_;
  EXPECT_EQ(worker_next_id, 44);
}

View File

@ -74,3 +74,10 @@ TEST(Engine, RunningTransaction) {
EXPECT_NE(t1, engine.RunningTransaction(t0->id_)); EXPECT_NE(t1, engine.RunningTransaction(t0->id_));
EXPECT_EQ(t1, engine.RunningTransaction(t1->id_)); EXPECT_EQ(t1, engine.RunningTransaction(t1->id_));
} }
// After EnsureNextIdGreater(42) the next transaction started on a single-node
// engine must get ID 43.
TEST(Engine, EnsureTxIdGreater) {
  SingleNodeEngine engine;

  // Precondition: the engine has not yet handed out IDs near 42.
  const auto initial_id = engine.Begin()->id_;
  ASSERT_LE(initial_id, 40);

  engine.EnsureNextIdGreater(42);

  const auto next_id = engine.Begin()->id_;
  EXPECT_EQ(next_id, 43);
}