Add blocking transactions for index creation

Summary:
A blocking transaction has the ability to stop the transaction engine from
starting new transactions (regular or blocking) and to wait for all other
active transactions to finish (i.e. to become non-active: committed or
aborted). Blocking transactions also support defining a parent transaction,
which does not need to end in order for the blocking one to start. This is
needed for the use case where we start nested transactions.

One could think we should build indexes inside those blocking transactions. This
is true and I wanted to implement this, but it would require some digging in
the interpreter, which I didn't want to do in this change.

Reviewers: mferencevic, vkasljevic, teon.banek

Reviewed By: mferencevic, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1695
This commit is contained in:
Matija Santl 2018-10-23 16:01:02 +02:00
parent 910fc7c4d1
commit 800db5058e
13 changed files with 223 additions and 92 deletions

View File

@ -37,6 +37,12 @@ class IndexCreationOnWorkerException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/// Thrown when index creation fails because the transaction engine refuses to
/// start a new transaction, which can happen on concurrent index creation.
class IndexCreationException : public utils::BasicException {
// Inherit all BasicException constructors.
using utils::BasicException::BasicException;
};
/**
* Base accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for

View File

@ -106,6 +106,12 @@ std::unique_ptr<GraphDbAccessor> GraphDb::Access(tx::TransactionId tx_id) {
return std::unique_ptr<GraphDbAccessor>(new GraphDbAccessor(*this, tx_id));
}
/// Creates an accessor backed by a blocking transaction.
///
/// `parent_tx` is passed unchanged to the GraphDbAccessor constructor that
/// takes an optional parent transaction id.
std::unique_ptr<GraphDbAccessor> GraphDb::AccessBlocking(
    std::experimental::optional<tx::TransactionId> parent_tx) {
  // Allocate first, then hand the raw pointer over to the owning smart pointer.
  auto *blocking_accessor = new GraphDbAccessor(*this, parent_tx);
  return std::unique_ptr<GraphDbAccessor>(blocking_accessor);
}
Storage &GraphDb::storage() { return *storage_; }
durability::WriteAheadLog &GraphDb::wal() { return wal_; }

View File

@ -2,6 +2,7 @@
#pragma once
#include <atomic>
#include <experimental/optional>
#include <memory>
#include <vector>
@ -74,6 +75,8 @@ class GraphDb {
/// Create a new accessor by starting a new transaction.
std::unique_ptr<GraphDbAccessor> Access();
/// Create a new accessor by starting a new blocking transaction. `parent_tx`,
/// when set, names the transaction that does not have to finish before the
/// blocking one starts.
std::unique_ptr<GraphDbAccessor> AccessBlocking(
std::experimental::optional<tx::TransactionId> parent_tx);
/// Create an accessor for a running transaction.
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId);

View File

@ -25,6 +25,12 @@ GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id)
transaction_(*db.tx_engine().RunningTransaction(tx_id)),
transaction_starter_{false} {}
// Constructs an accessor that starts its own blocking transaction through the
// transaction engine. `parent_tx` is forwarded to BeginBlocking unchanged.
GraphDbAccessor::GraphDbAccessor(
GraphDb &db, std::experimental::optional<tx::TransactionId> parent_tx)
: db_(db),
// BuildIndex catches tx::TransactionEngineError around the call that ends up
// here, so BeginBlocking is expected to be able to throw it.
transaction_(*db.tx_engine().BeginBlocking(parent_tx)),
// This accessor started the transaction; the destructor aborts it if it was
// neither committed nor aborted.
transaction_starter_{true} {}
GraphDbAccessor::~GraphDbAccessor() {
if (transaction_starter_ && !commited_ && !aborted_) {
this->Abort();
@ -109,16 +115,6 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
storage::Property property, bool unique) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
db_.storage().index_build_tx_in_progress_.access().insert(transaction_.id_);
// on function exit remove the create index transaction from
// build_tx_in_progress
utils::OnScopeExit on_exit_1([this] {
auto removed = db_.storage().index_build_tx_in_progress_.access().remove(
transaction_.id_);
DCHECK(removed) << "Index creation transaction should be inside set";
});
// Create the index
const LabelPropertyIndex::Key key(label, property, unique);
if (db_.storage().label_property_index_.CreateIndex(key) == false) {
@ -127,54 +123,20 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
"exists.");
}
// TODO (msantl): If unique constraint, lock the tx engine
// Everything that happens after the line above ended will be added to the
// index automatically, but we still have to add to index everything that
// happened earlier. We have to first wait for every transaction that
// happend before, or a bit later than CreateIndex to end.
{
auto wait_transactions = transaction_.engine_.GlobalActiveTransactions();
auto active_index_creation_transactions =
db_.storage().index_build_tx_in_progress_.access();
for (auto id : wait_transactions) {
if (active_index_creation_transactions.contains(id)) continue;
while (transaction_.engine_.Info(id).is_active()) {
// Active index creation set could only now start containing that id,
// since that thread could have not written to the set set and to avoid
// dead-lock we need to make sure we keep track of that
if (active_index_creation_transactions.contains(id)) continue;
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
}
// This accessor's transaction surely sees everything that happened before
// CreateIndex.
auto dba = db_.Access();
// Add transaction to the build_tx_in_progress as this transaction doesn't
// change data and shouldn't block other parallel index creations
auto read_transaction_id = dba->transaction().id_;
db_.storage().index_build_tx_in_progress_.access().insert(
read_transaction_id);
// on function exit remove the read transaction from build_tx_in_progress
utils::OnScopeExit on_exit_2([read_transaction_id, this] {
auto removed = db_.storage().index_build_tx_in_progress_.access().remove(
read_transaction_id);
DCHECK(removed) << "Index building (read) transaction should be inside set";
});
try {
auto dba =
db_.AccessBlocking(std::experimental::make_optional(transaction_.id_));
dba->PopulateIndex(key);
dba->EnableIndex(key);
dba->Commit();
} catch (const IndexConstraintViolationException &) {
db_.storage().label_property_index_.DeleteIndex(key);
throw;
} catch (const tx::TransactionEngineError &e) {
db_.storage().label_property_index_.DeleteIndex(key);
throw IndexCreationException(e.what());
}
dba->EnableIndex(key);
dba->Commit();
}
void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {

View File

@ -36,6 +36,12 @@ class IndexCreationOnWorkerException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/// Thrown when index creation fails because the transaction engine refuses to
/// start a new transaction, which can happen on concurrent index creation.
class IndexCreationException : public utils::BasicException {
// Inherit all BasicException constructors.
using utils::BasicException::BasicException;
};
/**
* Base accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for
@ -60,6 +66,9 @@ class GraphDbAccessor {
/// Creates an accessor for a running transaction.
GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id);
/// Creates an accessor by starting a new blocking transaction. `parent_tx` is
/// the optional id of the transaction the blocking one does not wait for.
GraphDbAccessor(GraphDb &db,
std::experimental::optional<tx::TransactionId> parent_tx);
public:
~GraphDbAccessor();
@ -425,7 +434,8 @@ class GraphDbAccessor {
* @param label - label to build for
* @param property - property to build for
*/
void BuildIndex(storage::Label label, storage::Property property, bool unique);
void BuildIndex(storage::Label label, storage::Property property,
bool unique);
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);

