diff --git a/CMakeLists.txt b/CMakeLists.txt index 823cd0a2e..254fc37dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -222,7 +222,8 @@ set(memgraph_src_files ${src_dir}/storage/record_accessor.cpp ${src_dir}/storage/vertex_accessor.cpp ${src_dir}/threading/thread.cpp - ${src_dir}/transactions/transaction.cpp + ${src_dir}/transactions/engine_master.cpp + ${src_dir}/transactions/engine_worker.cpp ${src_dir}/utils/watchdog.cpp ) # ----------------------------------------------------------------------------- diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index b585b4510..6b4a13fff 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -11,12 +11,14 @@ #include "durability/snapshooter.hpp" #include "storage/edge.hpp" #include "storage/garbage_collector.hpp" +#include "transactions/engine_master.hpp" #include "utils/timer.hpp" namespace fs = std::experimental::filesystem; GraphDb::GraphDb(GraphDb::Config config) : config_(config), + tx_engine_(new tx::MasterEngine()), gc_vertices_(vertices_, vertex_record_deleter_, vertex_version_list_deleter_), gc_edges_(edges_, edge_record_deleter_, edge_version_list_deleter_), @@ -41,7 +43,7 @@ GraphDb::GraphDb(GraphDb::Config config) std::chrono::seconds( std::max(1, std::min(5, config.query_execution_time_sec / 4))), [this]() { - tx_engine_.ForEachActiveTransaction([this](tx::Transaction &t) { + tx_engine_->LocalForEachActiveTransaction([this](tx::Transaction &t) { if (t.creation_time() + std::chrono::seconds(config_.query_execution_time_sec) < std::chrono::steady_clock::now()) { @@ -54,7 +56,8 @@ GraphDb::GraphDb(GraphDb::Config config) void GraphDb::Shutdown() { is_accepting_transactions_ = false; - tx_engine_.ForEachActiveTransaction([](auto &t) { t.set_should_abort(); }); + tx_engine_->LocalForEachActiveTransaction( + [](auto &t) { t.set_should_abort(); }); } void GraphDb::StartSnapshooting() { @@ -77,12 +80,12 @@ void GraphDb::CollectGarbage() { // main garbage collection logic // see wiki documentation for logic explanation LOG(INFO) << "Garbage collector started"; - const auto snapshot = tx_engine_.GcSnapshot(); + const auto snapshot = tx_engine_->GlobalGcSnapshot(); { // This can be run concurrently utils::Timer x; - gc_vertices_.Run(snapshot, tx_engine_); - gc_edges_.Run(snapshot, tx_engine_); + gc_vertices_.Run(snapshot, *tx_engine_); + gc_edges_.Run(snapshot, *tx_engine_); VLOG(1) << "Garbage collector mvcc phase time: " << x.Elapsed().count(); } // This has to be run sequentially after gc because gc modifies @@ -91,8 +94,8 @@ void GraphDb::CollectGarbage() { { // This can be run concurrently utils::Timer x; - labels_index_.Refresh(snapshot, tx_engine_); - label_property_index_.Refresh(snapshot, tx_engine_); + labels_index_.Refresh(snapshot, *tx_engine_); + label_property_index_.Refresh(snapshot, *tx_engine_); VLOG(1) << "Garbage collector index phase time: " << x.Elapsed().count(); } { @@ -103,7 +106,7 @@ void GraphDb::CollectGarbage() { // to those records. New snapshot can be used, different than one used for // first two phases of gc. utils::Timer x; - const auto snapshot = tx_engine_.GcSnapshot(); + const auto snapshot = tx_engine_->GlobalGcSnapshot(); edge_record_deleter_.FreeExpiredObjects(snapshot.back()); vertex_record_deleter_.FreeExpiredObjects(snapshot.back()); edge_version_list_deleter_.FreeExpiredObjects(snapshot.back()); diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 6ddba6406..29c1ee5d8 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -1,5 +1,7 @@ #pragma once +#include <memory> + #include "cppitertools/filter.hpp" #include "cppitertools/imap.hpp" @@ -88,11 +90,13 @@ class GraphDb { Config config_; - /** transaction engine related to this database */ - tx::Engine tx_engine_; + /** Transaction engine related to this database. Master instance if this + * GraphDb is a single-node deployment, or the master in a distributed system. + * Otherwise a WorkerEngine instance. */ + std::unique_ptr<tx::Engine> tx_engine_; std::atomic<int64_t> next_vertex_id_{0}; - std::atomic<int64_t> next_edge_id{0}; + std::atomic<int64_t> next_edge_id_{0}; // main storage for the graph ConcurrentMap<int64_t, mvcc::VersionList<Vertex> *> vertices_; diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 8092329d5..9aa7d0c23 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -1,14 +1,16 @@ +#include "glog/logging.h" + #include "database/creation_exception.hpp" #include "database/graph_db_accessor.hpp" - #include "storage/edge.hpp" #include "storage/edge_accessor.hpp" #include "storage/vertex.hpp" #include "storage/vertex_accessor.hpp" +#include "utils/atomic.hpp" #include "utils/on_scope_exit.hpp" GraphDbAccessor::GraphDbAccessor(GraphDb &db) - : db_(db), transaction_(db.tx_engine_.Begin()) { + : db_(db), transaction_(MasterEngine().Begin()) { db_.wal_.TxBegin(transaction_->id_); } @@ -24,13 +26,13 @@ tx::transaction_id_t GraphDbAccessor::transaction_id() const { void GraphDbAccessor::AdvanceCommand() { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - transaction_->engine_.Advance(transaction_->id_); + MasterEngine().Advance(transaction_->id_); } void GraphDbAccessor::Commit() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; auto tid = transaction_->id_; - transaction_->Commit(); + MasterEngine().Commit(*transaction_); db_.wal_.TxCommit(tid); commited_ = true; } @@ -38,7 +40,7 @@ void GraphDbAccessor::Commit() { void GraphDbAccessor::Abort() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; auto tid = transaction_->id_; - transaction_->Abort(); + MasterEngine().Abort(*transaction_); db_.wal_.TxAbort(tid); aborted_ = true; } @@ -56,11 +58,7 @@ VertexAccessor GraphDbAccessor::InsertVertex( auto id = opt_id ? *opt_id : db_.next_vertex_id_++; if (opt_id) { - while (true) { - auto next_id = db_.next_vertex_id_.load(); - if (next_id > id) break; - if (db_.next_vertex_id_.compare_exchange_strong(next_id, id + 1)) break; - } + utils::EnsureAtomicGe(db_.next_vertex_id_, id + 1); } auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_, id); @@ -118,12 +116,12 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label, // happened earlier. We have to first wait for every transaction that // happend before, or a bit later than CreateIndex to end. { - auto wait_transactions = db_.tx_engine_.ActiveTransactions(); + auto wait_transactions = db_.tx_engine_->GlobalActiveTransactions(); auto active_index_creation_transactions = db_.index_build_tx_in_progress_.access(); for (auto id : wait_transactions) { if (active_index_creation_transactions.contains(id)) continue; - while (db_.tx_engine_.IsActive(id)) { + while (db_.tx_engine_->GlobalIsActive(id)) { // Active index creation set could only now start containing that id, // since that thread could have not written to the set set and to avoid // dead-lock we need to make sure we keep track of that @@ -278,13 +276,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge( VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType edge_type, std::experimental::optional<int64_t> opt_id) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - auto id = opt_id ? *opt_id : db_.next_edge_id++; + auto id = opt_id ? *opt_id : db_.next_edge_id_++; if (opt_id) { - while (true) { - auto next_id = db_.next_edge_id.load(); - if (next_id > id) break; - if (db_.next_edge_id.compare_exchange_strong(next_id, id + 1)) break; - } + utils::EnsureAtomicGe(db_.next_edge_id_, id + 1); } auto edge_vlist = new mvcc::VersionList<Edge>(*transaction_, id, from.vlist_, diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 9129ea8fb..0e1b3356b 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -15,6 +15,7 @@ #include "graph_db.hpp" #include "storage/edge_accessor.hpp" #include "storage/vertex_accessor.hpp" +#include "transactions/engine_master.hpp" #include "transactions/transaction.hpp" #include "utils/bound.hpp" @@ -493,24 +494,16 @@ class GraphDbAccessor { /** Returns the id of this accessor's transaction */ tx::transaction_id_t transaction_id() const; - /** - * Advances transaction's command id by 1. - */ + /** Advances transaction's command id by 1. */ void AdvanceCommand(); - /** - * Commit transaction. - */ + /** Commit transaction. */ void Commit(); - /** - * Abort transaction. - */ + /** Abort transaction. */ void Abort(); - /** - * Return true if transaction is hinted to abort. - */ + /** Return true if transaction is hinted to abort. */ bool should_abort() const; /** Returns the transaction of this accessor */ @@ -573,6 +566,15 @@ class GraphDbAccessor { !accessor.new_->is_expired_by(*transaction_)); } + /** Casts the DB's engine to MasterEngine and returns it. If the DB's engine + * is + * RemoteEngine, this function will crash MG. */ + tx::MasterEngine &MasterEngine() { + auto *local_engine = dynamic_cast<tx::MasterEngine *>(db_.tx_engine_.get()); + DCHECK(local_engine) << "Asked for MasterEngine on distributed worker"; + return *local_engine; + } + GraphDb &db_; /** The current transaction */ diff --git a/src/mvcc/record.hpp b/src/mvcc/record.hpp index 0b88b18ea..16bb6d589 100644 --- a/src/mvcc/record.hpp +++ b/src/mvcc/record.hpp @@ -251,7 +251,7 @@ class Record : public Version<T> { if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask); // If hints are not set consult the commit log. - auto info = engine.FetchInfo(id); + auto info = engine.Info(id); if (info.is_committed()) { hints_.Set(Hints::kCmt & mask); return true; @@ -284,7 +284,7 @@ class Record : public Version<T> { if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre); // If hints are not set consult the commit log. - auto info = engine.FetchInfo(tx_.cre); + auto info = engine.Info(tx_.cre); if (info.is_aborted()) { hints_.Set(Hints::kAbt & Hints::kCre); return true; diff --git a/src/storage/garbage_collector.hpp b/src/storage/garbage_collector.hpp index d09192d57..5aa6c6f3e 100644 --- a/src/storage/garbage_collector.hpp +++ b/src/storage/garbage_collector.hpp @@ -46,11 +46,11 @@ class GarbageCollector { // from it we can delete it. auto ret = vlist->GcDeleted(snapshot, engine); if (ret.first) { - deleted_version_lists.emplace_back(vlist, engine.LockFreeCount()); + deleted_version_lists.emplace_back(vlist, engine.LocalLast()); count += collection_accessor.remove(id_vlist.first); } if (ret.second != nullptr) - deleted_records.emplace_back(ret.second, engine.LockFreeCount()); + deleted_records.emplace_back(ret.second, engine.LocalLast()); } DLOG_IF(INFO, count > 0) << "GC started cleaning with snapshot: " << snapshot; diff --git a/src/storage/locking/record_lock.cpp b/src/storage/locking/record_lock.cpp index 2826d6512..f8994f698 100644 --- a/src/storage/locking/record_lock.cpp +++ b/src/storage/locking/record_lock.cpp @@ -55,8 +55,8 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { tx::transaction_id_t owner = owner_; if (owner_ == tx.id_) return LockStatus::AlreadyHeld; - // Insert edge into lock_graph. - auto accessor = engine.lock_graph().access(); + // Insert edge into local lock_graph. + auto accessor = engine.local_lock_graph().access(); auto it = accessor.insert(tx.id_, owner).first; auto abort_oldest_tx_in_lock_cycle = [&tx, &accessor, &engine]() { @@ -67,7 +67,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { // oldest one. auto oldest = FindOldestTxInLockCycle(tx.id_, accessor); if (oldest) { - engine.ForEachActiveTransaction([&](tx::Transaction &t) { + engine.LocalForEachActiveTransaction([&](tx::Transaction &t) { if (t.id_ == oldest) { t.set_should_abort(); } diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 082de47b9..b68cc985b 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -1,76 +1,29 @@ #pragma once -#include <atomic> -#include <limits> -#include <memory> -#include <mutex> -#include <unordered_map> -#include <vector> - -#include "glog/logging.h" -#include "threading/sync/spinlock.hpp" +#include "data_structures/concurrent/concurrent_map.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" -#include "utils/exceptions.hpp" +#include "transactions/type.hpp" namespace tx { - -/** Indicates an error in transaction handling (currently - * only command id overflow). */ -class TransactionError : public utils::BasicException { - public: - using utils::BasicException::BasicException; -}; - -/** Database transaction egine. +/** + * Database transaction engine. Used for managing transactions and the related + * information such as transaction snapshots and the transaction state info. * - * Used for managing transactions and the related information - * such as transaction snapshots and the commit log. + * This is an abstract base class for implementing a single-node transactional + * engine (MasterEngine), an engine for the master in a distributed system (also + * MasterEngine), and for the worker in a distributed system (WorkerEngine). + * + * Methods in this class are often prefixed with "Global" or "Local", depending + * on the guarantees that they need to satisfy. These guarantee requirements are + * determined by the users of a particular method. */ class Engine { public: - Engine() = default; + virtual ~Engine() = default; - /** Begins a transaction and returns a pointer to - * it's object. - * - * The transaction object is owned by this engine. - * It will be released when the transaction gets - * committted or aborted. - */ - Transaction *Begin() { - std::lock_guard<SpinLock> guard(lock_); - - transaction_id_t id{++counter_}; - auto t = new Transaction(id, active_, *this); - - active_.insert(id); - store_.emplace(id, t); - - return t; - } - - /** Advances the command on the transaction with the - * given id. - * - * @param id - Transation id. That transaction must - * be currently active. - */ - void Advance(transaction_id_t id) { - std::lock_guard<SpinLock> guard(lock_); - - auto it = store_.find(id); - DCHECK(it != store_.end()) - << "Transaction::advance on non-existing transaction"; - - Transaction *t = it->second.get(); - if (t->cid_ == std::numeric_limits<command_id_t>::max()) - throw TransactionError( - "Reached maximum number of commands in this " - "transaction."); - - t->cid_++; - } + /** Returns the commit log Info about the given transaction. */ + virtual CommitLog::Info Info(transaction_id_t tx) const = 0; /** Returns the snapshot relevant to garbage collection of database records. * @@ -83,85 +36,33 @@ class Engine { * that was committed) by a transaction older then the older currently active. * We need the full snapshot to prevent overlaps (see general GC * documentation). + * + * The returned snapshot must be for the globally oldest active transaction. + * If we only looked at locally known transactions, it would be possible to + * delete something that and older active transaction can still see. */ - Snapshot GcSnapshot() { - std::lock_guard<SpinLock> guard(lock_); + virtual Snapshot GlobalGcSnapshot() = 0; - // No active transactions. - if (active_.size() == 0) { - auto snapshot_copy = active_; - snapshot_copy.insert(counter_ + 1); - return snapshot_copy; - } - - // There are active transactions. - auto snapshot_copy = store_.find(active_.front())->second->snapshot(); - snapshot_copy.insert(active_.front()); - return snapshot_copy; - } - - /** - * Returns active transactions. - */ - Snapshot ActiveTransactions() { - std::lock_guard<SpinLock> guard(lock_); - Snapshot active_transactions = active_; - return active_transactions; - } - - /** Comits the given transaction. Deletes the transaction object, it's not - * valid after this function executes. */ - void Commit(const Transaction &t) { - std::lock_guard<SpinLock> guard(lock_); - clog_.set_committed(t.id_); - active_.remove(t.id_); - store_.erase(store_.find(t.id_)); - } - - /** Aborts the given transaction. Deletes the transaction object, it's not - * valid after this function executes. */ - void Abort(const Transaction &t) { - std::lock_guard<SpinLock> guard(lock_); - clog_.set_aborted(t.id_); - active_.remove(t.id_); - store_.erase(store_.find(t.id_)); - } - - /** Returns transaction id of last transaction without taking a lock. New - * transactions can be created or destroyed during call of this function. - */ - auto LockFreeCount() const { return counter_.load(); } + /** Returns active transactions. */ + virtual Snapshot GlobalActiveTransactions() = 0; /** Returns true if the transaction with the given ID is currently active. */ - bool IsActive(transaction_id_t tx) const { return clog_.is_active(tx); } + virtual bool GlobalIsActive(transaction_id_t tx) const = 0; - /** Calls function f on each active transaction. */ - void ForEachActiveTransaction(std::function<void(Transaction &)> f) { - std::lock_guard<SpinLock> guard(lock_); - for (auto transaction : active_) { - f(*store_.find(transaction)->second); - } - } + /** Returns the ID of last locally known transaction. */ + virtual tx::transaction_id_t LocalLast() const = 0; - /** Returns the commit log Info about the given transaction. */ - auto FetchInfo(transaction_id_t tx) const { return clog_.fetch_info(tx); } + /** Calls function f on each locally active transaction. */ + virtual void LocalForEachActiveTransaction( + std::function<void(Transaction &)> f) = 0; - auto &lock_graph() { return lock_graph_; } - const auto &lock_graph() const { return lock_graph_; } + auto &local_lock_graph() { return local_lock_graph_; } + const auto &local_lock_graph() const { return local_lock_graph_; } private: - std::atomic<transaction_id_t> counter_{0}; - CommitLog clog_; - std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_; - Snapshot active_; - SpinLock lock_; - - // For each active transaction we store a transaction that holds a lock that - // mentioned transaction is also trying to acquire. We can think of this - // data structure as a graph: key being a start node of directed edges and - // value being an end node of that edge. ConcurrentMap is used since it is - // garbage collected and we are sure that we will not be having problems with - // lifetimes of each object. - ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_; + // Map lock dependencies. Each entry maps (tx_that_wants_lock, + // tx_that_holds_lock). Used for local deadlock resolution. + // TODO consider global deadlock resolution. + ConcurrentMap<transaction_id_t, transaction_id_t> local_lock_graph_; }; } // namespace tx diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp new file mode 100644 index 000000000..19f064a8c --- /dev/null +++ b/src/transactions/engine_master.cpp @@ -0,0 +1,90 @@ +#include <limits> +#include <mutex> + +#include "glog/logging.h" + +#include "transactions/engine_master.hpp" + +namespace tx { +Transaction *MasterEngine::Begin() { + std::lock_guard<SpinLock> guard(lock_); + + transaction_id_t id{++counter_}; + auto t = new Transaction(id, active_, *this); + + active_.insert(id); + store_.emplace(id, t); + + return t; +} + +void MasterEngine::Advance(transaction_id_t id) { + std::lock_guard<SpinLock> guard(lock_); + + auto it = store_.find(id); + DCHECK(it != store_.end()) + << "Transaction::advance on non-existing transaction"; + + Transaction *t = it->second.get(); + if (t->cid_ == std::numeric_limits<command_id_t>::max()) + throw TransactionError( + "Reached maximum number of commands in this " + "transaction."); + + t->cid_++; +} + +void MasterEngine::Commit(const Transaction &t) { + std::lock_guard<SpinLock> guard(lock_); + clog_.set_committed(t.id_); + active_.remove(t.id_); + store_.erase(store_.find(t.id_)); +} + +void MasterEngine::Abort(const Transaction &t) { + std::lock_guard<SpinLock> guard(lock_); + clog_.set_aborted(t.id_); + active_.remove(t.id_); + store_.erase(store_.find(t.id_)); +} + +CommitLog::Info MasterEngine::Info(transaction_id_t tx) const { + return clog_.fetch_info(tx); +} + +Snapshot MasterEngine::GlobalGcSnapshot() { + std::lock_guard<SpinLock> guard(lock_); + + // No active transactions. + if (active_.size() == 0) { + auto snapshot_copy = active_; + snapshot_copy.insert(counter_ + 1); + return snapshot_copy; + } + + // There are active transactions. + auto snapshot_copy = store_.find(active_.front())->second->snapshot(); + snapshot_copy.insert(active_.front()); + return snapshot_copy; +} + +Snapshot MasterEngine::GlobalActiveTransactions() { + std::lock_guard<SpinLock> guard(lock_); + Snapshot active_transactions = active_; + return active_transactions; +} + +bool MasterEngine::GlobalIsActive(transaction_id_t tx) const { + return clog_.is_active(tx); +} + +tx::transaction_id_t MasterEngine::LocalLast() const { return counter_.load(); } + +void MasterEngine::LocalForEachActiveTransaction( + std::function<void(Transaction &)> f) { + std::lock_guard<SpinLock> guard(lock_); + for (auto transaction : active_) { + f(*store_.find(transaction)->second); + } +} +} // namespace tx diff --git a/src/transactions/engine_master.hpp b/src/transactions/engine_master.hpp new file mode 100644 index 000000000..e20e53ed7 --- /dev/null +++ b/src/transactions/engine_master.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include <atomic> +#include <memory> +#include <unordered_map> + +#include "threading/sync/spinlock.hpp" +#include "transactions/commit_log.hpp" +#include "transactions/engine.hpp" +#include "transactions/transaction.hpp" +#include "utils/exceptions.hpp" + +namespace tx { + +/** Indicates an error in transaction handling (currently + * only command id overflow). */ +class TransactionError : public utils::BasicException { + public: + using utils::BasicException::BasicException; +}; + +/** + * A transaction engine that contains everything necessary for transactional + * handling. Used for single-node Memgraph deployments and for the master in a + * distributed system. + */ +class MasterEngine : public Engine { + public: + /** + * Begins a transaction and returns a pointer to + * it's object. + * + * The transaction object is owned by this engine. + * It will be released when the transaction gets + * committted or aborted. + */ + Transaction *Begin(); + + /** + * Advances the command on the transaction with the + * given id. + * + * @param id - Transation id. That transaction must + * be currently active. + */ + void Advance(transaction_id_t id); + + /** Comits the given transaction. Deletes the transaction object, it's not + * valid after this function executes. */ + void Commit(const Transaction &t); + + /** Aborts the given transaction. Deletes the transaction object, it's not + * valid after this function executes. */ + void Abort(const Transaction &t); + + CommitLog::Info Info(transaction_id_t tx) const override; + Snapshot GlobalGcSnapshot() override; + Snapshot GlobalActiveTransactions() override; + bool GlobalIsActive(transaction_id_t tx) const override; + tx::transaction_id_t LocalLast() const override; + void LocalForEachActiveTransaction( + std::function<void(Transaction &)> f) override; + + private: + std::atomic<transaction_id_t> counter_{0}; + CommitLog clog_; + std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_; + Snapshot active_; + SpinLock lock_; +}; +} // namespace tx diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp new file mode 100644 index 000000000..43484a497 --- /dev/null +++ b/src/transactions/engine_worker.cpp @@ -0,0 +1,61 @@ +#include "glog/logging.h" + +#include "transactions/engine_worker.hpp" +#include "utils/atomic.hpp" + +namespace tx { +Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) { + auto accessor = active_.access(); + auto found = accessor.find(tx_id); + if (found != accessor.end()) return found->second; + + Snapshot snapshot = rpc_.Call<Snapshot>("Snapshot", tx_id); + auto *tx = new Transaction(tx_id, snapshot, *this); + auto insertion = accessor.insert(tx_id, tx); + if (!insertion.second) { + delete tx; + tx = insertion.first->second; + } + utils::EnsureAtomicGe(local_last_, tx_id); + return tx; +} + +CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { + auto info = clog_.fetch_info(tid); + // If we don't know the transaction to be commited nor aborted, ask the + // master about it and update the local commit log. + if (!(info.is_aborted() || info.is_committed())) { + // @review: this version of Call is just used because Info has no default + // constructor. + info = rpc_.Call<CommitLog::Info>("Info", info, tid); + DCHECK(info.is_committed() || info.is_aborted()) + << "It is expected that the transaction is not running anymore. This " + "function should be used only after the snapshot of the current " + "transaction is checked (in MVCC)."; + if (info.is_committed()) clog_.set_committed(tid); + if (info.is_aborted()) clog_.set_aborted(tid); + } + + return info; +} + +Snapshot WorkerEngine::GlobalGcSnapshot() { + return rpc_.Call<Snapshot>("Snapshot"); +} + +Snapshot WorkerEngine::GlobalActiveTransactions() { + return rpc_.Call<Snapshot>("Active"); +} + +bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const { + return rpc_.Call<bool>("GlobalIsActive", tid); +} + +tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; } + +void WorkerEngine::LocalForEachActiveTransaction( + std::function<void(Transaction &)> f) { + for (auto pair : active_.access()) f(*pair.second); +} + +} // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp new file mode 100644 index 000000000..d1baf59be --- /dev/null +++ b/src/transactions/engine_worker.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include <atomic> +#include <memory> + +#include "data_structures/concurrent/concurrent_map.hpp" +#include "transactions/commit_log.hpp" +#include "transactions/engine.hpp" +#include "transactions/transaction.hpp" + +namespace tx { +/** A transactional engine for the worker in a distributed system. */ +class WorkerEngine : public Engine { + // Mock class for RPC. + // TODO Replace with the real thing, once available. + class Rpc { + public: + template <typename TReturn, typename... TArgs> + TReturn Call(const std::string &, TArgs &&...) { + return TReturn{}; + } + + template <typename TReturn, typename... TArgs> + TReturn Call(const std::string &, TReturn default_return, TArgs &&...) { + return default_return; + } + }; + + public: + WorkerEngine(Rpc &rpc) : rpc_(rpc) {} + + Transaction *LocalBegin(transaction_id_t tx_id); + + CommitLog::Info Info(transaction_id_t tid) const override; + Snapshot GlobalGcSnapshot() override; + Snapshot GlobalActiveTransactions() override; + bool GlobalIsActive(transaction_id_t tid) const override; + tx::transaction_id_t LocalLast() const override; + void LocalForEachActiveTransaction( + std::function<void(Transaction &)> f) override; + + private: + // Communication with the transactional engine on the master. + Rpc &rpc_; + + // Local caches. + ConcurrentMap<transaction_id_t, Transaction *> active_; + std::atomic<transaction_id_t> local_last_; + // Mutable because just getting info can cause a cache fill. + mutable CommitLog clog_; +}; +} // namespace tx diff --git a/src/transactions/transaction.cpp b/src/transactions/transaction.cpp deleted file mode 100644 index 88d542e15..000000000 --- a/src/transactions/transaction.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "transactions/transaction.hpp" - -#include "transactions/engine.hpp" - -namespace tx { - -Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot, - Engine &engine) - : id_(id), engine_(engine), snapshot_(snapshot) {} - -void Transaction::TakeLock(RecordLock &lock) const { - locks_.Take(&lock, *this, engine_); -} - -void Transaction::Commit() { engine_.Commit(*this); } - -void Transaction::Abort() { engine_.Abort(*this); } -} diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index 2d6cd111a..143355dcc 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -26,10 +26,12 @@ class Transaction { } private: - friend class Engine; + friend class MasterEngine; + friend class WorkerEngine; // The constructor is private, only the Engine ever uses it. - Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine); + Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine) + : id_(id), engine_(engine), snapshot_(snapshot) {} // A transaction can't be moved nor copied. it's owned by the transaction // engine, and it's lifetime is managed by it. @@ -39,10 +41,6 @@ class Transaction { Transaction &operator=(Transaction &&) = delete; public: - /** Acquires the lock over the given RecordLock, preventing other transactions - * from doing the same */ - void TakeLock(RecordLock &lock) const; - /** Commits this transaction. After this call this transaction object is no * longer valid for use (it gets deleted by the engine that owns it). */ void Commit(); @@ -51,6 +49,10 @@ class Transaction { * longer valid for use (it gets deleted by the engine that owns it). */ void Abort(); + /** Acquires the lock over the given RecordLock, preventing other transactions + * from doing the same */ + void TakeLock(RecordLock &lock) const { locks_.Take(&lock, *this, engine_); } + /** Transaction's id. Unique in the engine that owns it */ const transaction_id_t id_; diff --git a/src/utils/atomic.hpp b/src/utils/atomic.hpp new file mode 100644 index 000000000..ccd031d45 --- /dev/null +++ b/src/utils/atomic.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include <atomic> + +namespace utils { + +/** + * Ensures that the given atomic is greater or equal to the given value. If it + * already satisfies that predicate, it's not modified. + * + * @param atomic - The atomic variable to ensure on. + * @param value - The minimal value the atomic must have after this call. + * @tparam TValue - Type of value. + */ +template <typename TValue> +void EnsureAtomicGe(std::atomic<TValue> &atomic, TValue value) { + while (true) { + auto current = atomic.load(); + if (current >= value) break; + if (atomic.compare_exchange_strong(current, value)) break; + } +} +} // namespace utils diff --git a/tests/benchmark/mvcc.cpp b/tests/benchmark/mvcc.cpp index 2809db4c1..61dd2ce63 100644 --- a/tests/benchmark/mvcc.cpp +++ b/tests/benchmark/mvcc.cpp @@ -4,6 +4,7 @@ #include "mvcc/record.hpp" #include "mvcc/version_list.hpp" +#include "transactions/engine_master.hpp" class Prop : public mvcc::Record<Prop> { public: @@ -18,11 +19,11 @@ class Prop : public mvcc::Record<Prop> { void MvccMix(benchmark::State &state) { while (state.KeepRunning()) { state.PauseTiming(); - tx::Engine engine; + tx::MasterEngine engine; auto t1 = engine.Begin(); mvcc::VersionList<Prop> version_list(*t1, 0); - t1->Commit(); + engine.Commit(*t1); auto t2 = engine.Begin(); state.ResumeTiming(); @@ -33,7 +34,7 @@ void MvccMix(benchmark::State &state) { version_list.find(*t2); state.PauseTiming(); - t2->Abort(); + engine.Abort(*t2); auto t3 = engine.Begin(); state.ResumeTiming(); @@ -48,8 +49,8 @@ void MvccMix(benchmark::State &state) { } state.PauseTiming(); - t3->Commit(); - t4->Commit(); + engine.Commit(*t3); + engine.Commit(*t4); state.ResumeTiming(); } } diff --git a/tests/concurrent/transaction_engine.cpp b/tests/concurrent/transaction_engine.cpp deleted file mode 100644 index dcaf0eaee..000000000 --- a/tests/concurrent/transaction_engine.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include <thread> -#include <vector> - -#include "glog/logging.h" - -#include "transactions/engine.hpp" - -int main() { - // (try to) test correctness of the transaction life cycle - constexpr int THREADS = 16; - constexpr int TRANSACTIONS = 10; - - tx::Engine engine; - std::vector<uint64_t> sums; - - sums.resize(THREADS); - - auto f = [&engine, &sums](int idx, int n) { - uint64_t sum = 0; - - for (int i = 0; i < n; ++i) { - auto t = engine.Begin(); - sum += t->id_; - engine.Commit(*t); - } - - sums[idx] = sum; - }; - - std::vector<std::thread> threads; - - for (int i = 0; i < THREADS; ++i) - threads.push_back(std::thread(f, i, TRANSACTIONS)); - - for (auto &thread : threads) thread.join(); - - uint64_t sum_computed = 0; - - for (int i = 0; i < THREADS; ++i) sum_computed += sums[i]; - - uint64_t sum_actual = 0; - for (uint64_t i = 1; i <= THREADS * TRANSACTIONS; ++i) sum_actual += i; - - std::cout << sum_computed << " " << sum_actual << std::endl; - CHECK(sum_computed == sum_actual) << "sums have to be the same"; - return 0; -} diff --git a/tests/unit/database_key_index.cpp b/tests/unit/database_key_index.cpp index 4644a8dfb..e413520d0 100644 --- a/tests/unit/database_key_index.cpp +++ b/tests/unit/database_key_index.cpp @@ -4,6 +4,7 @@ #include "database/graph_db_accessor.hpp" #include "database/graph_db_datatypes.hpp" #include "storage/vertex.hpp" +#include "transactions/engine_master.hpp" #include "mvcc_gc_common.hpp" @@ -14,10 +15,10 @@ TEST(LabelsIndex, UniqueInsert) { KeyIndex<GraphDbTypes::Label, Vertex> index; GraphDb db; GraphDbAccessor dba(db); - tx::Engine engine; + tx::MasterEngine engine; auto t1 = engine.Begin(); mvcc::VersionList<Vertex> vlist(*t1, 0); - t1->Commit(); + engine.Commit(*t1); auto t2 = engine.Begin(); vlist.find(*t2)->labels_.push_back(dba.Label("1")); @@ -30,7 +31,7 @@ TEST(LabelsIndex, UniqueInsert) { vlist.find(*t2)->labels_.push_back(dba.Label("3")); index.Update(dba.Label("3"), &vlist, vlist.find(*t2)); - t2->Commit(); + engine.Commit(*t2); EXPECT_EQ(index.Count(dba.Label("1")), 1); EXPECT_EQ(index.Count(dba.Label("2")), 1); @@ -42,7 +43,7 @@ TEST(LabelsIndex, UniqueFilter) { GraphDb db; KeyIndex<GraphDbTypes::Label, Vertex> index; GraphDbAccessor dba(db); - tx::Engine engine; + tx::MasterEngine engine; auto t1 = engine.Begin(); mvcc::VersionList<Vertex> vlist1(*t1, 0); @@ -57,14 +58,14 @@ TEST(LabelsIndex, UniqueFilter) { vlist2.find(*t1)->labels_.push_back(label1); index.Update(label1, &vlist1, r1v1); index.Update(label1, &vlist2, r1v2); - t1->Commit(); + engine.Commit(*t1); auto t2 = engine.Begin(); auto r2v1 = vlist1.update(*t2); auto r2v2 = vlist2.update(*t2); index.Update(label1, &vlist1, r2v1); index.Update(label1, &vlist2, r2v2); - t2->Commit(); + engine.Commit(*t2); auto t3 = engine.Begin(); std::vector<mvcc::VersionList<Vertex> *> expected = {&vlist1, &vlist2}; @@ -82,7 +83,7 @@ TEST(LabelsIndex, Refresh) { KeyIndex<GraphDbTypes::Label, Vertex> index; GraphDb db; GraphDbAccessor access(db); - tx::Engine engine; + tx::MasterEngine engine; // add two vertices to database auto t1 = engine.Begin(); @@ -100,7 +101,7 @@ TEST(LabelsIndex, Refresh) { v2r1->labels_.push_back(label); index.Update(label, &vlist1, v1r1); index.Update(label, &vlist2, v2r1); - t1->Commit(); + engine.Commit(*t1); auto t2 = engine.Begin(); auto v1r2 = vlist1.update(*t2); @@ -111,7 +112,7 @@ TEST(LabelsIndex, Refresh) { index.Refresh(GcSnapshot(engine, t2), engine); EXPECT_EQ(index.Count(label), 4); - t2->Commit(); + engine.Commit(*t2); EXPECT_EQ(index.Count(label), 4); index.Refresh(GcSnapshot(engine, nullptr), engine); EXPECT_EQ(index.Count(label), 2); diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp index 2fd682d89..408cca246 100644 --- a/tests/unit/database_label_property_index.cpp +++ b/tests/unit/database_label_property_index.cpp @@ -4,6 +4,7 @@ #include "database/graph_db_accessor.hpp" #include "database/graph_db_datatypes.hpp" #include "database/indexes/label_property_index.hpp" +#include "transactions/engine_master.hpp" #include "mvcc_gc_common.hpp" @@ -43,7 +44,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test { LabelPropertyIndex index; LabelPropertyIndex::Key *key; - tx::Engine engine; + tx::MasterEngine engine; tx::Transaction *t{nullptr}; mvcc::VersionList<Vertex> *vlist; @@ -142,11 +143,11 @@ TEST_F(LabelPropertyIndexComplexTest, UniqueInsert) { // Check if index filters duplicates. TEST_F(LabelPropertyIndexComplexTest, UniqueFilter) { index.UpdateOnLabelProperty(vlist, vertex); - t->Commit(); + engine.Commit(*t); auto t2 = engine.Begin(); auto vertex2 = vlist->update(*t2); - t2->Commit(); + engine.Commit(*t2); index.UpdateOnLabelProperty(vlist, vertex2); EXPECT_EQ(index.Count(*key), 2); @@ -154,7 +155,7 @@ TEST_F(LabelPropertyIndexComplexTest, UniqueFilter) { auto t3 = engine.Begin(); auto iter = index.GetVlists(*key, *t3, false); EXPECT_EQ(std::distance(iter.begin(), iter.end()), 1); - t3->Commit(); + engine.Commit(*t3); } // Remove label and check if index vertex is not returned now. @@ -184,7 +185,7 @@ TEST_F(LabelPropertyIndexComplexTest, RemoveProperty) { // Refresh with a vertex that looses its labels and properties. TEST_F(LabelPropertyIndexComplexTest, Refresh) { index.UpdateOnLabelProperty(vlist, vertex); - t->Commit(); + engine.Commit(*t); EXPECT_EQ(index.Count(*key), 1); vertex->labels_.clear(); vertex->properties_.clear(); diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp index f156cad11..146e9434d 100644 --- a/tests/unit/mvcc.cpp +++ b/tests/unit/mvcc.cpp @@ -5,18 +5,18 @@ #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" #include "threading/sync/lock_timeout_exception.hpp" -#include "transactions/engine.hpp" -#include "transactions/transaction.cpp" +#include "transactions/engine_master.hpp" +#include "transactions/transaction.hpp" #include "mvcc_gc_common.hpp" TEST(MVCC, Deadlock) { - tx::Engine engine; + tx::MasterEngine engine; auto t0 = engine.Begin(); mvcc::VersionList<Prop> version_list1(*t0, 0); mvcc::VersionList<Prop> version_list2(*t0, 1); - t0->Commit(); + engine.Commit(*t0); auto t1 = engine.Begin(); auto t2 = engine.Begin(); @@ -31,14 +31,14 @@ TEST(MVCC, Deadlock) { TEST(MVCC, UpdateDontDelete) { std::atomic<int> count{0}; { - tx::Engine engine; + tx::MasterEngine engine; auto t1 = engine.Begin(); mvcc::VersionList<DestrCountRec> version_list(*t1, 0, count); - t1->Commit(); + engine.Commit(*t1); auto t2 = engine.Begin(); version_list.update(*t2); - t2->Abort(); + engine.Abort(*t2); EXPECT_EQ(count, 0); auto t3 = engine.Begin(); @@ -48,14 +48,14 @@ TEST(MVCC, UpdateDontDelete) { EXPECT_EQ(count, 0); // TODO Gleich: why don't we also test that remove doesn't delete? - t3->Commit(); + engine.Commit(*t3); } EXPECT_EQ(count, 3); } // Check that we get the oldest record. TEST(MVCC, Oldest) { - tx::Engine engine; + tx::MasterEngine engine; auto t1 = engine.Begin(); mvcc::VersionList<Prop> version_list(*t1, 0); auto first = version_list.Oldest(); diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index b0e6add10..733b58916 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -5,8 +5,8 @@ #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" #include "threading/sync/lock_timeout_exception.hpp" -#include "transactions/engine.hpp" -#include "transactions/transaction.cpp" +#include "transactions/engine_master.hpp" +#include "transactions/transaction.hpp" class TestClass : public mvcc::Record<TestClass> { public: @@ -52,13 +52,13 @@ class Mvcc : public ::testing::Test { engine.Advance(t1->id_); id1 = t1->id_; v1 = version_list.find(*t1); - t1->Commit(); + engine.Commit(*t1); t2 = engine.Begin(); id2 = t2->id_; } // variable where number of versions is stored int version_list_size = 0; - tx::Engine engine; + tx::MasterEngine engine; tx::Transaction *t1 = engine.Begin(); mvcc::VersionList<TestClass> version_list{*t1, 0, version_list_size}; TestClass *v1 = nullptr; @@ -74,10 +74,10 @@ class Mvcc : public ::testing::Test { #define T4_FIND __attribute__((unused)) auto v4 = version_list.find(*t4) #define T2_UPDATE __attribute__((unused)) auto v2 = version_list.update(*t2) #define T3_UPDATE __attribute__((unused)) auto v3 = version_list.update(*t3) -#define T2_COMMIT t2->Commit(); -#define T3_COMMIT t3->Commit(); -#define T2_ABORT t2->Abort(); -#define T3_ABORT t3->Abort(); +#define T2_COMMIT engine.Commit(*t2); +#define T3_COMMIT engine.Commit(*t3); +#define T2_ABORT engine.Abort(*t2); +#define T3_ABORT engine.Abort(*t3); #define T3_BEGIN \ auto t3 = engine.Begin(); \ __attribute__((unused)) int id3 = t3->id_ diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp index 6b56f0584..c3575fae6 100644 --- a/tests/unit/mvcc_gc.cpp +++ b/tests/unit/mvcc_gc.cpp @@ -12,13 +12,13 @@ #include "mvcc/version_list.hpp" #include "storage/garbage_collector.hpp" #include "storage/vertex.hpp" -#include "transactions/engine.hpp" +#include "transactions/engine_master.hpp" #include "mvcc_gc_common.hpp" class MvccGcTest : public ::testing::Test { protected: - tx::Engine engine; + tx::MasterEngine engine; private: tx::Transaction *t0 = engine.Begin(); @@ -29,16 +29,16 @@ class MvccGcTest : public ::testing::Test { record_destruction_count}; std::vector<tx::Transaction *> transactions{t0}; - void SetUp() override { t0->Commit(); } + void SetUp() override { engine.Commit(*t0); } void MakeUpdates(int update_count, bool commit) { for (int i = 0; i < update_count; i++) { auto t = engine.Begin(); version_list.update(*t); if (commit) - t->Commit(); + engine.Commit(*t); else - t->Abort(); + engine.Abort(*t); } } @@ -50,7 +50,7 @@ class MvccGcTest : public ::testing::Test { TEST_F(MvccGcTest, RemoveAndAbort) { auto t = engine.Begin(); version_list.remove(version_list.find(*t), *t); - t->Abort(); + engine.Abort(*t); auto ret = GcDeleted(); EXPECT_EQ(ret.first, false); EXPECT_EQ(ret.second, nullptr); @@ -74,7 +74,7 @@ TEST_F(MvccGcTest, UpdateAndAbort) { TEST_F(MvccGcTest, RemoveAndCommit) { auto t = engine.Begin(); version_list.remove(version_list.find(*t), *t); - t->Commit(); + engine.Commit(*t); auto ret = GcDeleted(); EXPECT_EQ(ret.first, true); EXPECT_NE(ret.second, nullptr); @@ -102,7 +102,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) { auto t1 = engine.Begin(); auto t2 = engine.Begin(); version_list.remove(version_list.find(*t1), *t1); - t1->Commit(); + engine.Commit(*t1); auto ret = GcDeleted(t2); EXPECT_EQ(ret.first, false); @@ -116,7 +116,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) { */ TEST(GarbageCollector, GcClean) { ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection; - tx::Engine engine; + tx::MasterEngine engine; DeferredDeleter<DestrCountRec> deleter; DeferredDeleter<mvcc::VersionList<DestrCountRec>> vlist_deleter; GarbageCollector<decltype(collection), DestrCountRec> gc(collection, deleter, @@ -129,7 +129,7 @@ TEST(GarbageCollector, GcClean) { new mvcc::VersionList<DestrCountRec>(*t1, 0, record_destruction_count); auto access = collection.access(); access.insert(0, vl); - t1->Commit(); + engine.Commit(*t1); // run garbage collection that has nothing co collect gc.Run(GcSnapshot(engine, nullptr), engine); @@ -140,7 +140,7 @@ TEST(GarbageCollector, GcClean) { // delete the only record in the version-list in transaction t2 auto t2 = engine.Begin(); vl->remove(vl->find(*t2), *t2); - t2->Commit(); + engine.Commit(*t2); gc.Run(GcSnapshot(engine, nullptr), engine); // check that we destroyed the record diff --git a/tests/unit/mvcc_gc_common.hpp b/tests/unit/mvcc_gc_common.hpp index 8defac184..8f6445c25 100644 --- a/tests/unit/mvcc_gc_common.hpp +++ b/tests/unit/mvcc_gc_common.hpp @@ -1,6 +1,7 @@ #pragma once #include "mvcc/record.hpp" +#include "transactions/engine_master.hpp" /** * @brief - Empty class which inherits from mvcc:Record. @@ -28,12 +29,12 @@ class DestrCountRec : public mvcc::Record<DestrCountRec> { // helper function for creating a GC snapshot // if given a nullptr it makes a GC snapshot like there // are no active transactions -auto GcSnapshot(tx::Engine &engine, tx::Transaction *t) { +auto GcSnapshot(tx::MasterEngine &engine, tx::Transaction *t) { if (t != nullptr) { tx::Snapshot gc_snap = t->snapshot(); gc_snap.insert(t->id_); return gc_snap; } else { - return engine.GcSnapshot(); + return engine.GlobalGcSnapshot(); } } diff --git a/tests/unit/transaction_engine.cpp b/tests/unit/transaction_engine.cpp deleted file mode 100644 index 00a3a335a..000000000 --- a/tests/unit/transaction_engine.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "gtest/gtest.h" - -#include <vector> - -#include "transactions/engine.hpp" -#include "transactions/transaction.hpp" - -TEST(Engine, GcSnapshot) { - tx::Engine engine; - ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); - - std::vector<tx::Transaction *> transactions; - // create transactions and check the GC snapshot - for (int i = 0; i < 5; ++i) { - transactions.push_back(engine.Begin()); - EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); - } - - // commit transactions in the middle, expect - // the GcSnapshot did not change - transactions[1]->Commit(); - EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); - transactions[2]->Commit(); - EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); - - // have the first three transactions committed - transactions[0]->Commit(); - EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1, 2, 3, 4})); - - // commit all - transactions[3]->Commit(); - transactions[4]->Commit(); - EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6})); -} - -TEST(Engine, Advance) { - tx::Engine engine; - - auto t0 = engine.Begin(); - auto t1 = engine.Begin(); - EXPECT_EQ(t0->cid(), 1); - engine.Advance(t0->id_); - EXPECT_EQ(t0->cid(), 2); - engine.Advance(t0->id_); - EXPECT_EQ(t0->cid(), 3); - EXPECT_EQ(t1->cid(), 1); -} diff --git a/tests/unit/transaction_local_engine.cpp b/tests/unit/transaction_local_engine.cpp new file mode 100644 index 000000000..2adfeec47 --- /dev/null +++ b/tests/unit/transaction_local_engine.cpp @@ -0,0 +1,66 @@ +#include "gtest/gtest.h" + +#include <thread> +#include <vector> + +#include "data_structures/concurrent/concurrent_set.hpp" +#include "transactions/engine_master.hpp" +#include "transactions/transaction.hpp" + +TEST(Engine, GcSnapshot) { + tx::MasterEngine engine; + ASSERT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + + std::vector<tx::Transaction *> transactions; + // create transactions and check the GC snapshot + for (int i = 0; i < 5; ++i) { + transactions.push_back(engine.Begin()); + EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + } + + // commit transactions in the middle, expect + // the GcSnapshot did not change + engine.Commit(*transactions[1]); + EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + engine.Commit(*transactions[2]); + EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + + // have the first three transactions committed + engine.Commit(*transactions[0]); + EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1, 2, 3, 4})); + + // commit all + engine.Commit(*transactions[3]); + engine.Commit(*transactions[4]); + EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({6})); +} + +TEST(Engine, Advance) { + tx::MasterEngine engine; + + auto t0 = engine.Begin(); + auto t1 = engine.Begin(); + EXPECT_EQ(t0->cid(), 1); + engine.Advance(t0->id_); + EXPECT_EQ(t0->cid(), 2); + engine.Advance(t0->id_); + EXPECT_EQ(t0->cid(), 3); + EXPECT_EQ(t1->cid(), 1); +} + +TEST(Engine, ConcurrentBegin) { + tx::MasterEngine engine; + std::vector<std::thread> threads; + ConcurrentSet<tx::transaction_id_t> tx_ids; + for (int i = 0; i < 10; ++i) { + threads.emplace_back( + [&tx_ids, &engine, accessor = tx_ids.access() ]() mutable { + for (int j = 0; j < 100; ++j) { + auto t = engine.Begin(); + accessor.insert(t->id_); + } + }); + } + for (auto &t : threads) t.join(); + EXPECT_EQ(tx_ids.access().size(), 1000); +} diff --git a/tools/src/CMakeLists.txt b/tools/src/CMakeLists.txt index 20c7beb9f..1fd9293fe 100644 --- a/tools/src/CMakeLists.txt +++ b/tools/src/CMakeLists.txt @@ -13,7 +13,8 @@ add_executable(mg_import_csv ${memgraph_src_dir}/storage/property_value.cpp ${memgraph_src_dir}/storage/record_accessor.cpp ${memgraph_src_dir}/storage/vertex_accessor.cpp - ${memgraph_src_dir}/transactions/transaction.cpp + ${memgraph_src_dir}/transactions/engine_master.cpp + ${memgraph_src_dir}/transactions/engine_worker.cpp ) # Strip the executable in release build. diff --git a/tools/tests/CMakeLists.txt b/tools/tests/CMakeLists.txt index c2c963bc1..9853a055c 100644 --- a/tools/tests/CMakeLists.txt +++ b/tools/tests/CMakeLists.txt @@ -16,7 +16,8 @@ add_executable(mg_recovery_check ${memgraph_src_dir}/storage/property_value.cpp ${memgraph_src_dir}/storage/record_accessor.cpp ${memgraph_src_dir}/storage/vertex_accessor.cpp - ${memgraph_src_dir}/transactions/transaction.cpp + ${memgraph_src_dir}/transactions/engine_master.cpp + ${memgraph_src_dir}/transactions/engine_worker.cpp ) target_link_libraries(mg_recovery_check stdc++fs Threads::Threads fmt glog