From 23241e76b93d2fe1760ee54da8090e422f6920e5 Mon Sep 17 00:00:00 2001 From: florijan Date: Fri, 24 Nov 2017 14:46:42 +0100 Subject: [PATCH] Prepare tx::Engine for distributed Summary: This diff contains step 1: - Remove clog exposure from tx::engine - Reduce and cleanup tx::Engine API All current functionality is kept, but the API is reduced. This is very desirable because every function in tx::Engine will need to be considered and implemented in both Master and Worker situations. The less we have, the better. Next step is exactly that: seeing how each of these functions behaves in a distributed system and implementing accordingly. Reviewers: dgleich, mislav.bradac, buda Reviewed By: mislav.bradac Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1008 --- src/database/graph_db_accessor.cpp | 3 +- src/mvcc/record.hpp | 4 +- src/transactions/engine.hpp | 83 ++++++++++---------------- src/transactions/transaction_store.hpp | 28 --------- tests/unit/mvcc_find_update_common.hpp | 2 +- tests/unit/mvcc_gc.cpp | 7 +-- tests/unit/mvcc_gc_common.hpp | 4 +- tests/unit/transaction_engine.cpp | 32 ---------- 8 files changed, 39 insertions(+), 124 deletions(-) delete mode 100644 src/transactions/transaction_store.hpp diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index a98b9cb9f..ebc374573 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -127,9 +127,10 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label, auto wait_transaction = db_.tx_engine_.Begin(); for (auto id : wait_transaction->snapshot()) { if (id == transaction_->id_) continue; - while (wait_transaction->engine_.clog().is_active(id)) + while (wait_transaction->engine_.IsActive(id)) { // TODO reconsider this constant, currently rule-of-thumb chosen std::this_thread::sleep_for(std::chrono::microseconds(100)); + } } // We must notify the WAL about this transaction manually since it's not // handled by a GraphDbAccessor. diff --git a/src/mvcc/record.hpp b/src/mvcc/record.hpp index 1748b5aba..7556ac437 100644 --- a/src/mvcc/record.hpp +++ b/src/mvcc/record.hpp @@ -251,7 +251,7 @@ class Record : public Version { if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask); // If hints are not set consult the commit log. - auto info = engine.clog().fetch_info(id); + auto info = engine.FetchInfo(id); if (info.is_committed()) { hints_.Set(Hints::kCmt & mask); return true; @@ -284,7 +284,7 @@ class Record : public Version { if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre); // If hints are not set consult the commit log. - auto info = engine.clog().fetch_info(tx_.cre); + auto info = engine.FetchInfo(tx_.cre); if (info.is_aborted()) { hints_.Set(Hints::kAbt & Hints::kCre); return true; diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 4b8d23835..33565cf07 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -2,14 +2,15 @@ #include #include +#include +#include +#include #include #include "glog/logging.h" -#include "threading/sync/lockable.hpp" #include "threading/sync/spinlock.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" -#include "transactions/transaction_store.hpp" #include "utils/exceptions.hpp" namespace tx { @@ -26,14 +27,8 @@ class TransactionError : public utils::BasicException { * Used for managing transactions and the related information * such as transaction snapshots and the commit log. */ -class Engine : Lockable { - // Limit for the command id, used for checking if we're about - // to overflow. - static constexpr auto kMaxCommandId = - std::numeric_limits().cid())>::max(); - +class Engine { public: - Engine() = default; /** Begins a transaction and returns a pointer to @@ -44,13 +39,13 @@ class Engine : Lockable { * committted or aborted. */ Transaction *Begin() { - auto guard = acquire_unique(); + std::lock_guard guard(lock_); transaction_id_t id{++counter_}; auto t = new Transaction(id, active_, *this); active_.insert(id); - store_.put(id, t); + store_.emplace(id, t); return t; } @@ -60,21 +55,21 @@ class Engine : Lockable { * * @param id - Transation id. That transaction must * be currently active. - * @return Pointer to the transaction object for id. */ - Transaction &Advance(transaction_id_t id) { - auto guard = acquire_unique(); + void Advance(transaction_id_t id) { + std::lock_guard guard(lock_); - auto *t = store_.get(id); - DCHECK(t != nullptr) << "Transaction::advance on non-existing transaction"; + auto it = store_.find(id); + DCHECK(it != store_.end()) + << "Transaction::advance on non-existing transaction"; - if (t->cid_ == kMaxCommandId) + Transaction *t = it->second.get(); + if (t->cid_ == std::numeric_limits::max()) throw TransactionError( "Reached maximum number of commands in this " "transaction."); t->cid_++; - return *t; } /** Returns the snapshot relevant to garbage collection of database records. @@ -90,7 +85,7 @@ class Engine : Lockable { * documentation). */ Snapshot GcSnapshot() { - auto guard = acquire_unique(); + std::lock_guard guard(lock_); // No active transactions. if (active_.size() == 0) { @@ -100,7 +95,7 @@ class Engine : Lockable { } // There are active transactions. - auto snapshot_copy = store_.get(active_.front())->snapshot(); + auto snapshot_copy = store_.find(active_.front())->second->snapshot(); snapshot_copy.insert(active_.front()); return snapshot_copy; } @@ -108,17 +103,19 @@ class Engine : Lockable { /** Comits the given transaction. Deletes the transaction object, it's not * valid after this function executes. */ void Commit(const Transaction &t) { - auto guard = acquire_unique(); + std::lock_guard guard(lock_); clog_.set_committed(t.id_); - Finalize(t); + active_.remove(t.id_); + store_.erase(store_.find(t.id_)); } /** Aborts the given transaction. Deletes the transaction object, it's not * valid after this function executes. */ void Abort(const Transaction &t) { - auto guard = acquire_unique(); + std::lock_guard guard(lock_); clog_.set_aborted(t.id_); - Finalize(t); + active_.remove(t.id_); + store_.erase(store_.find(t.id_)); } /** Returns transaction id of last transaction without taking a lock. New @@ -126,48 +123,29 @@ class Engine : Lockable { */ auto LockFreeCount() const { return counter_.load(); } - /** The total number of transactions that have executed since the creation of - * this engine */ - tx::transaction_id_t Count() const { - auto guard = acquire_unique(); - return counter_; - } - - /** The count of currently active transactions */ - int64_t ActiveCount() const { - auto guard = acquire_unique(); - return active_.size(); - } + /** Returns true if the transaction with the given ID is currently active. */ + bool IsActive(transaction_id_t tx) const { return clog_.is_active(tx); } /** Calls function f on each active transaction. */ void ForEachActiveTransaction(std::function f) { - auto guard = acquire_unique(); + std::lock_guard guard(lock_); for (auto transaction : active_) { - f(*store_.get(transaction)); + f(*store_.find(transaction)->second); } } - const auto &clog() const { return clog_; } + /** Returns the commit log Info about the given transaction. */ + auto FetchInfo(transaction_id_t tx) const { return clog_.fetch_info(tx); } auto &lock_graph() { return lock_graph_; } const auto &lock_graph() const { return lock_graph_; } private: - // Commit log of this engine. + std::atomic counter_{0}; CommitLog clog_; - - // Performs cleanup common to ending the transaction with either commit or - // abort. - void Finalize(const Transaction &t) { - active_.remove(t.id_); - store_.del(t.id_); - } - - // A snapshot of currently active transactions. + std::unordered_map> store_; Snapshot active_; - - // Storage for the transactions. - TransactionStore store_; + SpinLock lock_; // For each active transaction we store a transaction that holds a lock that // mentioned transaction is also trying to acquire. We can think of this @@ -176,6 +154,5 @@ class Engine : Lockable { // garbage collected and we are sure that we will not be having problems with // lifetimes of each object. ConcurrentMap lock_graph_; - std::atomic counter_{0}; }; } // namespace tx diff --git a/src/transactions/transaction_store.hpp b/src/transactions/transaction_store.hpp deleted file mode 100644 index 71c23c230..000000000 --- a/src/transactions/transaction_store.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include -#include - -#include "transactions/transaction.hpp" - -namespace tx { - -template -class TransactionStore { - public: - Transaction* get(id_t id) const { - auto it = store.find(id); - return it != store.end() ? it->second.get() : nullptr; - } - - void put(id_t id, Transaction* transaction) { - store.emplace( - std::make_pair(id, std::unique_ptr(transaction))); - } - - void del(id_t id) { store.erase(id); } - - private: - std::map> store; -}; -} diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index 68cda5001..b0e6add10 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -49,7 +49,7 @@ class Mvcc : public ::testing::Test { protected: virtual void SetUp() { id0 = 0; - t1 = &engine.Advance(t1->id_); + engine.Advance(t1->id_); id1 = t1->id_; v1 = version_list.find(*t1); t1->Commit(); diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp index 2fa1f01d1..6b56f0584 100644 --- a/tests/unit/mvcc_gc.cpp +++ b/tests/unit/mvcc_gc.cpp @@ -112,8 +112,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) { /** * Test integration of garbage collector with MVCC GC. Delete version lists - * which are - * empty (not visible from any future transaction) from the skiplist. + * which are empty (not visible from any future transaction) from the skiplist. */ TEST(GarbageCollector, GcClean) { ConcurrentMap *> collection; @@ -146,13 +145,13 @@ TEST(GarbageCollector, GcClean) { // check that we destroyed the record EXPECT_EQ(deleter.Count(), 1); - deleter.FreeExpiredObjects(engine.Count() + 1); + deleter.FreeExpiredObjects(3); EXPECT_EQ(deleter.Count(), 0); EXPECT_EQ(record_destruction_count, 1); // check that we destroyed the version list EXPECT_EQ(vlist_deleter.Count(), 1); - vlist_deleter.FreeExpiredObjects(engine.Count() + 1); + vlist_deleter.FreeExpiredObjects(3); EXPECT_EQ(vlist_deleter.Count(), 0); EXPECT_EQ(access.size(), 0U); diff --git a/tests/unit/mvcc_gc_common.hpp b/tests/unit/mvcc_gc_common.hpp index ac3d5b8b5..8defac184 100644 --- a/tests/unit/mvcc_gc_common.hpp +++ b/tests/unit/mvcc_gc_common.hpp @@ -34,8 +34,6 @@ auto GcSnapshot(tx::Engine &engine, tx::Transaction *t) { gc_snap.insert(t->id_); return gc_snap; } else { - tx::Snapshot gc_snap; - gc_snap.insert(engine.Count() + 1); - return gc_snap; + return engine.GcSnapshot(); } } diff --git a/tests/unit/transaction_engine.cpp b/tests/unit/transaction_engine.cpp index 242dc0ffe..00a3a335a 100644 --- a/tests/unit/transaction_engine.cpp +++ b/tests/unit/transaction_engine.cpp @@ -5,24 +5,6 @@ #include "transactions/engine.hpp" #include "transactions/transaction.hpp" -TEST(Engine, CountEmpty) { - tx::Engine engine; - EXPECT_EQ(engine.Count(), 0); -} - -TEST(Engine, Count) { - tx::Engine engine; - EXPECT_EQ(engine.Count(), (uint64_t)0); - std::vector transactions; - for (int i = 0; i < 5; ++i) { - transactions.push_back(engine.Begin()); - EXPECT_EQ(engine.Count(), (uint64_t)(i + 1)); - } - EXPECT_EQ(engine.ActiveCount(), (uint64_t)5); - for (int i = 0; i < 5; ++i) transactions[i]->Commit(); - EXPECT_EQ(engine.Count(), (uint64_t)5); -} - TEST(Engine, GcSnapshot) { tx::Engine engine; ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); @@ -51,20 +33,6 @@ TEST(Engine, GcSnapshot) { EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6})); } -TEST(Engine, ActiveCount) { - tx::Engine engine; - std::vector transactions; - for (int i = 0; i < 5; ++i) { - transactions.push_back(engine.Begin()); - EXPECT_EQ(engine.ActiveCount(), (size_t)i + 1); - } - - for (int i = 0; i < 5; ++i) { - transactions[i]->Commit(); - EXPECT_EQ(engine.ActiveCount(), 4 - i); - } -} - TEST(Engine, Advance) { tx::Engine engine;