Prepare tx::Engine for distributed

Summary:
This diff contains step 1:
- Remove clog exposure from tx::engine
- Reduce and cleanup tx::Engine API

All current functionality is kept, but the API is reduced. This is very
desirable because every function in tx::Engine will need to be
considered and implemented in both Master and Worker situations. The
less we have, the better.

Next step is exactly that: seeing how each of these functions behaves in
a distributed system and implementing accordingly.

Reviewers: dgleich, mislav.bradac, buda

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1008
This commit is contained in:
florijan 2017-11-24 14:46:42 +01:00
parent 61e28bffd0
commit 23241e76b9
8 changed files with 39 additions and 124 deletions

View File

@ -127,10 +127,11 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
auto wait_transaction = db_.tx_engine_.Begin(); auto wait_transaction = db_.tx_engine_.Begin();
for (auto id : wait_transaction->snapshot()) { for (auto id : wait_transaction->snapshot()) {
if (id == transaction_->id_) continue; if (id == transaction_->id_) continue;
while (wait_transaction->engine_.clog().is_active(id)) while (wait_transaction->engine_.IsActive(id)) {
// TODO reconsider this constant, currently rule-of-thumb chosen // TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100)); std::this_thread::sleep_for(std::chrono::microseconds(100));
} }
}
// We must notify the WAL about this transaction manually since it's not // We must notify the WAL about this transaction manually since it's not
// handled by a GraphDbAccessor. // handled by a GraphDbAccessor.
db_.wal_.TxBegin(wait_transaction->id_); db_.wal_.TxBegin(wait_transaction->id_);

View File