View File

@ -537,6 +537,8 @@ Callback HandleIndexQuery(IndexQuery *index_query,
throw QueryRuntimeException(e.what());
}
// Otherwise ignore creating an existing index.
} catch (const database::IndexCreationException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};

View File

@ -4,7 +4,6 @@
#include <experimental/optional>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "data_structures/concurrent/skiplist.hpp"
#include "mvcc/single_node/version_list.hpp"
#include "storage/common/types.hpp"
#include "storage/kvstore/kvstore.hpp"
@ -76,9 +75,6 @@ class Storage {
std::vector<std::string> properties_on_disk_;
// Set of transactions ids which are building indexes currently
SkipList<tx::TransactionId> index_build_tx_in_progress_;
/// Gets the Vertex/Edge main storage map.
template <typename TRecord>
const ConcurrentMap<gid::Gid, mvcc::VersionList<TRecord> *> &GetMap() const;

View File

@ -95,7 +95,7 @@ class Engine {
protected:
/// Allocates a regular (non-blocking) transaction with the given id and
/// snapshot, bound to this engine.
///
/// The returned raw pointer comes straight from `new`; nothing in this helper
/// manages its lifetime.
Transaction *CreateTransaction(TransactionId id, const Snapshot &snapshot) {
  // The last argument is the `blocking` flag of the Transaction constructor.
  return new Transaction(id, snapshot, *this, false);
}
CommandId AdvanceCommand(Transaction *t) { return t->AdvanceCommand(); }

View File

@ -14,15 +14,45 @@ Engine::Engine(durability::WriteAheadLog *wal) : wal_(wal) {}
/// Starts a regular (non-blocking) transaction.
///
/// @return pointer to the newly started transaction.
/// @throws TransactionEngineError if the engine is currently not accepting new
///         transactions.
Transaction *Engine::Begin() {
  std::lock_guard<utils::SpinLock> guard(lock_);
  // Log while holding the lock so that counter_ is not read concurrently with
  // another thread incrementing it in BeginTransaction.
  VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
  if (!accepting_transactions_.load())
    throw TransactionEngineError(
        "The transaction engine currently isn't accepting new transactions.");

  // Id allocation, bookkeeping and WAL logging live in the shared helper.
  return BeginTransaction(false);
}
/// Starts a blocking transaction.
///
/// The engine first stops accepting new transactions and flags every currently
/// active transaction to abort. It then polls until each of those transactions
/// (except `parent_tx`, if given) is no longer active, and only then begins the
/// blocking transaction. The engine starts accepting transactions again when
/// the blocking transaction is committed or aborted.
///
/// @param parent_tx optional id of a transaction that is not waited for.
/// @return pointer to the newly started blocking transaction.
/// @throws TransactionEngineError if the engine is already not accepting new
///         transactions.
Transaction *Engine::BeginBlocking(
    std::experimental::optional<TransactionId> parent_tx) {
  Snapshot wait_for_txs;
  {
    std::lock_guard<utils::SpinLock> guard(lock_);
    if (!accepting_transactions_.load())
      throw TransactionEngineError("Engine is not accepting new transactions");

    // Block the engine from accepting new transactions.
    accepting_transactions_.store(false);

    // Set active transactions to abort ASAP.
    // NOTE(review): this also flags `parent_tx`, although it is skipped in the
    // wait loop below -- confirm that is intended.
    for (auto transaction : active_) {
      store_.find(transaction)->second->set_should_abort();
    }

    wait_for_txs = active_;
  }

  // Wait for all active transactions except the parent (optional) and ourselves
  // to end. The lock is released here so those transactions can finish.
  for (auto id : wait_for_txs) {
    if (parent_tx && *parent_tx == id) continue;
    while (Info(id).is_active()) {
      // TODO reconsider this constant, currently rule-of-thumb chosen
      std::this_thread::sleep_for(std::chrono::microseconds(100));
    }
  }

  // Only after all transactions have finished, start the blocking transaction.
  std::lock_guard<utils::SpinLock> guard(lock_);
  return BeginTransaction(true);
}
CommandId Engine::Advance(TransactionId id) {
@ -32,8 +62,7 @@ CommandId Engine::Advance(TransactionId id) {
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
Transaction *t = it->second.get();
return AdvanceCommand(t);
return it->second.get()->AdvanceCommand();
}
CommandId Engine::UpdateCommand(TransactionId id) {
@ -53,6 +82,9 @@ void Engine::Commit(const Transaction &t) {
wal_->Emplace(database::StateDelta::TxCommit(t.id_));
}
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
}
}
void Engine::Abort(const Transaction &t) {
@ -64,6 +96,9 @@ void Engine::Abort(const Transaction &t) {
wal_->Emplace(database::StateDelta::TxAbort(t.id_));
}
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
}
}
CommitLog::Info Engine::Info(TransactionId tx) const {
@ -129,4 +164,15 @@ void Engine::EnsureNextIdGreater(TransactionId tx_id) {
counter_ = std::max(tx_id, counter_);
}
/// Helper shared by Begin() and BeginBlocking(): takes the next transaction id,
/// creates the transaction, registers it as active and stored, and records the
/// begin in the write-ahead log when one is configured.
///
/// Both callers invoke this while holding `lock_`.
///
/// @param blocking whether the new transaction is a blocking one.
/// @return pointer to the newly created transaction.
Transaction *Engine::BeginTransaction(bool blocking) {
  const TransactionId next_id{++counter_};

  // The snapshot is taken from active_ before the new id is added to it.
  auto *transaction = new Transaction(next_id, active_, *this, blocking);
  active_.insert(next_id);
  store_.emplace(next_id, transaction);

  if (wal_ != nullptr) wal_->Emplace(database::StateDelta::TxBegin(next_id));
  return transaction;
}
} // namespace tx

View File

@ -13,6 +13,10 @@
namespace tx {
/// Error raised by the transaction engine when it cannot start a new
/// transaction because it is currently not accepting transactions.
class TransactionEngineError : public utils::BasicException {
// Inherit all BasicException constructors.
using utils::BasicException::BasicException;
};
/// Single-node deployment transaction engine. Has complete functionality.
class Engine final {
public:
@ -26,6 +30,12 @@ class Engine final {
Engine &operator=(Engine &&) = delete;
Transaction *Begin();
/// Blocking transactions are used when we can't allow any other transaction to
/// run (besides this one). This is the reason why this transaction blocks the
/// engine from creating new transactions and waits for the existing ones to
/// finish.
///
/// @param parent_tx optional id of a transaction that does not need to finish
///        before the blocking transaction starts.
/// @throws TransactionEngineError if the engine is already not accepting new
///         transactions.
Transaction *BeginBlocking(
std::experimental::optional<TransactionId> parent_tx);
CommandId Advance(TransactionId id);
CommandId UpdateCommand(TransactionId id);
void Commit(const Transaction &t);
@ -44,15 +54,6 @@ class Engine final {
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }
protected:
Transaction *CreateTransaction(TransactionId id, const Snapshot &snapshot) {
return new Transaction(id, snapshot, *this);
}
CommandId AdvanceCommand(Transaction *t) { return t->AdvanceCommand(); }
void SetCommand(Transaction *t, CommandId cid) { t->SetCommand(cid); }
private:
// Map lock dependencies. Each entry maps (tx_that_wants_lock,
// tx_that_holds_lock). Used for local deadlock resolution.
@ -67,5 +68,9 @@ class Engine final {
// Optional. If present, the Engine will write tx Begin/Commit/Abort
// atomically (while under lock).
durability::WriteAheadLog *wal_{nullptr};
std::atomic<bool> accepting_transactions_{true};
// Helper method for transaction begin.
Transaction *BeginTransaction(bool blocking);
};
} // namespace tx

View File

@ -37,8 +37,12 @@ class Transaction final {
friend class Engine;
// The constructor is private, only the Engine ever uses it.
Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine)
: id_(id), engine_(engine), snapshot_(snapshot) {}
Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine,
bool blocking)
: id_(id),
engine_(engine),
snapshot_(snapshot),
blocking_(blocking) {}
// A transaction can't be moved nor copied. it's owned by the transaction
// engine, and it's lifetime is managed by it.
@ -74,6 +78,8 @@ class Transaction final {
auto creation_time() const { return creation_time_; }
auto blocking() const { return blocking_; }
private:
// Function used to advance the command.
CommandId AdvanceCommand() {
@ -104,5 +110,7 @@ class Transaction final {
// Creation time.
const std::chrono::time_point<std::chrono::steady_clock> creation_time_{
std::chrono::steady_clock::now()};
bool blocking_{false};
};
} // namespace tx

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <experimental/optional>
#include <memory>
#include <thread>
@ -135,22 +136,38 @@ TEST_F(GraphDbAccessorIndex, LabelPropertyIndexCount) {
}
// Builds several distinct label+property indexes from concurrent threads.
// Each BuildIndex call may either succeed or fail with
// IndexCreationException; any other exception type fails the test.
TEST(GraphDbAccessorIndexApi, LabelPropertyBuildIndexConcurrent) {
  const int THREAD_COUNT = 10;
  std::vector<std::thread> threads;
  database::GraphDb db;
  std::atomic<bool> failed{false};
  for (int index = 0; index < THREAD_COUNT; ++index) {
    threads.emplace_back([&db, &failed, index]() {
      // If we fail to create a new transaction, don't bother.
      try {
        auto dba = db.Access();
        try {
          // This could either pass or throw.
          dba->BuildIndex(dba->Label("l" + std::to_string(index)),
                          dba->Property("p" + std::to_string(index)), false);
          // If it throws, make sure the exception is right.
        } catch (const database::IndexCreationException &e) {
          // Nothing to see here, move along.
        } catch (...) {
          failed.store(true);
        }
      } catch (...) {
        // Ignore this one also.
      }
    });
  }

  // All threads should end and there shouldn't be any deadlock.
  for (auto &thread : threads) {
    if (thread.joinable()) {
      thread.join();
    }
  }

  EXPECT_FALSE(failed.load());
}
#define EXPECT_WITH_MARGIN(x, center) \

View File

@ -1,5 +1,6 @@
#include "gtest/gtest.h"
#include <experimental/optional>
#include <thread>
#include <vector>
@ -81,3 +82,72 @@ TEST(Engine, EnsureTxIdGreater) {
engine.EnsureNextIdGreater(42);
EXPECT_EQ(engine.Begin()->id_, 43);
}
// Checks that a blocking transaction (1) makes the engine reject new
// transactions while it is waiting or running, (2) only finishes after all
// previously active transactions have ended, and (3) lets the engine accept
// transactions again once it is committed.
TEST(Engine, BlockingTransaction) {
  Engine engine;
  std::vector<std::thread> threads;
  std::atomic<bool> finished{false};
  std::atomic<bool> blocking_started{false};
  std::atomic<bool> blocking_finished{false};
  std::atomic<int> tx_counter{0};
  for (int i = 0; i < 10; ++i) {
    threads.emplace_back([&engine, &tx_counter, &finished]() mutable {
      auto t = engine.Begin();
      tx_counter++;
      while (!finished.load()) {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
      }
      engine.Commit(*t);
    });
  }

  // Wait for all transactions to start.
  do {
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  } while (tx_counter.load() < 10);

  threads.emplace_back([&engine, &blocking_started, &blocking_finished]() {
    // This should block until other transactions end.
    blocking_started.store(true);
    auto t = engine.BeginBlocking(std::experimental::nullopt);
    engine.Commit(*t);
    blocking_finished.store(true);
  });

  EXPECT_FALSE(finished.load());
  EXPECT_FALSE(blocking_finished.load());
  EXPECT_EQ(tx_counter.load(), 10);

  // Make sure the blocking transaction thread kicked off.
  do {
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  } while (!blocking_started.load());

  // blocking_started is set before BeginBlocking is entered, so the engine may
  // still accept transactions for a short while. Probe until it really starts
  // rejecting them; every probe transaction that does get started is committed
  // right away so that the blocking transaction never has to wait on it.
  for (;;) {
    try {
      auto probe = engine.Begin();
      engine.Commit(*probe);
    } catch (const TransactionEngineError &) {
      break;
    }
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  }

  // Make sure we can't start any new transaction
  EXPECT_THROW(engine.Begin(), TransactionEngineError);
  EXPECT_THROW(engine.BeginBlocking(std::experimental::nullopt),
               TransactionEngineError);

  // Release regular transactions. This will cause the blocking transaction to
  // end also.
  finished.store(true);

  for (auto &t : threads) {
    if (t.joinable()) {
      t.join();
    }
  }

  EXPECT_TRUE(blocking_finished.load());

  // Make sure we can start transactions now.
  {
    auto t = engine.Begin();
    EXPECT_NE(t, nullptr);
    engine.Commit(*t);
  }
  {
    auto t = engine.BeginBlocking(std::experimental::nullopt);
    EXPECT_NE(t, nullptr);
    engine.Commit(*t);
  }
}