diff --git a/src/database/distributed/graph_db_accessor.hpp b/src/database/distributed/graph_db_accessor.hpp index a336810dd..409f4fe7d 100644 --- a/src/database/distributed/graph_db_accessor.hpp +++ b/src/database/distributed/graph_db_accessor.hpp @@ -37,6 +37,12 @@ class IndexCreationOnWorkerException : public utils::BasicException { using utils::BasicException::BasicException; }; +/// Thrown on concurrent index creation when the transaction engine fails to +/// start a new transaction. +class IndexCreationException : public utils::BasicException { + using utils::BasicException::BasicException; +}; + /** * Base accessor for the database object: exposes functions for operating on the * database. All the functions in this class should be self-sufficient: for diff --git a/src/database/single_node/graph_db.cpp b/src/database/single_node/graph_db.cpp index 15aed8eb1..c587de3ef 100644 --- a/src/database/single_node/graph_db.cpp +++ b/src/database/single_node/graph_db.cpp @@ -106,6 +106,12 @@ std::unique_ptr GraphDb::Access(tx::TransactionId tx_id) { return std::unique_ptr(new GraphDbAccessor(*this, tx_id)); } +std::unique_ptr GraphDb::AccessBlocking( + std::experimental::optional parent_tx) { + return std::unique_ptr( + new GraphDbAccessor(*this, parent_tx)); +} + Storage &GraphDb::storage() { return *storage_; } durability::WriteAheadLog &GraphDb::wal() { return wal_; } diff --git a/src/database/single_node/graph_db.hpp b/src/database/single_node/graph_db.hpp index e68dd7fd3..cf6d420d7 100644 --- a/src/database/single_node/graph_db.hpp +++ b/src/database/single_node/graph_db.hpp @@ -2,6 +2,7 @@ #pragma once #include +#include #include #include @@ -74,6 +75,8 @@ class GraphDb { /// Create a new accessor by starting a new transaction. std::unique_ptr Access(); + std::unique_ptr AccessBlocking( + std::experimental::optional parent_tx); /// Create an accessor for a running transaction. std::unique_ptr Access(tx::TransactionId); diff --git a/src/database/single_node/graph_db_accessor.cpp b/src/database/single_node/graph_db_accessor.cpp index 80fdbaed9..6926bb521 100644 --- a/src/database/single_node/graph_db_accessor.cpp +++ b/src/database/single_node/graph_db_accessor.cpp @@ -25,6 +25,12 @@ GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id) transaction_(*db.tx_engine().RunningTransaction(tx_id)), transaction_starter_{false} {} +GraphDbAccessor::GraphDbAccessor( + GraphDb &db, std::experimental::optional parent_tx) + : db_(db), + transaction_(*db.tx_engine().BeginBlocking(parent_tx)), + transaction_starter_{true} {} + GraphDbAccessor::~GraphDbAccessor() { if (transaction_starter_ && !commited_ && !aborted_) { this->Abort(); @@ -109,16 +115,6 @@ void GraphDbAccessor::BuildIndex(storage::Label label, storage::Property property, bool unique) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - db_.storage().index_build_tx_in_progress_.access().insert(transaction_.id_); - - // on function exit remove the create index transaction from - // build_tx_in_progress - utils::OnScopeExit on_exit_1([this] { - auto removed = db_.storage().index_build_tx_in_progress_.access().remove( - transaction_.id_); - DCHECK(removed) << "Index creation transaction should be inside set"; - }); - // Create the index const LabelPropertyIndex::Key key(label, property, unique); if (db_.storage().label_property_index_.CreateIndex(key) == false) { @@ -127,54 +123,20 @@ void GraphDbAccessor::BuildIndex(storage::Label label, "exists."); } - // TODO (msantl): If unique constraint, lock the tx engine - - // Everything that happens after the line above ended will be added to the - // index automatically, but we still have to add to index everything that - // happened earlier. We have to first wait for every transaction that - // happend before, or a bit later than CreateIndex to end. - { - auto wait_transactions = transaction_.engine_.GlobalActiveTransactions(); - auto active_index_creation_transactions = - db_.storage().index_build_tx_in_progress_.access(); - for (auto id : wait_transactions) { - if (active_index_creation_transactions.contains(id)) continue; - while (transaction_.engine_.Info(id).is_active()) { - // Active index creation set could only now start containing that id, - // since that thread could have not written to the set set and to avoid - // dead-lock we need to make sure we keep track of that - if (active_index_creation_transactions.contains(id)) continue; - // TODO reconsider this constant, currently rule-of-thumb chosen - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - } - } - - // This accessor's transaction surely sees everything that happened before - // CreateIndex. - auto dba = db_.Access(); - - // Add transaction to the build_tx_in_progress as this transaction doesn't - // change data and shouldn't block other parallel index creations - auto read_transaction_id = dba->transaction().id_; - db_.storage().index_build_tx_in_progress_.access().insert( - read_transaction_id); - // on function exit remove the read transaction from build_tx_in_progress - utils::OnScopeExit on_exit_2([read_transaction_id, this] { - auto removed = db_.storage().index_build_tx_in_progress_.access().remove( - read_transaction_id); - DCHECK(removed) << "Index building (read) transaction should be inside set"; - }); - try { + auto dba = + db_.AccessBlocking(std::experimental::make_optional(transaction_.id_)); + dba->PopulateIndex(key); + dba->EnableIndex(key); + dba->Commit(); } catch (const IndexConstraintViolationException &) { db_.storage().label_property_index_.DeleteIndex(key); throw; + } catch (const tx::TransactionEngineError &e) { + db_.storage().label_property_index_.DeleteIndex(key); + throw IndexCreationException(e.what()); } - - dba->EnableIndex(key); - dba->Commit(); } void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) { diff --git a/src/database/single_node/graph_db_accessor.hpp b/src/database/single_node/graph_db_accessor.hpp index 9ae68bce8..ffdb8534a 100644 --- a/src/database/single_node/graph_db_accessor.hpp +++ b/src/database/single_node/graph_db_accessor.hpp @@ -36,6 +36,12 @@ class IndexCreationOnWorkerException : public utils::BasicException { using utils::BasicException::BasicException; }; +/// Thrown on concurrent index creation when the transaction engine fails to +/// start a new transaction. +class IndexCreationException : public utils::BasicException { + using utils::BasicException::BasicException; +}; + /** * Base accessor for the database object: exposes functions for operating on the * database. All the functions in this class should be self-sufficient: for @@ -60,6 +66,9 @@ class GraphDbAccessor { /// Creates an accessor for a running transaction. GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id); + GraphDbAccessor(GraphDb &db, + std::experimental::optional parent_tx); + public: ~GraphDbAccessor(); @@ -425,7 +434,8 @@ class GraphDbAccessor { * @param label - label to build for * @param property - property to build for */ - void BuildIndex(storage::Label label, storage::Property property, bool unique); + void BuildIndex(storage::Label label, storage::Property property, + bool unique); /// Populates index with vertices containing the key void PopulateIndex(const LabelPropertyIndex::Key &key); diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 2588e7fe9..befaab0e1 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -537,6 +537,8 @@ Callback HandleIndexQuery(IndexQuery *index_query, throw QueryRuntimeException(e.what()); } // Otherwise ignore creating an existing index. + } catch (const database::IndexCreationException &e) { + throw QueryRuntimeException(e.what()); } return std::vector>(); }; diff --git a/src/storage/single_node/storage.hpp b/src/storage/single_node/storage.hpp index 46d08a78b..784b3a742 100644 --- a/src/storage/single_node/storage.hpp +++ b/src/storage/single_node/storage.hpp @@ -4,7 +4,6 @@ #include #include "data_structures/concurrent/concurrent_map.hpp" -#include "data_structures/concurrent/skiplist.hpp" #include "mvcc/single_node/version_list.hpp" #include "storage/common/types.hpp" #include "storage/kvstore/kvstore.hpp" @@ -76,9 +75,6 @@ class Storage { std::vector properties_on_disk_; - // Set of transactions ids which are building indexes currently - SkipList index_build_tx_in_progress_; - /// Gets the Vertex/Edge main storage map. template const ConcurrentMap *> &GetMap() const; diff --git a/src/transactions/distributed/engine.hpp b/src/transactions/distributed/engine.hpp index 153b164ef..fe32a8b4b 100644 --- a/src/transactions/distributed/engine.hpp +++ b/src/transactions/distributed/engine.hpp @@ -95,7 +95,7 @@ class Engine { protected: Transaction *CreateTransaction(TransactionId id, const Snapshot &snapshot) { - return new Transaction(id, snapshot, *this); + return new Transaction(id, snapshot, *this, false); } CommandId AdvanceCommand(Transaction *t) { return t->AdvanceCommand(); } diff --git a/src/transactions/single_node/engine.cpp b/src/transactions/single_node/engine.cpp index 1e530b4ed..a22b69d98 100644 --- a/src/transactions/single_node/engine.cpp +++ b/src/transactions/single_node/engine.cpp @@ -14,15 +14,45 @@ Engine::Engine(durability::WriteAheadLog *wal) : wal_(wal) {} Transaction *Engine::Begin() { VLOG(11) << "[Tx] Starting transaction " << counter_ + 1; std::lock_guard guard(lock_); + if (!accepting_transactions_.load()) + throw TransactionEngineError( + "The transaction engine currently isn't accepting new transactions."); - TransactionId id{++counter_}; - auto t = CreateTransaction(id, active_); - active_.insert(id); - store_.emplace(id, t); - if (wal_) { - wal_->Emplace(database::StateDelta::TxBegin(id)); + return BeginTransaction(false); +} + +Transaction *Engine::BeginBlocking( + std::experimental::optional parent_tx) { + Snapshot wait_for_txs; + { + std::lock_guard guard(lock_); + if (!accepting_transactions_.load()) + throw TransactionEngineError("Engine is not accepting new transactions"); + + // Block the engine from acceping new transactions. + accepting_transactions_.store(false); + + // Set active transactions to abort ASAP. + for (auto transaction : active_) { + store_.find(transaction)->second->set_should_abort(); + } + + wait_for_txs = active_; } - return t; + + // Wait for all active transactions except the parent (optional) and ourselves + // to end. + for (auto id : wait_for_txs) { + if (parent_tx && *parent_tx == id) continue; + while (Info(id).is_active()) { + // TODO reconsider this constant, currently rule-of-thumb chosen + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + + // Only after all transactions have finished, start the blocking transaction. + std::lock_guard guard(lock_); + return BeginTransaction(true); } CommandId Engine::Advance(TransactionId id) { @@ -32,8 +62,7 @@ CommandId Engine::Advance(TransactionId id) { DCHECK(it != store_.end()) << "Transaction::advance on non-existing transaction"; - Transaction *t = it->second.get(); - return AdvanceCommand(t); + return it->second.get()->AdvanceCommand(); } CommandId Engine::UpdateCommand(TransactionId id) { @@ -53,6 +82,9 @@ void Engine::Commit(const Transaction &t) { wal_->Emplace(database::StateDelta::TxCommit(t.id_)); } store_.erase(store_.find(t.id_)); + if (t.blocking()) { + accepting_transactions_.store(true); + } } void Engine::Abort(const Transaction &t) { @@ -64,6 +96,9 @@ void Engine::Abort(const Transaction &t) { wal_->Emplace(database::StateDelta::TxAbort(t.id_)); } store_.erase(store_.find(t.id_)); + if (t.blocking()) { + accepting_transactions_.store(true); + } } CommitLog::Info Engine::Info(TransactionId tx) const { @@ -129,4 +164,15 @@ void Engine::EnsureNextIdGreater(TransactionId tx_id) { counter_ = std::max(tx_id, counter_); } +Transaction *Engine::BeginTransaction(bool blocking) { + TransactionId id{++counter_}; + Transaction *t = new Transaction(id, active_, *this, blocking); + active_.insert(id); + store_.emplace(id, t); + if (wal_) { + wal_->Emplace(database::StateDelta::TxBegin(id)); + } + return t; +} + } // namespace tx diff --git a/src/transactions/single_node/engine.hpp b/src/transactions/single_node/engine.hpp index 8d142fc33..33bcc01d2 100644 --- a/src/transactions/single_node/engine.hpp +++ b/src/transactions/single_node/engine.hpp @@ -13,6 +13,10 @@ namespace tx { +class TransactionEngineError : public utils::BasicException { + using utils::BasicException::BasicException; +}; + /// Single-node deployment transaction engine. Has complete functionality. class Engine final { public: @@ -26,6 +30,12 @@ class Engine final { Engine &operator=(Engine &&) = delete; Transaction *Begin(); + /// Blocking transactions are used when we can't allow any other transaction to + /// run (besides this one). This is the reason why this transactions blocks the + /// engine from creating new transactions and waits for the existing ones to + /// finish. + Transaction *BeginBlocking( + std::experimental::optional parent_tx); CommandId Advance(TransactionId id); CommandId UpdateCommand(TransactionId id); void Commit(const Transaction &t); @@ -44,15 +54,6 @@ class Engine final { auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } - protected: - Transaction *CreateTransaction(TransactionId id, const Snapshot &snapshot) { - return new Transaction(id, snapshot, *this); - } - - CommandId AdvanceCommand(Transaction *t) { return t->AdvanceCommand(); } - - void SetCommand(Transaction *t, CommandId cid) { t->SetCommand(cid); } - private: // Map lock dependencies. Each entry maps (tx_that_wants_lock, // tx_that_holds_lock). Used for local deadlock resolution. @@ -67,5 +68,9 @@ class Engine final { // Optional. If present, the Engine will write tx Begin/Commit/Abort // atomically (while under lock). durability::WriteAheadLog *wal_{nullptr}; + std::atomic accepting_transactions_{true}; + + // Helper method for transaction begin. + Transaction *BeginTransaction(bool blocking); }; } // namespace tx diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index 25757e4c7..dcad92444 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -37,8 +37,12 @@ class Transaction final { friend class Engine; // The constructor is private, only the Engine ever uses it. - Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine) - : id_(id), engine_(engine), snapshot_(snapshot) {} + Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine, + bool blocking) + : id_(id), + engine_(engine), + snapshot_(snapshot), + blocking_(blocking) {} // A transaction can't be moved nor copied. it's owned by the transaction // engine, and it's lifetime is managed by it. @@ -74,6 +78,8 @@ class Transaction final { auto creation_time() const { return creation_time_; } + auto blocking() const { return blocking_; } + private: // Function used to advance the command. CommandId AdvanceCommand() { @@ -104,5 +110,7 @@ class Transaction final { // Creation time. const std::chrono::time_point creation_time_{ std::chrono::steady_clock::now()}; + + bool blocking_{false}; }; } // namespace tx diff --git a/tests/unit/graph_db_accessor_index_api.cpp b/tests/unit/graph_db_accessor_index_api.cpp index c78112690..e46b43674 100644 --- a/tests/unit/graph_db_accessor_index_api.cpp +++ b/tests/unit/graph_db_accessor_index_api.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -135,22 +136,38 @@ TEST_F(GraphDbAccessorIndex, LabelPropertyIndexCount) { } TEST(GraphDbAccessorIndexApi, LabelPropertyBuildIndexConcurrent) { - const int ITER_COUNT = 10; - for (int iter = 0; iter < ITER_COUNT; ++iter) { - database::GraphDb db; - const int THREAD_COUNT = 10; - std::vector threads; - for (int index = 0; index < THREAD_COUNT; ++index) { - threads.emplace_back([&db, index]() { + const int THREAD_COUNT = 10; + std::vector threads; + database::GraphDb db; + std::atomic failed{false}; + for (int index = 0; index < THREAD_COUNT; ++index) { + threads.emplace_back([&db, &failed, index]() { + // If we fail to create a new transaction, don't bother. + try { auto dba = db.Access(); - dba->BuildIndex(dba->Label("l" + std::to_string(index)), - dba->Property("p" + std::to_string(index)), false); - - }); - } - // All threads should end and there shouldn't be any deadlock - for (auto &thread : threads) thread.join(); + try { + // This could either pass or throw. + dba->BuildIndex(dba->Label("l" + std::to_string(index)), + dba->Property("p" + std::to_string(index)), false); + // If it throws, make sure the exception is right. + } catch (const database::IndexCreationException &e) { + // Nothing to see here, move along. + } catch (...) { + failed.store(true); + } + } catch (...) { + // Ignore this one also. + } + }); } + + for (auto &thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + + EXPECT_FALSE(failed.load()); } #define EXPECT_WITH_MARGIN(x, center) \ diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index c11f03555..09a4d72cb 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -1,5 +1,6 @@ #include "gtest/gtest.h" +#include #include #include @@ -81,3 +82,72 @@ TEST(Engine, EnsureTxIdGreater) { engine.EnsureNextIdGreater(42); EXPECT_EQ(engine.Begin()->id_, 43); } + +TEST(Engine, BlockingTransaction) { + Engine engine; + std::vector threads; + std::atomic finished{false}; + std::atomic blocking_started{false}; + std::atomic blocking_finished{false}; + std::atomic tx_counter{0}; + for (int i = 0; i < 10; ++i) { + threads.emplace_back([&engine, &tx_counter, &finished]() mutable { + auto t = engine.Begin(); + tx_counter++; + while (!finished.load()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + engine.Commit(*t); + }); + } + + // Wait for all transactions to start. + do { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } while (tx_counter.load() < 10); + + threads.emplace_back([&engine, &blocking_started, &blocking_finished]() { + // This should block until other transactions end. + blocking_started.store(true); + auto t = engine.BeginBlocking(std::experimental::nullopt); + engine.Commit(*t); + blocking_finished.store(true); + }); + + EXPECT_FALSE(finished.load()); + EXPECT_FALSE(blocking_finished.load()); + EXPECT_EQ(tx_counter.load(), 10); + + // Make sure the blocking transaction thread kicked off. + do { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } while (!blocking_started.load()); + + // Make sure we can't start any new transaction + EXPECT_THROW(engine.Begin(), TransactionEngineError); + EXPECT_THROW(engine.BeginBlocking(std::experimental::nullopt), TransactionEngineError); + + // Release regular transactions. This will cause the blocking transaction to + // end also. + finished.store(true); + + for (auto &t : threads) { + if (t.joinable()) { + t.join(); + } + } + + EXPECT_TRUE(blocking_finished.load()); + + // Make sure we can start transactions now. + { + auto t = engine.Begin(); + EXPECT_NE(t, nullptr); + engine.Commit(*t); + } + { + auto t = engine.BeginBlocking(std::experimental::nullopt); + EXPECT_NE(t, nullptr); + engine.Commit(*t); + } +}