@ -251,7 +251,7 @@ class Record : public Version<T> {
if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask); if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask);
// If hints are not set consult the commit log. // If hints are not set consult the commit log.
auto info = engine.clog().fetch_info(id); auto info = engine.FetchInfo(id);
if (info.is_committed()) { if (info.is_committed()) {
hints_.Set(Hints::kCmt & mask); hints_.Set(Hints::kCmt & mask);
return true; return true;
@ -284,7 +284,7 @@ class Record : public Version<T> {
if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre); if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre);
// If hints are not set consult the commit log. // If hints are not set consult the commit log.
auto info = engine.clog().fetch_info(tx_.cre); auto info = engine.FetchInfo(tx_.cre);
if (info.is_aborted()) { if (info.is_aborted()) {
hints_.Set(Hints::kAbt & Hints::kCre); hints_.Set(Hints::kAbt & Hints::kCre);
return true; return true;

View File

@ -2,14 +2,15 @@
#include <atomic> #include <atomic>
#include <limits> #include <limits>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector> #include <vector>
#include "glog/logging.h" #include "glog/logging.h"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp" #include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp" #include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp" #include "transactions/transaction.hpp"
#include "transactions/transaction_store.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
namespace tx { namespace tx {
@ -26,14 +27,8 @@ class TransactionError : public utils::BasicException {
* Used for managing transactions and the related information * Used for managing transactions and the related information
* such as transaction snapshots and the commit log. * such as transaction snapshots and the commit log.
*/ */
class Engine : Lockable<SpinLock> { class Engine {
// Limit for the command id, used for checking if we're about
// to overflow.
static constexpr auto kMaxCommandId =
std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max();
public: public:
Engine() = default; Engine() = default;
/** Begins a transaction and returns a pointer to /** Begins a transaction and returns a pointer to
@ -44,13 +39,13 @@ class Engine : Lockable<SpinLock> {
* committted or aborted. * committted or aborted.
*/ */
Transaction *Begin() { Transaction *Begin() {
auto guard = acquire_unique(); std::lock_guard<SpinLock> guard(lock_);
transaction_id_t id{++counter_}; transaction_id_t id{++counter_};
auto t = new Transaction(id, active_, *this); auto t = new Transaction(id, active_, *this);
active_.insert(id); active_.insert(id);
store_.put(id, t); store_.emplace(id, t);
return t; return t;
} }
@ -60,21 +55,21 @@ class Engine : Lockable<SpinLock> {
* *
* @param id - Transation id. That transaction must * @param id - Transation id. That transaction must
* be currently active. * be currently active.
* @return Pointer to the transaction object for id.
*/ */
Transaction &Advance(transaction_id_t id) { void Advance(transaction_id_t id) {
auto guard = acquire_unique(); std::lock_guard<SpinLock> guard(lock_);
auto *t = store_.get(id); auto it = store_.find(id);
DCHECK(t != nullptr) << "Transaction::advance on non-existing transaction"; DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
if (t->cid_ == kMaxCommandId) Transaction *t = it->second.get();
if (t->cid_ == std::numeric_limits<command_id_t>::max())
throw TransactionError( throw TransactionError(
"Reached maximum number of commands in this " "Reached maximum number of commands in this "
"transaction."); "transaction.");
t->cid_++; t->cid_++;
return *t;
} }
/** Returns the snapshot relevant to garbage collection of database records. /** Returns the snapshot relevant to garbage collection of database records.
@ -90,7 +85,7 @@ class Engine : Lockable<SpinLock> {
* documentation). * documentation).
*/ */
Snapshot GcSnapshot() { Snapshot GcSnapshot() {
auto guard = acquire_unique(); std::lock_guard<SpinLock> guard(lock_);
// No active transactions. // No active transactions.
if (active_.size() == 0) { if (active_.size() == 0) {
@ -100,7 +95,7 @@ class Engine : Lockable<SpinLock> {
} }
// There are active transactions. // There are active transactions.
auto snapshot_copy = store_.get(active_.front())->snapshot(); auto snapshot_copy = store_.find(active_.front())->second->snapshot();
snapshot_copy.insert(active_.front()); snapshot_copy.insert(active_.front());
return snapshot_copy; return snapshot_copy;
} }
@ -108,17 +103,19 @@ class Engine : Lockable<SpinLock> {
/** Comits the given transaction. Deletes the transaction object, it's not /** Comits the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */ * valid after this function executes. */
void Commit(const Transaction &t) { void Commit(const Transaction &t) {
auto guard = acquire_unique(); std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_); clog_.set_committed(t.id_);
Finalize(t); active_.remove(t.id_);
store_.erase(store_.find(t.id_));
} }
/** Aborts the given transaction. Deletes the transaction object, it's not /** Aborts the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */ * valid after this function executes. */
void Abort(const Transaction &t) { void Abort(const Transaction &t) {
auto guard = acquire_unique(); std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(t.id_); clog_.set_aborted(t.id_);
Finalize(t); active_.remove(t.id_);
store_.erase(store_.find(t.id_));
} }
/** Returns transaction id of last transaction without taking a lock. New /** Returns transaction id of last transaction without taking a lock. New
@ -126,48 +123,29 @@ class Engine : Lockable<SpinLock> {
*/ */
auto LockFreeCount() const { return counter_.load(); } auto LockFreeCount() const { return counter_.load(); }
/** The total number of transactions that have executed since the creation of /** Returns true if the transaction with the given ID is currently active. */
* this engine */ bool IsActive(transaction_id_t tx) const { return clog_.is_active(tx); }
tx::transaction_id_t Count() const {
auto guard = acquire_unique();
return counter_;
}
/** The count of currently active transactions */
int64_t ActiveCount() const {
auto guard = acquire_unique();
return active_.size();
}
/** Calls function f on each active transaction. */ /** Calls function f on each active transaction. */
void ForEachActiveTransaction(std::function<void(Transaction &)> f) { void ForEachActiveTransaction(std::function<void(Transaction &)> f) {
auto guard = acquire_unique(); std::lock_guard<SpinLock> guard(lock_);
for (auto transaction : active_) { for (auto transaction : active_) {
f(*store_.get(transaction)); f(*store_.find(transaction)->second);
} }
} }
const auto &clog() const { return clog_; } /** Returns the commit log Info about the given transaction. */
auto FetchInfo(transaction_id_t tx) const { return clog_.fetch_info(tx); }
auto &lock_graph() { return lock_graph_; } auto &lock_graph() { return lock_graph_; }
const auto &lock_graph() const { return lock_graph_; } const auto &lock_graph() const { return lock_graph_; }
private: private:
// Commit log of this engine. std::atomic<transaction_id_t> counter_{0};
CommitLog clog_; CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
// Performs cleanup common to ending the transaction with either commit or
// abort.
void Finalize(const Transaction &t) {
active_.remove(t.id_);
store_.del(t.id_);
}
// A snapshot of currently active transactions.
Snapshot active_; Snapshot active_;
SpinLock lock_;
// Storage for the transactions.
TransactionStore<transaction_id_t> store_;
// For each active transaction we store a transaction that holds a lock that // For each active transaction we store a transaction that holds a lock that
// mentioned transaction is also trying to acquire. We can think of this // mentioned transaction is also trying to acquire. We can think of this
@ -176,6 +154,5 @@ class Engine : Lockable<SpinLock> {
// garbage collected and we are sure that we will not be having problems with // garbage collected and we are sure that we will not be having problems with
// lifetimes of each object. // lifetimes of each object.
ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_; ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_;
std::atomic<transaction_id_t> counter_{0};
}; };
} // namespace tx } // namespace tx

View File

@ -1,28 +0,0 @@
#pragma once
#include <map>
#include <memory>
#include "transactions/transaction.hpp"
namespace tx {
template <class id_t>
class TransactionStore {
public:
Transaction* get(id_t id) const {
auto it = store.find(id);
return it != store.end() ? it->second.get() : nullptr;
}
void put(id_t id, Transaction* transaction) {
store.emplace(
std::make_pair(id, std::unique_ptr<Transaction>(transaction)));
}
void del(id_t id) { store.erase(id); }
private:
std::map<id_t, std::unique_ptr<Transaction>> store;
};
}

View File

@ -49,7 +49,7 @@ class Mvcc : public ::testing::Test {
protected: protected:
virtual void SetUp() { virtual void SetUp() {
id0 = 0; id0 = 0;
t1 = &engine.Advance(t1->id_); engine.Advance(t1->id_);
id1 = t1->id_; id1 = t1->id_;
v1 = version_list.find(*t1); v1 = version_list.find(*t1);
t1->Commit(); t1->Commit();

View File

@ -112,8 +112,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) {
/** /**
* Test integration of garbage collector with MVCC GC. Delete version lists * Test integration of garbage collector with MVCC GC. Delete version lists
* which are * which are empty (not visible from any future transaction) from the skiplist.
* empty (not visible from any future transaction) from the skiplist.
*/ */
TEST(GarbageCollector, GcClean) { TEST(GarbageCollector, GcClean) {
ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection; ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection;
@ -146,13 +145,13 @@ TEST(GarbageCollector, GcClean) {
// check that we destroyed the record // check that we destroyed the record
EXPECT_EQ(deleter.Count(), 1); EXPECT_EQ(deleter.Count(), 1);
deleter.FreeExpiredObjects(engine.Count() + 1); deleter.FreeExpiredObjects(3);
EXPECT_EQ(deleter.Count(), 0); EXPECT_EQ(deleter.Count(), 0);
EXPECT_EQ(record_destruction_count, 1); EXPECT_EQ(record_destruction_count, 1);
// check that we destroyed the version list // check that we destroyed the version list
EXPECT_EQ(vlist_deleter.Count(), 1); EXPECT_EQ(vlist_deleter.Count(), 1);
vlist_deleter.FreeExpiredObjects(engine.Count() + 1); vlist_deleter.FreeExpiredObjects(3);
EXPECT_EQ(vlist_deleter.Count(), 0); EXPECT_EQ(vlist_deleter.Count(), 0);
EXPECT_EQ(access.size(), 0U); EXPECT_EQ(access.size(), 0U);

View File

@ -34,8 +34,6 @@ auto GcSnapshot(tx::Engine &engine, tx::Transaction *t) {
gc_snap.insert(t->id_); gc_snap.insert(t->id_);
return gc_snap; return gc_snap;
} else { } else {
tx::Snapshot gc_snap; return engine.GcSnapshot();
gc_snap.insert(engine.Count() + 1);
return gc_snap;
} }
} }

View File

@ -5,24 +5,6 @@
#include "transactions/engine.hpp" #include "transactions/engine.hpp"
#include "transactions/transaction.hpp" #include "transactions/transaction.hpp"
TEST(Engine, CountEmpty) {
tx::Engine engine;
EXPECT_EQ(engine.Count(), 0);
}
TEST(Engine, Count) {
tx::Engine engine;
EXPECT_EQ(engine.Count(), (uint64_t)0);
std::vector<tx::Transaction *> transactions;
for (int i = 0; i < 5; ++i) {
transactions.push_back(engine.Begin());
EXPECT_EQ(engine.Count(), (uint64_t)(i + 1));
}
EXPECT_EQ(engine.ActiveCount(), (uint64_t)5);
for (int i = 0; i < 5; ++i) transactions[i]->Commit();
EXPECT_EQ(engine.Count(), (uint64_t)5);
}
TEST(Engine, GcSnapshot) { TEST(Engine, GcSnapshot) {
tx::Engine engine; tx::Engine engine;
ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
@ -51,20 +33,6 @@ TEST(Engine, GcSnapshot) {
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6})); EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6}));
} }
TEST(Engine, ActiveCount) {
tx::Engine engine;
std::vector<tx::Transaction *> transactions;
for (int i = 0; i < 5; ++i) {
transactions.push_back(engine.Begin());
EXPECT_EQ(engine.ActiveCount(), (size_t)i + 1);
}
for (int i = 0; i < 5; ++i) {
transactions[i]->Commit();
EXPECT_EQ(engine.ActiveCount(), 4 - i);
}
}
TEST(Engine, Advance) { TEST(Engine, Advance) {
tx::Engine engine; tx::Engine engine;