diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 8bbf9a5c6..80ffaa915 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -45,6 +45,8 @@ BOOST_CLASS_EXPORT(tx::GcSnapshotReq); BOOST_CLASS_EXPORT(tx::ClogInfoReq); BOOST_CLASS_EXPORT(tx::ClogInfoRes); BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq); +BOOST_CLASS_EXPORT(tx::EnsureNextIdGreaterReq); +BOOST_CLASS_EXPORT(tx::EnsureNextIdGreaterRes); // Distributed coordination. BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq); diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 7580e6751..592c3716c 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -226,6 +226,14 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, vertex->out_ = iterate_and_transform(vertex->out_); } + // Ensure that the next transaction ID in the recovered DB will be greater + // than the latest one we have recovered. Do this to make sure that + // subsequently created snapshots and WAL files will have transactional info + // that does not interfere with that found in previous snapshots and WAL. + tx::transaction_id_t max_id = recovery_data.snapshooter_tx_id; + auto &snap = recovery_data.snapshooter_tx_snapshot; + if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end()); + dba.db().tx_engine().EnsureNextIdGreater(max_id); dba.Commit(); return true; } @@ -259,15 +267,21 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, [&accessors](tx::transaction_id_t tx_id) -> database::GraphDbAccessor & { auto found = accessors.find(tx_id); CHECK(found != accessors.end()) - << "Accessor does not exist for transaction"; + << "Accessor does not exist for transaction: " << tx_id; return found->second; }; + // Ensure that the next transaction ID in the recovered DB will be greater + // then the latest one we have recovered. Do this to make sure that + // subsequently created snapshots and WAL files will have transactional info + // that does not interfere with that found in previous snapshots and WAL. + tx::transaction_id_t max_observed_tx_id{0}; + // Read all the WAL files whose max_tx_id is not smaller than // min_tx_to_recover. for (auto &wal_file : wal_files) { - auto wal_file_tx_id = TransactionIdFromWalFilename(wal_file.filename()); - if (!wal_file_tx_id || *wal_file_tx_id < first_to_recover) continue; + auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename()); + if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue; HashedFileReader wal_reader; if (!wal_reader.Open(wal_file)) return false; @@ -275,6 +289,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, while (true) { auto delta = database::StateDelta::Decode(wal_reader, decoder); if (!delta) break; + max_observed_tx_id = std::max(max_observed_tx_id, delta->transaction_id); if (should_skip(delta->transaction_id)) continue; switch (delta->type) { case database::StateDelta::Type::TRANSACTION_BEGIN: @@ -305,6 +320,8 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, // - WAL fully recovered // - WAL partially recovered // - WAL recovery error + + db.tx_engine().EnsureNextIdGreater(max_observed_tx_id); return true; } } // anonymous namespace diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index fdd3cba95..5e6c6c8a2 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -83,6 +83,10 @@ class Engine { /** Gets a transaction object for a running transaction. */ virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0; + /** Ensures the next transaction that starts will have the ID greater than + * the given id. */ + virtual void EnsureNextIdGreater(transaction_id_t tx_id) = 0; + auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 9c7aeac16..f1772f227 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -57,5 +57,11 @@ MasterEngine::MasterEngine(communication::rpc::Server &server, [this](const communication::rpc::Message &) { return std::make_unique(GlobalActiveTransactions()); }); + + rpc_server_.Register( + [this](const EnsureNextIdGreaterReq &req) { + EnsureNextIdGreater(req.member); + return std::make_unique(); + }); } } // namespace tx diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index eab5b8a8b..bec33401f 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -57,4 +57,10 @@ RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq) using ActiveTransactionsRpc = communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(EnsureNextIdGreaterReq, transaction_id_t); +RPC_NO_MEMBER_MESSAGE(EnsureNextIdGreaterRes); +using EnsureNextIdGreaterRpc = + communication::rpc::RequestResponse; + } // namespace tx diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index c95fa1179..98ee613f9 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -95,7 +95,10 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() { return active_transactions; } -transaction_id_t SingleNodeEngine::LocalLast() const { return counter_.load(); } +transaction_id_t SingleNodeEngine::LocalLast() const { + std::lock_guard guard(lock_); + return counter_; +} transaction_id_t SingleNodeEngine::LocalOldestActive() const { std::lock_guard guard(lock_); @@ -117,4 +120,10 @@ Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) { << "Can't return snapshot for an inactive transaction"; return found->second.get(); } + +void SingleNodeEngine::EnsureNextIdGreater(transaction_id_t tx_id) { + std::lock_guard guard(lock_); + counter_ = std::max(tx_id, counter_); +} + } // namespace tx diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 4f1e7ad9e..77b489081 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include #include @@ -37,14 +36,15 @@ class SingleNodeEngine : public Engine { CommitLog::Info Info(transaction_id_t tx) const override; Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; - tx::transaction_id_t LocalLast() const override; + transaction_id_t LocalLast() const override; transaction_id_t LocalOldestActive() const override; void LocalForEachActiveTransaction( std::function f) override; - tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override; + Transaction *RunningTransaction(transaction_id_t tx_id) override; + void EnsureNextIdGreater(transaction_id_t tx_id) override; private: - std::atomic counter_{0}; + transaction_id_t counter_{0}; CommitLog clog_; std::unordered_map> store_; Snapshot active_; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 2806b975e..9b4660f3b 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -175,4 +175,8 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot, oldest_active_.store(snapshot.front()); } } + +void WorkerEngine::EnsureNextIdGreater(transaction_id_t tx_id) { + master_client_pool_.Call(tx_id); +} } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 5e2666989..c4d9cab20 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -20,7 +20,7 @@ class WorkerEngine : public Engine { /// expired on the master. static constexpr std::chrono::seconds kCacheReleasePeriod{1}; - WorkerEngine(communication::rpc::ClientPool &master_client_pool); + explicit WorkerEngine(communication::rpc::ClientPool &master_client_pool); ~WorkerEngine(); Transaction *Begin() override; @@ -41,6 +41,8 @@ class WorkerEngine : public Engine { Transaction *RunningTransaction(transaction_id_t tx_id, const Snapshot &snapshot); + void EnsureNextIdGreater(transaction_id_t tx_id) override; + /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::TransactionalCacheCleaner`. void ClearTransactionalCache(transaction_id_t oldest_active) const; diff --git a/src/utils/atomic.hpp b/src/utils/atomic.hpp index ccd031d45..5b727635f 100644 --- a/src/utils/atomic.hpp +++ b/src/utils/atomic.hpp @@ -17,7 +17,7 @@ void EnsureAtomicGe(std::atomic &atomic, TValue value) { while (true) { auto current = atomic.load(); if (current >= value) break; - if (atomic.compare_exchange_strong(current, value)) break; + if (atomic.compare_exchange_weak(current, value)) break; } } } // namespace utils diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 3959f2980..e29d5e77e 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -2,8 +2,10 @@ #include #include #include +#include #include #include +#include #include "gflags/gflags.h" #include "glog/logging.h" @@ -780,3 +782,85 @@ TEST_F(Durability, WorkerIdRecovery) { EXPECT_EQ(dba.EdgesCount(), 0); } } + +TEST_F(Durability, SequentialRecovery) { + const int kNumWorkers = 6; + const int kNumVertices = 1000; + + auto random_int = [](int upper_exclusive) { + static thread_local std::mt19937 pseudo_rand_gen{std::random_device{}()}; + static thread_local std::uniform_int_distribution rand_dist; + return rand_dist(pseudo_rand_gen) % upper_exclusive; + }; + + auto init_db = [](database::GraphDb &db) { + database::GraphDbAccessor dba{db}; + for (int i = 0; i < kNumVertices; ++i) dba.InsertVertex(i); + dba.Commit(); + }; + + auto run_updates = [&random_int](database::GraphDb &db, + std::atomic &keep_running) { + std::vector threads; + for (int i = 0; i < kNumWorkers; ++i) { + threads.emplace_back([&random_int, &db, &keep_running]() { + while (keep_running) { + database::GraphDbAccessor dba{db}; + auto v = dba.FindVertex(random_int(kNumVertices), false); + try { + v.PropsSet(dba.Property("prop"), random_int(100)); + } catch (LockTimeoutException &) { + } catch (mvcc::SerializationError &) { + } + dba.InsertVertex(); + dba.Commit(); + } + }); + } + return threads; + }; + + auto make_updates = [&run_updates, this]( + database::GraphDb &db, bool snapshot_during, bool snapshot_after) { + std::atomic keep_running{true}; + auto update_theads = run_updates(db, keep_running); + std::this_thread::sleep_for(25ms); + if (snapshot_during) { + MakeSnapshot(db); + } + std::this_thread::sleep_for(25ms); + keep_running = false; + for (auto &t : update_theads) t.join(); + if (snapshot_after) { + MakeSnapshot(db); + } + + // Sleep to ensure the WAL gets flushed. + std::this_thread::sleep_for(std::chrono::milliseconds(25ms)); + }; + + const std::vector> combinations{{0, 0}, {1, 0}, {0, 1}}; + for (auto &combo : combinations) { + CleanDurability(); + auto config = DbConfig(); + config.durability_enabled = true; + database::SingleNode db{config}; + init_db(db); + make_updates(db, combo.first, combo.second); + + { + auto recovered_config = DbConfig(); + recovered_config.db_recover_on_startup = true; + recovered_config.durability_enabled = true; + database::SingleNode recovered{recovered_config}; + CompareDbs(db, recovered); + { + for (auto &combo2 : combinations) { + make_updates(recovered, combo2.first, combo2.second); + database::SingleNode recovered_2{recovered_config}; + CompareDbs(recovered, recovered_2); + } + } + } + } +} diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 418c0a627..168798664 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -126,3 +126,10 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { [&local](Transaction &t) { local.insert(t.id_); }); EXPECT_EQ(local, std::unordered_set({1, 4})); } + +TEST_F(WorkerEngineTest, EnsureTxIdGreater) { + ASSERT_LE(master_.Begin()->id_, 40); + worker_.EnsureNextIdGreater(42); + EXPECT_EQ(master_.Begin()->id_, 43); + EXPECT_EQ(worker_.Begin()->id_, 44); +} diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index 7061fc0a0..a8fd45090 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -74,3 +74,10 @@ TEST(Engine, RunningTransaction) { EXPECT_NE(t1, engine.RunningTransaction(t0->id_)); EXPECT_EQ(t1, engine.RunningTransaction(t1->id_)); } + +TEST(Engine, EnsureTxIdGreater) { + SingleNodeEngine engine; + ASSERT_LE(engine.Begin()->id_, 40); + engine.EnsureNextIdGreater(42); + EXPECT_EQ(engine.Begin()->id_, 43); +}