diff --git a/CMakeLists.txt b/CMakeLists.txt index ae6e6a44a..ecf5e8fec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -331,7 +331,6 @@ set(memgraph_src_files ${src_dir}/io/network/network_endpoint.cpp ${src_dir}/io/network/socket.cpp ${src_dir}/threading/thread.cpp - ${src_dir}/mvcc/id.cpp ${src_dir}/durability/snapshooter.cpp ${src_dir}/durability/recovery.cpp ${src_dir}/storage/property_value.cpp @@ -339,7 +338,6 @@ set(memgraph_src_files ${src_dir}/storage/record_accessor.cpp ${src_dir}/storage/vertex_accessor.cpp ${src_dir}/storage/edge_accessor.cpp - ${src_dir}/transactions/snapshot.cpp ${src_dir}/transactions/transaction.cpp ${src_dir}/template_engine/engine.cpp ${src_dir}/logging/streams/stdout.cpp diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 8462c2c01..4f760f63c 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -34,25 +34,29 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir) gc_scheduler_.Run(std::chrono::seconds(FLAGS_gc_cycle_sec), [this]() { // main garbage collection logic // see wiki documentation for logic explanation - const auto next_id = this->tx_engine.count() + 1; - const auto id = this->tx_engine.oldest_active().get_or(next_id); + const auto snapshot = this->tx_engine.GcSnapshot(); { // This can be run concurrently - this->gc_vertices_.Run(id, this->tx_engine); - this->gc_edges_.Run(id, this->tx_engine); + this->gc_vertices_.Run(snapshot, this->tx_engine); + this->gc_edges_.Run(snapshot, this->tx_engine); } // This has to be run sequentially after gc because gc modifies // version_lists and changes the oldest visible record, on which Refresh // depends. 
{ // This can be run concurrently - this->labels_index_.Refresh(id, this->tx_engine); - this->edge_types_index_.Refresh(id, this->tx_engine); + this->labels_index_.Refresh(snapshot, this->tx_engine); + this->edge_types_index_.Refresh(snapshot, this->tx_engine); } - this->edge_record_deleter_.FreeExpiredObjects(id); - this->vertex_record_deleter_.FreeExpiredObjects(id); - this->edge_version_list_deleter_.FreeExpiredObjects(id); - this->vertex_version_list_deleter_.FreeExpiredObjects(id); + // we free expired objects with snapshot.back(), which is + // the ID of the oldest active transaction (or next active, if there + // are no currently active). that's legal because that was the + // last possible transaction that could have obtained pointers + // to those records + this->edge_record_deleter_.FreeExpiredObjects(snapshot.back()); + this->vertex_record_deleter_.FreeExpiredObjects(snapshot.back()); + this->edge_version_list_deleter_.FreeExpiredObjects(snapshot.back()); + this->vertex_version_list_deleter_.FreeExpiredObjects(snapshot.back()); }); } @@ -122,8 +126,9 @@ GraphDb::~GraphDb() { for (auto &edge : this->edges_.access()) delete edge; // Free expired records with the maximal possible id from all the deleters. 
- this->edge_record_deleter_.FreeExpiredObjects(Id::MaximalId()); - this->vertex_record_deleter_.FreeExpiredObjects(Id::MaximalId()); - this->edge_version_list_deleter_.FreeExpiredObjects(Id::MaximalId()); - this->vertex_version_list_deleter_.FreeExpiredObjects(Id::MaximalId()); + this->edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); + this->vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); + this->edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); + this->vertex_version_list_deleter_.FreeExpiredObjects( + tx::Transaction::MaxId()); } diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 4adc6df1f..b03150e95 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -67,10 +67,6 @@ class GraphDb : public Loggable { /** transaction engine related to this database */ tx::Engine tx_engine; - /** garbage collector related to this database*/ - // TODO bring back garbage collection - // Garbage garbage = {tx_engine}; - // database name // TODO consider if this is even necessary const std::string name_; diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 53618f62e..cb2bf560f 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -8,7 +8,7 @@ #include "utils/assert.hpp" GraphDbAccessor::GraphDbAccessor(GraphDb &db) - : db_(db), transaction_(db.tx_engine.begin()) {} + : db_(db), transaction_(db.tx_engine.Begin()) {} GraphDbAccessor::~GraphDbAccessor() { if (!commited_ && !aborted_) { @@ -19,20 +19,20 @@ GraphDbAccessor::~GraphDbAccessor() { const std::string &GraphDbAccessor::name() const { return db_.name_; } void GraphDbAccessor::advance_command() { - transaction_->engine.advance(transaction_->id); + transaction_->engine_.Advance(transaction_->id_); } void GraphDbAccessor::commit() { debug_assert(!commited_ && !aborted_, "Already aborted or commited transaction."); - transaction_->commit(); + 
transaction_->Commit(); commited_ = true; } void GraphDbAccessor::abort() { debug_assert(!commited_ && !aborted_, "Already aborted or commited transaction."); - transaction_->abort(); + transaction_->Abort(); aborted_ = true; } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 979853b3e..020d5845c 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -261,12 +261,19 @@ class GraphDbAccessor { // index automatically, but we still have to add to index everything that // happened earlier. We have to first wait for every transaction that // happend before, or a bit later than CreateIndex to end. - auto wait_transaction = db_.tx_engine.begin(); - wait_transaction->wait_for_active_except(transaction_->id); - wait_transaction->commit(); + { + auto wait_transaction = db_.tx_engine.Begin(); + for (auto id : wait_transaction->snapshot()) { + if (id == transaction_->id_) continue; + while (wait_transaction->engine_.clog_.fetch_info(id).is_active()) + // TODO reconsider this constant, currently rule-of-thumb chosen + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + wait_transaction->Commit(); + } // This transaction surely sees everything that happened before CreateIndex. - auto transaction = db_.tx_engine.begin(); + auto transaction = db_.tx_engine.Begin(); for (auto vertex_vlist : db_.vertices_.access()) { auto vertex_record = vertex_vlist->find(*transaction); @@ -277,7 +284,7 @@ class GraphDbAccessor { } // Commit transaction as we finished applying method on newest visible // records. - transaction->commit(); + transaction->Commit(); // After these two operations we are certain that everything is contained in // the index under the assumption that this transaction contained no // vertex/edge insert/update before this method was invoked. 
diff --git a/src/database/indexes/index_common.hpp b/src/database/indexes/index_common.hpp index 24a8d4549..3fe00581d 100644 --- a/src/database/indexes/index_common.hpp +++ b/src/database/indexes/index_common.hpp @@ -116,12 +116,13 @@ static auto GetVlists( /** * @brief - Removes from the index all entries for which records don't contain * the given label/edge type/label + property anymore. Also update (remove) - * all record which are not visible for any transaction with an id larger or - * equal to `id`. This method assumes that the MVCC GC has been run with the - * same 'id'. + * all records which are not visible for any transaction in the given + * 'snapshot'. This method assumes that the MVCC GC has been run with the + * same 'snapshot'. + * * @param indices - map of index entries (TIndexKey, skiplist) - * @param id - oldest active id, safe to remove everything deleted before this - * id. + * @param snapshot - the GC snapshot. Consists of the oldest active + * transaction's snapshot, with that transaction's id appened as last. * @param engine - transaction engine to see which records are commited * @param exists - function which checks 'key' and 'entry' if the entry still * contains required properties (key + optional value (in case of label_property @@ -132,45 +133,42 @@ static auto GetVlists( */ template static void Refresh( - ConcurrentMap *> &indices, const Id &id, - tx::Engine &engine, + ConcurrentMap *> &indices, + const tx::Snapshot &snapshot, tx::Engine &engine, const std::function exists) { + // iterate over all the indices for (auto &key_indices_pair : indices.access()) { + // iterate over index entries auto indices_entries_accessor = key_indices_pair.second->access(); for (auto indices_entry : indices_entries_accessor) { - // Remove it from index if it's deleted before the current id, or it - // doesn't have that label/edge_type/label+property anymore. 
We need to - // be careful when we are deleting the record which is not visible - // anymore because even though that record is not visible, some other, - // newer record could be visible, and might contain the label, and might - // not be in the index - that's why we can't remove it completely, but - // just update it's record value. - // We also need to be extra careful when checking for existance of - // label/edge_type because that record could still be under the update - // operation and as such could be modified while we are reading the - // label/edge_type - that's why we have to wait for it's creation id to - // be lower than ours `id` as that means it's surely either commited or - // aborted as we always refresh with the oldest active id. - if (indices_entry.record_->is_not_visible_from(id, engine)) { - // We first have to insert and then remove, because otherwise we might - // have a situation where there exists a vlist with the record - // containg the label/edge_type/label+property but it's not in our - // index. - // Get the oldest not deleted version for current 'id' (MVCC GC takes - // care of this.) + if (indices_entry.record_->is_not_visible_from(snapshot, engine)) { + // be careful when deleting the record which is not visible anymore. + // it's newer copy could be visible, and might still logically belong to + // index (it satisfies the `exists` function). that's why we can't just + // remove the index entry, but also re-insert the oldest visible record + // to the index. 
if that record does not satisfy `exists`, it will be + // cleaned up in the next Refresh first insert and then remove, + // otherwise there is a timeframe during which the record is not present + // in the index auto new_record = indices_entry.vlist_->Oldest(); if (new_record != nullptr) indices_entries_accessor.insert( TIndexEntry(indices_entry, new_record)); auto success = indices_entries_accessor.remove(indices_entry); - debug_assert(success == true, "Not able to delete entry."); - } else if (indices_entry.record_->tx.cre() < id && - !exists(key_indices_pair.first, indices_entry)) { - // Since id is the oldest active id, if the record has been created - // before it we are sure that it won't be modified anymore and that - // the creating transaction finished, that's why it's safe to check - // it, and potentially remove it from index. + debug_assert(success, "Unable to delete entry."); + } + + // if the record is still visible, + // check if it satisfies the `exists` function. if not + // it does not belong in index anymore. + // be careful when using the `exists` function + // because it's creator transaction could still be modifying it, + // and modify+read is not thread-safe. for that reason we need to + // first see if the the transaction that created it has ended + // (tx.cre() < oldest active trancsation). + else if (indices_entry.record_->tx.cre() < snapshot.back() && + !exists(key_indices_pair.first, indices_entry)) { indices_entries_accessor.remove(indices_entry); } } diff --git a/src/database/indexes/key_index.hpp b/src/database/indexes/key_index.hpp index 5dabbd67c..8d44869f5 100644 --- a/src/database/indexes/key_index.hpp +++ b/src/database/indexes/key_index.hpp @@ -83,13 +83,15 @@ class KeyIndex { * @brief - Removes from the index all entries for which records don't contain * the given label anymore. Update all record which are not visible for any * transaction with an id larger or equal to `id`. 
- * @param id - oldest active id, safe to remove everything deleted before this - * id. + * + * @param snapshot - the GC snapshot. Consists of the oldest active + * transaction's snapshot, with that transaction's id appened as last. * @param engine - transaction engine to see which records are commited */ - void Refresh(const Id &id, tx::Engine &engine) { + void Refresh(const tx::Snapshot &snapshot, tx::Engine &engine) { return IndexUtils::Refresh( - indices_, id, engine, [](const TKey &key, const IndexEntry &entry) { + indices_, snapshot, engine, + [](const TKey &key, const IndexEntry &entry) { return KeyIndex::Exists(key, entry.record_); }); } diff --git a/src/database/indexes/label_property_index.hpp b/src/database/indexes/label_property_index.hpp index 13db27666..ca8c1849f 100644 --- a/src/database/indexes/label_property_index.hpp +++ b/src/database/indexes/label_property_index.hpp @@ -243,12 +243,13 @@ class LabelPropertyIndex { * @brief - Removes from the index all entries for which records don't contain * the given label anymore, or the record was deleted before this transaction * id. - * @param id - oldest active id, safe to remove everything deleted before this - * id. + * + * @param snapshot - the GC snapshot. Consists of the oldest active + * transaction's snapshot, with that transaction's id appened as last. 
*/ - void Refresh(const Id &id, tx::Engine &engine) { + void Refresh(const tx::Snapshot &snapshot, tx::Engine &engine) { return IndexUtils::Refresh( - indices_, id, engine, [](const Key &key, const IndexEntry &entry) { + indices_, snapshot, engine, [](const Key &key, const IndexEntry &entry) { return LabelPropertyIndex::Exists(key, entry.value_, entry.record_); }); } diff --git a/src/mvcc/id.cpp b/src/mvcc/id.cpp deleted file mode 100644 index 9f32d5186..000000000 --- a/src/mvcc/id.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include "mvcc/id.hpp" - -Id::Id(uint64_t id) : id(id) {} - -bool operator<(const Id& a, const Id& b) { return a.id < b.id; } - -bool operator==(const Id& a, const Id& b) { return a.id == b.id; } - -std::ostream& operator<<(std::ostream& stream, const Id& id) { - return stream << id.id; -} - -Id::operator uint64_t() const { return id; } diff --git a/src/mvcc/id.hpp b/src/mvcc/id.hpp deleted file mode 100644 index 91cb93ce0..000000000 --- a/src/mvcc/id.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "utils/total_ordering.hpp" - -class Id : public TotalOrdering { - public: - Id() = default; - - Id(uint64_t id); - - friend bool operator<(const Id &a, const Id &b); - - friend bool operator==(const Id &a, const Id &b); - - friend std::ostream &operator<<(std::ostream &stream, const Id &id); - - operator uint64_t() const; - - /** - * @brief - Return maximal possible Id. 
- */ - static const Id MaximalId() { - return Id(std::numeric_limits::max()); - } - - private: - uint64_t id{0}; -}; diff --git a/src/mvcc/record.hpp b/src/mvcc/record.hpp index 6b0b200ff..0ef44f77b 100644 --- a/src/mvcc/record.hpp +++ b/src/mvcc/record.hpp @@ -9,7 +9,6 @@ #include "mvcc/cre_exp.hpp" #include "mvcc/hints.hpp" -#include "mvcc/id.hpp" #include "mvcc/version.hpp" #include "storage/locking/record_lock.hpp" @@ -30,19 +29,19 @@ class Record : public Version { // TODO maybe disable the copy-constructor and instead use a // data variable in the version_list update() function (and similar) // like it was in Dominik's implementation - Record(const Record &other) {} + Record(const Record &) {} // tx.cre is the id of the transaction that created the record // and tx.exp is the id of the transaction that deleted the record // these values are used to determine the visibility of the record // to the current transaction - CreExp tx; + CreExp tx; // cmd.cre is the id of the command in this transaction that created the // record and cmd.exp is the id of the command in this transaction that // deleted the record. these values are used to determine the visibility // of the record to the current command in the running transaction - CreExp cmd; + CreExp cmd; Hints hints; @@ -52,67 +51,86 @@ class Record : public Version { // check if this record is visible to the transaction t bool visible(const tx::Transaction &t) { - // TODO check if the record was created by a transaction that has been - // aborted. one might implement this by checking the hints in mvcc - // anc/or consulting the commit log - // Mike Olson says 17 march 1993: the tests in this routine are correct; // if you think they're not, you're wrong, and you should think about it // again. i know, it happened to me. 
- return ((tx.cre() == t.id && // inserted by the current transaction - cmd.cre() < t.cid && // before this command, and - (tx.exp() == Id(0) || // the row has not been deleted, or - (tx.exp() == t.id && // it was deleted by the current + // fetch expiration info in a safe way (see fetch_exp for details) + tx::transaction_id_t tx_exp; + tx::command_id_t cmd_exp; + std::tie(tx_exp, cmd_exp) = fetch_exp(); + + return ((tx.cre() == t.id_ && // inserted by the current transaction + cmd.cre() < t.cid() && // before this command, and + (tx_exp == 0 || // the row has not been deleted, or + (tx_exp == t.id_ && // it was deleted by the current // transaction - cmd.exp() >= t.cid))) // but not before this command, + cmd_exp >= t.cid()))) // but not before this command, || // or (cre_committed(tx.cre(), t) && // the record was inserted by a // committed transaction, and - (tx.exp() == Id(0) || // the record has not been deleted, or - (tx.exp() == t.id && // the row is being deleted by this - // transaction - cmd.exp() >= t.cid) || // but it's not deleted "yet", or - (tx.exp() != t.id && // the row was deleted by another - // transaction - !exp_committed(tx.exp(), t) // that has not been committed + (tx_exp == 0 || // the record has not been deleted, or + (tx_exp == t.id_ && // the row is being deleted by this + // transaction + cmd_exp >= t.cid()) || // but it's not deleted "yet", or + (tx_exp != t.id_ && // the row was deleted by another + // transaction + !exp_committed(tx_exp, t) // that has not been committed )))); } void mark_created(const tx::Transaction &t) { - debug_assert(tx.cre() == Id(0), "Marking node as created twice."); - tx.cre(t.id); - cmd.cre(t.cid); + debug_assert(tx.cre() == 0, "Marking node as created twice."); + tx.cre(t.id_); + cmd.cre(t.cid()); } void mark_deleted(const tx::Transaction &t) { - if (tx.exp() != Id(0)) hints.exp.clear(); - tx.exp(t.id); - cmd.exp(t.cid); + if (tx.exp() != 0) hints.exp.clear(); + tx.exp(t.id_); + cmd.exp(t.cid()); } - bool 
exp_committed(const Id &id, const tx::Transaction &t) { + bool exp_committed(tx::transaction_id_t id, const tx::Transaction &t) { return committed(hints.exp, id, t); } - bool exp_committed(const tx::Transaction &t) { - return committed(hints.exp, tx.exp(), t.engine); + bool exp_committed(tx::Engine &engine) { + return committed(hints.exp, tx.exp(), engine); } - bool cre_committed(const Id &id, const tx::Transaction &t) { + bool cre_committed(tx::transaction_id_t id, const tx::Transaction &t) { return committed(hints.cre, id, t); } - bool cre_committed(const tx::Transaction &t) { - return committed(hints.cre, tx.cre(), t); - } + /** + * Check if this record is visible w.r.t. to the given garbage collection + * snapshot. See source comments for exact logic. + * + * @param snapshot - the GC snapshot. Consists of the oldest active + * transaction's snapshot, with that transaction's id appened as last. + */ + bool is_not_visible_from(const tx::Snapshot &snapshot, + tx::Engine &engine) const { + // first get tx.exp so that all the subsequent checks operate on + // the same id. otherwise there could be a race condition + auto exp_id = tx.exp(); - // Record won'te be visible to any transaction after or at `id` if it was - // expired before id or it's creation was aborted. - bool is_not_visible_from(const Id &id, tx::Engine &engine) const { - return (tx.exp() != Id(0) && tx.exp() < id && - engine.clog.is_committed(tx.exp())) || - engine.clog.is_aborted(tx.cre()); + // a record is NOT visible if: + // 1. it creating transaction aborted (last check) + // OR + // 2. 
a) it's expiration is not 0 (some transaction expired it) + // AND + // b) the expiring transaction is older than latest active + // AND + // c) that transaction committed (as opposed to aborted) + // AND + // d) that transaction is not in oldest active transaction's + // snapshot (consequently also not in the snapshots of + // newer transactions) + return (exp_id != 0 && exp_id < snapshot.back() && + engine.clog_.is_committed(exp_id) && !snapshot.contains(exp_id)) || + engine.clog_.is_aborted(tx.cre()); } // TODO: Test this @@ -122,12 +140,17 @@ class Record : public Version { // OR DURING this command. this is done to support cypher's // queries which can match, update and return in the same query bool is_visible_write(const tx::Transaction &t) { - return (tx.cre() == t.id && // inserted by the current transaction - cmd.cre() <= t.cid && // before OR DURING this command, and - (tx.exp() == Id(0) || // the row has not been deleted, or - (tx.exp() == t.id && // it was deleted by the current + // fetch expiration info in a safe way (see fetch_exp for details) + tx::transaction_id_t tx_exp; + tx::command_id_t cmd_exp; + std::tie(tx_exp, cmd_exp) = fetch_exp(); + + return (tx.cre() == t.id_ && // inserted by the current transaction + cmd.cre() <= t.cid() && // before OR DURING this command, and + (tx_exp == 0 || // the row has not been deleted, or + (tx_exp == t.id_ && // it was deleted by the current // transaction - cmd.exp() >= t.cid))); // but not before this command, + cmd_exp >= t.cid()))); // but not before this command, } /** @@ -135,7 +158,7 @@ class Record : public Version { * of the given transaction. */ bool is_created_by(const tx::Transaction &t) { - return tx.cre() == t.id && cmd.cre() == t.cid; + return tx.cre() == t.id_ && cmd.cre() == t.cid(); } /** @@ -143,22 +166,38 @@ class Record : public Version { * of the given transaction. 
*/ bool is_deleted_by(const tx::Transaction &t) { - return tx.exp() == t.id && cmd.exp() == t.cid; + return tx.exp() == t.id_ && cmd.exp() == t.cid(); + } + + private: + /** Fetch the (transaction, command) expiration before the check + * because they can be concurrently modified by multiple transactions. + * Do it in a loop to ensure that command is consistent with transaction. + */ + auto fetch_exp() { + tx::transaction_id_t tx_exp; + tx::command_id_t cmd_exp; + do { + tx_exp = tx.exp(); + cmd_exp = cmd.exp(); + } while (tx_exp != tx.exp()); + return std::make_pair(tx_exp, cmd_exp); } - protected: /** - * @brief - Check if the id is commited from the perspective of transactio, - * i.e. transaction can see the transaction with that id (it happened before - * the transaction, and is not in the snapshot). This method is used to test - * for visibility of some record. + * @brief - Check if the transaction with the given `id` + * is commited from the perspective of transaction `t`. + * + * Evaluates to true if that transaction has committed, + * it started before `t` and it's not in it's snapshot. + * * @param hints - hints to use to determine commit/abort * about transactions commit/abort status * @param id - id to check if it's commited and visible * @return true if the id is commited and visible for the transaction t. */ template - bool committed(U &hints, const Id &id, const tx::Transaction &t) { + bool committed(U &hints, tx::transaction_id_t id, const tx::Transaction &t) { // Dominik Gleich says 4 april 2017: the tests in this routine are correct; // if you think they're not, you're wrong, and you should think about it // again. I know, it happened to me (and also to Matej Gradicek). @@ -166,47 +205,47 @@ class Record : public Version { // You certainly can't see the transaction with id greater than yours as // that means it started after this transaction and if it commited, it // commited after this transaction has started. 
- if (id >= t.id) return false; + if (id >= t.id_) return false; // The creating transaction is still in progress (examine snapshot) - if (t.in_snapshot(id)) return false; + if (t.snapshot().contains(id)) return false; - auto hint_bits = hints.load(); - - // TODO: Validate if this position is valid for next if. - // if hints are set, return if xid is committed - if (!hint_bits.is_unknown()) return hint_bits.is_committed(); - - // if hints are not set: - // - you are the first one to check since it ended, consult commit log - auto info = t.engine.clog.fetch_info(id); - - if (info.is_committed()) return hints.set_committed(), true; - - debug_assert(info.is_aborted(), - "Info isn't aborted, but function would return as aborted."); - return hints.set_aborted(), false; + return committed(hints, id, t.engine_); } /** - * @brief - Check if the id is commited. + * @brief - Check if the transaction with the given `id` + * is committed. + * * @param hints - hints to use to determine commit/abort * @param id - id to check if commited - * @param engine - engine instance with information about transactions + * @param engine - engine instance with information about transaction * statuses * @return true if it's commited, false otherwise */ template - bool committed(U &hints, const Id &id, tx::Engine &engine) { + bool committed(U &hints, tx::transaction_id_t id, tx::Engine &engine) { auto hint_bits = hints.load(); - // if hints are set, return if xid is committed + // if hints are set, return if id is committed if (!hint_bits.is_unknown()) return hint_bits.is_committed(); - // if hints are not set: - // - you are the first one to check since it ended, consult commit log - auto info = engine.clog.fetch_info(id); + // if hints are not set consult the commit log + auto info = engine.clog_.fetch_info(id); + + // committed if (info.is_committed()) return hints.set_committed(), true; - if (info.is_aborted()) return hints.set_aborted(), false; + + // we can't set_aborted hints because of 
a race-condition that + // can occurr when tx.exp gets changed by some transaction. + // to be correct, tx.exp and hints.exp.set_aborted should be + // atomic. + // + // this is not a problem with hints.cre.X because + // only one transaction ever creates a record + // + // it's also not a problem with hints.exp.set_committed + // because only one transaction ever can expire a record + // and commit return false; } }; diff --git a/src/mvcc/version_list.hpp b/src/mvcc/version_list.hpp index 2c5de38bc..ad6103d10 100644 --- a/src/mvcc/version_list.hpp +++ b/src/mvcc/version_list.hpp @@ -72,19 +72,25 @@ class VersionList { } /** - * This method is NOT thread-safe. This should never be called with a - * transaction id newer than the oldest active transaction id. - * Re-links all records which are no longer visible for any - * transaction with an id greater or equal to id. - * @param id - transaction id from which to start garbage collection - * that is not visible anymore. If none exists to_delete will rbecome nullptr. + * Garbage collects records that are not reachable/visible anymore. + * + * Relinks this version-list so that garbage collected records are no + * longer reachable through this version list. + * Visibility is defined in mvcc::Record::is_not_visible_from, + * to which the given `snapshot` is passed. + * + * This method is NOT thread-safe. + * + * @param snapshot - the GC snapshot. Consists of the oldest active + * transaction's snapshot, with that transaction's id appened as last. * @param engine - transaction engine to use - we need it to check which * records were commited and which werent * @return pair; status is true - If version list is empty * after garbage collection. to_delete points to the newest record that is not * visible anymore. If none exists to_delete will point to nullptr. */ - std::pair GcDeleted(const Id &id, tx::Engine &engine) { + std::pair GcDeleted(const tx::Snapshot &snapshot, + tx::Engine &engine) { // nullptr // | // [v1] ... 
all of this gets deleted! @@ -100,7 +106,7 @@ class VersionList { T *head_of_deletable_records = current; T *oldest_visible_record = nullptr; while (current) { - if (!current->is_not_visible_from(id, engine)) + if (!current->is_not_visible_from(snapshot, engine)) oldest_visible_record = current; current = current->next(); } @@ -242,15 +248,13 @@ class VersionList { "Record is nullptr on lock and validation."); // take a lock on this node - t.take_lock(lock); + t.TakeLock(lock); // if the record hasn't been deleted yet or the deleting transaction // has aborted, it's ok to modify it - if (!record->tx.exp() || !record->exp_committed(t)) return; + if (!record->tx.exp() || !record->exp_committed(t.engine_)) return; // if it committed, then we have a serialization conflict - debug_assert(record->hints.load().exp.is_committed(), - "Serialization conflict."); throw SerializationError(); } diff --git a/src/storage/deferred_deleter.hpp b/src/storage/deferred_deleter.hpp index 337ca9e29..a11602cc0 100644 --- a/src/storage/deferred_deleter.hpp +++ b/src/storage/deferred_deleter.hpp @@ -4,8 +4,8 @@ #include -#include "mvcc/id.hpp" #include "mvcc/record.hpp" +#include "transactions/transaction.hpp" #include "utils/assert.hpp" /** @@ -31,7 +31,8 @@ class DeferredDeleter { * @param last_transaction - nothing newer or equal to it can see these * objects */ - void AddObjects(const std::vector &objects, const Id &last_transaction) { + void AddObjects(const std::vector &objects, + tx::transaction_id_t last_transaction) { debug_assert( objects_.size() == 0 || objects_.back().deleted_at <= last_transaction, "Transaction ids are not non-decreasing."); @@ -43,7 +44,7 @@ class DeferredDeleter { * @brief - Free memory of objects deleted before the id. 
* @param id - delete before this id */ - void FreeExpiredObjects(const Id &id) { + void FreeExpiredObjects(tx::transaction_id_t id) { auto it = objects_.begin(); while (it != objects_.end() && it->deleted_at < id) { delete it->object; @@ -69,8 +70,8 @@ class DeferredDeleter { */ struct DeletedObject { const T *object; - const Id deleted_at; - DeletedObject(T *object, const Id &deleted_at) + const tx::transaction_id_t deleted_at; + DeletedObject(T *object, tx::transaction_id_t deleted_at) : object(object), deleted_at(deleted_at) {} }; diff --git a/src/storage/garbage/delete_sensitive.hpp b/src/storage/garbage/delete_sensitive.hpp deleted file mode 100644 index 1ba8b136f..000000000 --- a/src/storage/garbage/delete_sensitive.hpp +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -// Base class for all classes which need to be safely disposed. Main usage is -// for garbage class operations. -class DeleteSensitive { - public: - virtual ~DeleteSensitive() {} -}; diff --git a/src/storage/garbage/garbage.cpp b/src/storage/garbage/garbage.cpp deleted file mode 100644 index 442031f69..000000000 --- a/src/storage/garbage/garbage.cpp +++ /dev/null @@ -1,16 +0,0 @@ -#include "storage/garbage/garbage.hpp" - -void Garbage::dispose(tx::Snapshot &&snapshot, DeleteSensitive *data) { - // If this fails it's better to leak memory than to cause read after free. - gar.begin().push(std::make_pair(snapshot, data)); -} - -void Garbage::clean() { - for (auto it = gar.begin(); it != gar.end(); it++) { - if (it->first.all_finished(engine) && it.remove()) { - // All transactions who could have seen data are finished and this - // thread successfull removed item from list. 
- it->second->~DeleteSensitive(); - } - } -} diff --git a/src/storage/garbage/garbage.hpp b/src/storage/garbage/garbage.hpp deleted file mode 100644 index d1f7999fd..000000000 --- a/src/storage/garbage/garbage.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include "data_structures/concurrent/concurrent_list.hpp" -#include "mvcc/id.hpp" -#include "storage/garbage/delete_sensitive.hpp" -#include "transactions/snapshot.hpp" - -namespace tx { -class Engine; -} - -// Collection of delete sensitive data which need to be safely deleted. That -// meens that all transactions that may have pointer to it must finish before -// the sensitive data can be safely destroyed. -class Garbage { - public: - Garbage(tx::Engine &e) : engine(e) {} - - // Will safely dispose of data. - void dispose(tx::Snapshot &&snapshot, DeleteSensitive *data); - - // Cleaner thread should call this method every some time. Removes data - // which is safe to be deleted. - void clean(); - - private: - ConcurrentList, DeleteSensitive *>> gar; - tx::Engine &engine; -}; diff --git a/src/storage/garbage_collector.hpp b/src/storage/garbage_collector.hpp index e77569947..4d901f181 100644 --- a/src/storage/garbage_collector.hpp +++ b/src/storage/garbage_collector.hpp @@ -2,7 +2,6 @@ #include "data_structures/concurrent/skiplist.hpp" #include "logging/loggable.hpp" -#include "mvcc/id.hpp" #include "mvcc/version_list.hpp" #include "storage/deferred_deleter.hpp" #include "transactions/engine.hpp" @@ -25,20 +24,22 @@ class GarbageCollector : public Loggable { /** * @brief - Runs garbage collector. Populates deferred deleters with version * lists and records. - * @param id - oldest active transaction id + * + * @param snapshot - the GC snapshot. Consists of the oldest active + * transaction's snapshot, with that transaction's id appened as last. 
* @param engine - reference to engine object */ - void Run(const Id &id, tx::Engine &engine) { + void Run(const tx::Snapshot &snapshot, tx::Engine &engine) { auto collection_accessor = this->skiplist_.access(); uint64_t count = 0; std::vector deleted_records; std::vector *> deleted_version_lists; if (logger.Initialized()) - logger.trace("Gc started cleaning everything deleted before {}", id); + logger.trace("GC started cleaning with snapshot: ", snapshot); for (auto version_list : collection_accessor) { // If the version_list is empty, i.e. there is nothing else to be read // from it we can delete it. - auto ret = version_list->GcDeleted(id, engine); + auto ret = version_list->GcDeleted(snapshot, engine); if (ret.first) { deleted_version_lists.push_back(version_list); count += collection_accessor.remove(version_list); @@ -49,10 +50,10 @@ class GarbageCollector : public Loggable { // Add records to deleter, with the id larger or equal than the last active // transaction. - record_deleter_.AddObjects(deleted_records, engine.count()); + record_deleter_.AddObjects(deleted_records, engine.Count()); // Add version_lists to deleter, with the id larger or equal than the last // active transaction. 
- version_list_deleter_.AddObjects(deleted_version_lists, engine.count()); + version_list_deleter_.AddObjects(deleted_version_lists, engine.Count()); } private: diff --git a/src/storage/locking/record_lock.cpp b/src/storage/locking/record_lock.cpp index bc7a56d12..b4cd8c6ed 100644 --- a/src/storage/locking/record_lock.cpp +++ b/src/storage/locking/record_lock.cpp @@ -2,7 +2,7 @@ void RecordLock::lock() { mutex.lock(&timeout); } -LockStatus RecordLock::lock(const Id& id) { +LockStatus RecordLock::lock(tx::transaction_id_t id) { if (mutex.try_lock()) return owner = id, LockStatus::Acquired; if (owner == id) return LockStatus::AlreadyHeld; @@ -11,9 +11,7 @@ LockStatus RecordLock::lock(const Id& id) { } void RecordLock::unlock() { - owner = INVALID; mutex.unlock(); } constexpr struct timespec RecordLock::timeout; -constexpr Id RecordLock::INVALID; diff --git a/src/storage/locking/record_lock.hpp b/src/storage/locking/record_lock.hpp index 4427236f7..3b1ab8d3e 100644 --- a/src/storage/locking/record_lock.hpp +++ b/src/storage/locking/record_lock.hpp @@ -1,19 +1,19 @@ #pragma once -#include "mvcc/id.hpp" +#include "transactions/type.hpp" #include "storage/locking/lock_status.hpp" #include "threading/sync/futex.hpp" class RecordLock { + // TODO arbitrary constant, reconsider static constexpr struct timespec timeout { 2, 0 }; - static constexpr Id INVALID = Id(); public: - LockStatus lock(const Id& id); + LockStatus lock(tx::transaction_id_t id); void lock(); void unlock(); private: Futex mutex; - Id owner; + tx::transaction_id_t owner; }; diff --git a/src/transactions/commit_log.hpp b/src/transactions/commit_log.hpp index d46e60f17..0a98b92a4 100644 --- a/src/transactions/commit_log.hpp +++ b/src/transactions/commit_log.hpp @@ -1,7 +1,7 @@ #pragma once #include "data_structures/bitset/dynamic_bitset.hpp" -#include "mvcc/id.hpp" +#include "type.hpp" namespace tx { @@ -31,17 +31,19 @@ class CommitLog { CommitLog operator=(CommitLog) = delete; - Info fetch_info(const Id &id) 
{ return Info{log.at(2 * id, 2)}; } + Info fetch_info(transaction_id_t id) { return Info{log.at(2 * id, 2)}; } - bool is_active(const Id &id) { return fetch_info(id).is_active(); } + bool is_active(transaction_id_t id) { return fetch_info(id).is_active(); } - bool is_committed(const Id &id) { return fetch_info(id).is_committed(); } + bool is_committed(transaction_id_t id) { + return fetch_info(id).is_committed(); + } - void set_committed(const Id &id) { log.set(2 * id); } + void set_committed(transaction_id_t id) { log.set(2 * id); } - bool is_aborted(const Id &id) { return fetch_info(id).is_aborted(); } + bool is_aborted(transaction_id_t id) { return fetch_info(id).is_aborted(); } - void set_aborted(const Id &id) { log.set(2 * id + 1); } + void set_aborted(transaction_id_t id) { log.set(2 * id + 1); } private: // TODO: Searching the log will take more and more time the more and more diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index c03da8561..fd2666725 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -14,123 +14,153 @@ namespace tx { +/** Indicates an error in transaction handling (currently + * only command id overflow). */ class TransactionError : public utils::BasicException { public: using utils::BasicException::BasicException; }; -// max value that could be stored as a command id -static constexpr auto kMaxCommandId = - std::numeric_limits().cid)>::max(); - +/** Database transaction egine. + * + * Used for managing transactions and the related information + * such as transaction snapshots and the commit log. + */ class Engine : Lockable { + // limit for the command id, used for checking if we're about + // to overflow. slightly unneccessary since command id should + // be a 64-bit int + static constexpr auto kMaxCommandId = + std::numeric_limits().cid())>::max(); + public: - using sptr = std::shared_ptr; - - Engine() : counter(0) {} - - // Begins transaction and runs given functions in same atomic step. 
- // Functions will be given Transaction& - template - Transaction *begin(F... fun) { + /** Begins a transaction and returns a pointer to + * it's object. + * + * The transaction object is owned by this engine. + * It will be released when the transaction gets + * committted or aborted. + */ + Transaction *Begin() { auto guard = this->acquire_unique(); - auto id = Id(counter.next()); - auto t = new Transaction(id, active, *this); + transaction_id_t id{counter_.next()}; + auto t = new Transaction(id, active_, *this); - active.insert(id); - store.put(id, t); - - call(*t, fun...); + active_.insert(id); + store_.put(id, t); return t; } - Transaction &advance(const Id &id) { + /** Advances the command on the transaction with the + * given id. + * + * @param id - Transation id. That transaction must + * be currently active. + * @return Pointer to the transaction object for id. + */ + Transaction &Advance(transaction_id_t id) { auto guard = this->acquire_unique(); - auto *t = store.get(id); + auto *t = store_.get(id); + debug_assert(t != nullptr, + "Transaction::advance on non-existing transaction"); - if (t == nullptr) throw TransactionError("Transaction does not exist."); - if (t->cid == kMaxCommandId) + if (t->cid_ == kMaxCommandId) throw TransactionError( "Reached maximum number of commands in this transaction."); - // this is a new command - t->cid++; - + t->cid_++; return *t; } - // Returns copy of current snapshot - Snapshot snapshot() { - auto guard = this->acquire_unique(); - - return active; - } - - void commit(const Transaction &t) { - auto guard = this->acquire_unique(); - clog.set_committed(t.id); - - finalize(t); - } - - void abort(const Transaction &t) { - auto guard = this->acquire_unique(); - clog.set_aborted(t.id); - - finalize(t); - } - - /* - *@brief Return oldest active transaction in the active transaction pool. In - *case none exist return None. 
- *@return Id of transaction + /** Returns the snapshot relevant to garbage collection + * of database records. + * + * If there are no active transactions that means + * a snapshot containing only the next transaction ID. + * If there are active transactions, that means the + * oldest active transaction's snapshot, with that + * transaction's ID appened as last. + * + * The idea is that data records can only be deleted + * if they were expired (and that was committed) by + * a transaction older then the older currently active. + * We need the full snapshot to prevent overlaps (see + * general GC documentation). */ - Option oldest_active() { + Snapshot GcSnapshot() { auto guard = this->acquire_unique(); - if (active.size() == 0) return Option(); - return Option(active.front()); + + // no active transactions + if (active_.size() == 0) { + auto snapshot_copy = active_; + snapshot_copy.insert(counter_.count() + 1); + return snapshot_copy; + } + + // there are active transactions + auto snapshot_copy = store_.get(active_.front())->snapshot(); + snapshot_copy.insert(active_.front()); + return snapshot_copy; } - // total number of transactions started from the beginning of time - uint64_t count() { + /** Comits the given transaction. Deletes the transaction + * object, it's not valid after this function executes. */ + void Commit(const Transaction &t) { auto guard = this->acquire_unique(); - return counter.count(); + clog_.set_committed(t.id_); + + Finalize(t); } - // the number of currently active transactions - size_t size() { + /** Aborts the given transaction. Deletes the transaction + * object, it's not valid after this function executes. 
*/ + void Abort(const Transaction &t) { auto guard = this->acquire_unique(); - return active.size(); + clog_.set_aborted(t.id_); + + Finalize(t); } - CommitLog clog; + /** The total number of transactions that have + * executed since the creation of this engine */ + auto Count() { + auto guard = this->acquire_unique(); + return counter_.count(); + } + + /** The count of currently active transactions */ + size_t ActiveCount() { + auto guard = this->acquire_unique(); + return active_.size(); + } + + // TODO make this private and expose "const CommitLog" + // through a getter. To do that you need to make the + // appropriate CommitLog functions const. To do THAT, + // you need to make appropriate DynamicBitset functions + // const. While doing that, clean the DynamicBitset up. + /** Commit log of this engine */ + CommitLog clog_; private: - template - void call(Transaction &t, T fun, F... funs) { - call(t, fun); - call(t, funs...); + // Performs cleanup common to ending the transaction + // with either commit or abort + void Finalize(const Transaction &t) { + active_.remove(t.id_); + store_.del(t.id_); } - template - void call(Transaction &t, T fun) { - fun(t); - } + // transaction counter. 
contains the number of transactions + // ever created till now + SimpleCounter counter_{0}; - void call(Transaction &t) {} + // a snapshot of currently active transactions + Snapshot active_; - void finalize(const Transaction &t) { - active.remove(t.id); - - // remove transaction from store - store.del(t.id); - } - - SimpleCounter counter; - Snapshot active; - TransactionStore store; + // storage for the transactions + TransactionStore store_; }; } diff --git a/src/transactions/snapshot.cpp b/src/transactions/snapshot.cpp deleted file mode 100644 index e4b2c6b92..000000000 --- a/src/transactions/snapshot.cpp +++ /dev/null @@ -1,16 +0,0 @@ -#include "transactions/snapshot.hpp" - -#include "transactions/engine.hpp" - -template -bool tx::Snapshot::all_finished(Engine &engine) const { - for (auto &sid : active) { - if (engine.clog.is_active(sid)) { - return false; - } - } - - return true; -} - -template class tx::Snapshot; diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp index cd4926fa4..9399952d4 100644 --- a/src/transactions/snapshot.hpp +++ b/src/transactions/snapshot.hpp @@ -1,66 +1,90 @@ #pragma once #include +#include #include -#include "mvcc/id.hpp" +#include "transaction.hpp" +#include "utils/algorithm.hpp" +#include "utils/assert.hpp" #include "utils/option.hpp" namespace tx { class Engine; -template +/** Ascendingly sorted collection of transaction ids. + * + * Represents the transactions that were active at + * some point in the discrete transaction time. + */ class Snapshot { public: Snapshot() = default; + Snapshot(std::vector &&active) + : transaction_ids_(std::move(active)) {} + // all the copy/move constructors/assignments act naturally - Snapshot(std::vector active) : active(std::move(active)) {} - - Snapshot(const Snapshot &other) { active = other.active; } - - Snapshot(Snapshot &&other) { active = std::move(other.active); } - - // True if all transaction from snapshot have finished. 
- bool all_finished(Engine &engine) const; - - bool is_active(id_t xid) const { - return std::binary_search(active.begin(), active.end(), xid); + /** Returns true if this snapshot contains the given + * transaction id. + * + * @param xid - The transcation id in question + */ + bool contains(transaction_id_t id) const { + return std::binary_search(transaction_ids_.begin(), transaction_ids_.end(), + id); } - // Return id of oldest transaction. None if there is no transactions in - // snapshot. - Option oldest_active() const { - auto n = active.size(); - if (n > 0) { - Id min = active[0]; - for (auto i = 1; i < n; i++) { - if (active[i] < min) { - min = active[i]; - } - } - return Option(min); - - } else { - return Option(); - } + /** Adds the given transaction id to the end of this Snapshot. + * The given id must be greater then all the existing ones, + * to maintain ascending sort order. + * + * @param id - the transaction id to add + */ + void insert(transaction_id_t id) { + transaction_ids_.push_back(id); + debug_assert( + std::is_sorted(transaction_ids_.begin(), transaction_ids_.end()), + "Snapshot must be sorted"); } - void insert(const id_t &id) { active.push_back(id); } - - void remove(const id_t &id) { - // remove transaction from the active transactions list - auto last = std::remove(active.begin(), active.end(), id); - active.erase(last, active.end()); + /** Removes the given transaction id from this Snapshot. 
+ * + * @param id - the transaction id to remove + */ + void remove(transaction_id_t id) { + auto last = + std::remove(transaction_ids_.begin(), transaction_ids_.end(), id); + transaction_ids_.erase(last, transaction_ids_.end()); } - const id_t &front() const { return active.front(); } + transaction_id_t front() const { + debug_assert(transaction_ids_.size(), "Snapshot.front() on empty Snapshot"); + return transaction_ids_.front(); + } - const id_t &back() const { return active.back(); } + transaction_id_t back() const { + debug_assert(transaction_ids_.size(), "Snapshot.back() on empty Snapshot"); + return transaction_ids_.back(); + } - size_t size() { return active.size(); } + size_t size() const { return transaction_ids_.size(); } + bool empty() const { return transaction_ids_.empty(); } + bool operator==(const Snapshot &other) const { + return transaction_ids_ == other.transaction_ids_; + } + auto begin() const { return transaction_ids_.begin(); } + auto end() const { return transaction_ids_.end(); } + + friend std::ostream &operator<<(std::ostream &stream, + const Snapshot &snapshot) { + stream << "Snapshot("; + PrintIterable(stream, snapshot.transaction_ids_); + stream << ")"; + return stream; + } private: - std::vector active; + std::vector transaction_ids_; }; } diff --git a/src/transactions/transaction.cpp b/src/transactions/transaction.cpp index 9e4a1e9ce..315f98413 100644 --- a/src/transactions/transaction.cpp +++ b/src/transactions/transaction.cpp @@ -7,44 +7,13 @@ #include "transactions/engine.hpp" namespace tx { -Transaction::Transaction(Engine &engine) - : Transaction(Id(), Snapshot(), engine) {} - -Transaction::Transaction(const Id &&id, const Snapshot &&snapshot, +Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine) - : id(id), cid(1), engine(engine), snapshot(std::move(snapshot)) {} + : id_(id), engine_(engine), snapshot_(snapshot) {} -Transaction::Transaction(const Id &id, const Snapshot &snapshot, - Engine 
&engine) - : id(id), cid(1), engine(engine), snapshot(snapshot) {} +void Transaction::TakeLock(RecordLock &lock) { locks_.take(&lock, id_); } -void Transaction::wait_for_active_except(const Id &id) const { - Snapshot local_snapshot = snapshot; - local_snapshot.remove(id); - while (local_snapshot.size() > 0) { - auto sid = local_snapshot.front(); - while (engine.clog.fetch_info(sid).is_active()) { - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - local_snapshot.remove(sid); - } -} - -void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); } - -void Transaction::commit() { engine.commit(*this); } - -void Transaction::abort() { engine.abort(*this); } - -bool Transaction::all_finished() { - return !engine.clog.is_active(id) && snapshot.all_finished(engine); -} - -bool Transaction::in_snapshot(const Id &id) const { - return snapshot.is_active(id); -} - -Id Transaction::oldest_active() { - return snapshot.oldest_active().take_or(Id(id)); -} +void Transaction::Commit() { engine_.Commit(*this); } + +void Transaction::Abort() { engine_.Abort(*this); } } diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index 4b3e7a342..7dd315b70 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -1,59 +1,73 @@ - #pragma once #include #include #include -#include "mvcc/id.hpp" #include "storage/locking/record_lock.hpp" #include "transactions/lock_store.hpp" #include "transactions/snapshot.hpp" +#include "type.hpp" namespace tx { -class Engine; - +/** A database transaction. Encapsulates an atomic, + * abortable unit of work. 
Also defines that all db + * ops are single-threaded within a single transaction */ class Transaction { - friend class Engine; - public: - explicit Transaction(Engine &engine); - Transaction(const Id &&id, const Snapshot &&snapshot, Engine &engine); - Transaction(const Id &id, const Snapshot &snapshot, Engine &engine); - Transaction(const Transaction &) = delete; - Transaction(Transaction &&) = default; - - // Blocks until all transactions from snapshot finish, except the 'id' one. - // After this method, snapshot will be either empty or contain transaction - // with Id 'id'. - void wait_for_active_except(const Id &id) const; - - void take_lock(RecordLock &lock); - void commit(); - void abort(); - - // True if this transaction and every transaction from snapshot have - // finished. - bool all_finished(); - - // Return id of oldest transaction from snapshot. - Id oldest_active(); - - // True if id is in snapshot. - bool in_snapshot(const Id &id) const; - - // index of this transaction - const Id id; - - // index of the current command in the current transaction; - uint64_t cid; - - Engine &engine; + /** Returns the maximum possible transcation id */ + static transaction_id_t MaxId() { + return std::numeric_limits::max(); + } private: + friend class Engine; + + // the constructor is private, only the Engine ever uses it + Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine); + + // a transaction can't be moved nor copied. it's owned by the transaction + // engine, and it's lifetime is managed by it + Transaction(const Transaction &) = delete; + Transaction(Transaction &&) = delete; + Transaction &operator=(const Transaction &) = delete; + Transaction &operator=(Transaction &&) = delete; + + public: + /** Acquires the lock over the given RecordLock, preventing + * other transactions from doing the same */ + void TakeLock(RecordLock &lock); + + /** Commits this transaction. 
After this call this transaction + * object is no longer valid for use (it gets deleted by the + * engine that owns it). */ + void Commit(); + + /** Aborts this transaction. After this call this transaction + * object is no longer valid for use (it gets deleted by the + * engine that owns it). */ + void Abort(); + + /** Transaction's id. Unique in the engine that owns it */ + const transaction_id_t id_; + + /** The transaction engine to which this transaction belongs */ + Engine &engine_; + + /** Returns the current transaction's current command id */ + // TODO rename to cmd_id (variable and function + auto cid() const { return cid_; } + + /** Returns this transaction's snapshot. */ + const Snapshot &snapshot() const { return snapshot_; } + + private: + // index of the current command in the current transaction; + command_id_t cid_{1}; // a snapshot of currently active transactions - const Snapshot snapshot; - LockStore locks; + const Snapshot snapshot_; + // locks + LockStore locks_; }; } diff --git a/src/transactions/type.hpp b/src/transactions/type.hpp new file mode 100644 index 000000000..788aeb33a --- /dev/null +++ b/src/transactions/type.hpp @@ -0,0 +1,12 @@ +#include + +// transcation and command types defined +// in a separate header to avoid cyclic dependencies +namespace tx { + + /** Type of a tx::Transcation's id member */ + using transaction_id_t = uint64_t; + + /** Type of a tx::Transcation's command id member */ + using command_id_t = uint64_t; +} diff --git a/tests/benchmark/mvcc.cpp b/tests/benchmark/mvcc.cpp index b8443b21a..ffd81be02 100644 --- a/tests/benchmark/mvcc.cpp +++ b/tests/benchmark/mvcc.cpp @@ -15,11 +15,11 @@ void MvccMix(benchmark::State &state) { while (state.KeepRunning()) { state.PauseTiming(); tx::Engine engine; - auto t1 = engine.begin(); + auto t1 = engine.Begin(); mvcc::VersionList version_list(*t1); - t1->commit(); - auto t2 = engine.begin(); + t1->Commit(); + auto t2 = engine.Begin(); state.ResumeTiming(); 
version_list.update(*t2); @@ -29,13 +29,13 @@ void MvccMix(benchmark::State &state) { version_list.find(*t2); state.PauseTiming(); - t2->abort(); + t2->Abort(); - auto t3 = engine.begin(); + auto t3 = engine.Begin(); state.ResumeTiming(); version_list.update(*t3); state.PauseTiming(); - auto t4 = engine.begin(); + auto t4 = engine.Begin(); // Repeat find state.range(0) number of times. state.ResumeTiming(); @@ -44,8 +44,8 @@ void MvccMix(benchmark::State &state) { } state.PauseTiming(); - t3->commit(); - t4->commit(); + t3->Commit(); + t4->Commit(); state.ResumeTiming(); } } diff --git a/tests/concurrent/transaction_engine.cpp b/tests/concurrent/transaction_engine.cpp index d8f5e41c3..e2f7fd17b 100644 --- a/tests/concurrent/transaction_engine.cpp +++ b/tests/concurrent/transaction_engine.cpp @@ -18,9 +18,9 @@ int main() { uint64_t sum = 0; for (int i = 0; i < n; ++i) { - auto t = engine.begin(); - sum += t->id; - engine.commit(*t); + auto t = engine.Begin(); + sum += t->id_; + engine.Commit(*t); } sums[idx] = sum; diff --git a/tests/unit/database_key_index.cpp b/tests/unit/database_key_index.cpp index f4e4c3103..7c7f1a1a9 100644 --- a/tests/unit/database_key_index.cpp +++ b/tests/unit/database_key_index.cpp @@ -7,6 +7,8 @@ #include "dbms/dbms.hpp" #include "storage/vertex.hpp" +#include "mvcc_gc_common.hpp" + using testing::UnorderedElementsAreArray; // Test index does it insert everything uniquely @@ -15,10 +17,10 @@ TEST(LabelsIndex, UniqueInsert) { Dbms dbms; auto dba = dbms.active(); tx::Engine engine; - auto t1 = engine.begin(); + auto t1 = engine.Begin(); mvcc::VersionList vlist(*t1); - t1->commit(); - auto t2 = engine.begin(); + t1->Commit(); + auto t2 = engine.Begin(); vlist.find(*t2)->labels_.push_back(dba->label("1")); index.Update(dba->label("1"), &vlist, vlist.find(*t2)); @@ -30,7 +32,7 @@ TEST(LabelsIndex, UniqueInsert) { vlist.find(*t2)->labels_.push_back(dba->label("3")); index.Update(dba->label("3"), &vlist, vlist.find(*t2)); - t2->commit(); + 
t2->Commit(); EXPECT_EQ(index.Count(dba->label("1")), 1); EXPECT_EQ(index.Count(dba->label("2")), 1); @@ -44,11 +46,10 @@ TEST(LabelsIndex, UniqueFilter) { auto dba = dbms.active(); tx::Engine engine; - auto t1 = engine.begin(); + auto t1 = engine.Begin(); mvcc::VersionList vlist1(*t1); mvcc::VersionList vlist2(*t1); - t1->engine.advance( - t1->id); // advance command so we can see our inserted version + engine.Advance(t1->id_); auto r1v1 = vlist1.find(*t1); auto r1v2 = vlist2.find(*t1); EXPECT_NE(vlist1.find(*t1), nullptr); @@ -58,16 +59,16 @@ TEST(LabelsIndex, UniqueFilter) { vlist2.find(*t1)->labels_.push_back(label1); index.Update(label1, &vlist1, r1v1); index.Update(label1, &vlist2, r1v2); - t1->commit(); + t1->Commit(); - auto t2 = engine.begin(); + auto t2 = engine.Begin(); auto r2v1 = vlist1.update(*t2); auto r2v2 = vlist2.update(*t2); index.Update(label1, &vlist1, r2v1); index.Update(label1, &vlist2, r2v2); - t2->commit(); + t2->Commit(); - auto t3 = engine.begin(); + auto t3 = engine.Begin(); std::vector *> expected = {&vlist1, &vlist2}; sort(expected.begin(), expected.end()); // Entries will be sorted by vlist pointers. 
@@ -85,36 +86,37 @@ TEST(LabelsIndex, Refresh) { auto access = dbms.active(); tx::Engine engine; - auto t1 = engine.begin(); + // add two vertices to database + auto t1 = engine.Begin(); mvcc::VersionList vlist1(*t1); mvcc::VersionList vlist2(*t1); - t1->engine.advance( - t1->id); // advance command so we can see our inserted version - auto r1v1 = vlist1.find(*t1); - auto r1v2 = vlist2.find(*t1); - EXPECT_NE(vlist1.find(*t1), nullptr); + engine.Advance(t1->id_); - auto label1 = access->label("1"); - vlist1.find(*t1)->labels_.push_back(label1); - vlist2.find(*t1)->labels_.push_back(label1); - index.Update(label1, &vlist1, r1v1); - index.Update(label1, &vlist2, r1v2); - t1->commit(); + auto v1r1 = vlist1.find(*t1); + auto v2r1 = vlist2.find(*t1); + EXPECT_NE(v1r1, nullptr); + EXPECT_NE(v2r1, nullptr); - auto t2 = engine.begin(); - auto r2v1 = vlist1.update(*t2); - auto r2v2 = vlist2.update(*t2); - index.Update(label1, &vlist1, r2v1); - index.Update(label1, &vlist2, r2v2); - int last_id = t2->id; - t2->commit(); - EXPECT_EQ(index.Count(label1), 4); + auto label = access->label("label"); + v1r1->labels_.push_back(label); + v2r1->labels_.push_back(label); + index.Update(label, &vlist1, v1r1); + index.Update(label, &vlist2, v2r1); + t1->Commit(); - index.Refresh(last_id, engine); - EXPECT_EQ(index.Count(label1), 4); + auto t2 = engine.Begin(); + auto v1r2 = vlist1.update(*t2); + auto v2r2 = vlist2.update(*t2); + index.Update(label, &vlist1, v1r2); + index.Update(label, &vlist2, v2r2); - index.Refresh(last_id + 1, engine); - EXPECT_EQ(index.Count(label1), 2); + index.Refresh(GcSnapshot(engine, t2), engine); + EXPECT_EQ(index.Count(label), 4); + + t2->Commit(); + EXPECT_EQ(index.Count(label), 4); + index.Refresh(GcSnapshot(engine, nullptr), engine); + EXPECT_EQ(index.Count(label), 2); } // Transaction hasn't ended and so the vertex is not visible. 
diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp index d92e7166c..418b76fab 100644 --- a/tests/unit/database_label_property_index.cpp +++ b/tests/unit/database_label_property_index.cpp @@ -5,6 +5,8 @@ #include "database/indexes/label_property_index.hpp" #include "dbms/dbms.hpp" +#include "mvcc_gc_common.hpp" + class LabelPropertyIndexComplexTest : public ::testing::Test { protected: virtual void SetUp() { @@ -19,9 +21,9 @@ class LabelPropertyIndexComplexTest : public ::testing::Test { EXPECT_EQ(index.CreateIndex(*key), true); index.IndexFinishedBuilding(*key); - t = engine.begin(); + t = engine.Begin(); vlist = new mvcc::VersionList(*t); - engine.advance(t->id); + engine.Advance(t->id_); vertex = vlist->find(*t); ASSERT_NE(vertex, nullptr); @@ -140,19 +142,19 @@ TEST_F(LabelPropertyIndexComplexTest, UniqueInsert) { // Check if index filters duplicates. TEST_F(LabelPropertyIndexComplexTest, UniqueFilter) { index.UpdateOnLabelProperty(vlist, vertex); - t->commit(); + t->Commit(); - auto t2 = engine.begin(); + auto t2 = engine.Begin(); auto vertex2 = vlist->update(*t2); - t2->commit(); + t2->Commit(); index.UpdateOnLabelProperty(vlist, vertex2); EXPECT_EQ(index.Count(*key), 2); - auto t3 = engine.begin(); + auto t3 = engine.Begin(); auto iter = index.GetVlists(*key, *t3, false); EXPECT_EQ(std::distance(iter.begin(), iter.end()), 1); - t3->commit(); + t3->Commit(); } // Remove label and check if index vertex is not returned now. @@ -182,11 +184,11 @@ TEST_F(LabelPropertyIndexComplexTest, RemoveProperty) { // Refresh with a vertex that looses its labels and properties. 
TEST_F(LabelPropertyIndexComplexTest, Refresh) { index.UpdateOnLabelProperty(vlist, vertex); - t->commit(); + t->Commit(); EXPECT_EQ(index.Count(*key), 1); vertex->labels_.clear(); vertex->properties_.clear(); - index.Refresh(engine.count() + 1, engine); + index.Refresh(GcSnapshot(engine, nullptr), engine); auto iter = index.GetVlists(*key, *t, false); EXPECT_EQ(std::distance(iter.begin(), iter.end()), 0); } diff --git a/tests/unit/deferred_deleter.cpp b/tests/unit/deferred_deleter.cpp index e796813f5..528a13d4c 100644 --- a/tests/unit/deferred_deleter.cpp +++ b/tests/unit/deferred_deleter.cpp @@ -4,7 +4,7 @@ #include "storage/deferred_deleter.hpp" #include "storage/vertex.hpp" -#include "gc_common.hpp" +#include "mvcc_gc_common.hpp" // Add and count objects. TEST(DeferredDeleter, AddObjects) { @@ -13,44 +13,44 @@ TEST(DeferredDeleter, AddObjects) { std::vector V; V.push_back(new Vertex()); V.push_back(new Vertex()); - deleter.AddObjects(V, Id(5)); - EXPECT_EQ(deleter.Count(), (i + 1) * 2); - } - deleter.FreeExpiredObjects(Id::MaximalId()); +deleter.AddObjects(V, 5); +EXPECT_EQ(deleter.Count(), (i + 1) * 2); +} +deleter.FreeExpiredObjects(tx::Transaction::MaxId()); } // Check that the deleter can't be destroyed while it still has objects. TEST(DeferredDeleter, Destructor) { std::atomic count{0}; - DeferredDeleter *deleter = new DeferredDeleter; + DeferredDeleter *deleter = new DeferredDeleter; for (int i = 0; i < 10; ++i) { - std::vector V; - V.push_back(new PropCount(count)); - V.push_back(new PropCount(count)); - deleter->AddObjects(V, Id(5)); + std::vector V; + V.push_back(new DestrCountRec(count)); + V.push_back(new DestrCountRec(count)); + deleter->AddObjects(V, 5); EXPECT_EQ(deleter->Count(), (i + 1) * 2); } EXPECT_EQ(0, count); EXPECT_DEATH(delete deleter, ""); // We shouldn't leak memory. - deleter->FreeExpiredObjects(Id::MaximalId()); + deleter->FreeExpiredObjects(tx::Transaction::MaxId()); delete deleter; } // Check if deleter frees objects. 
TEST(DeferredDeleter, FreeExpiredObjects) { - DeferredDeleter deleter; - std::vector V; + DeferredDeleter deleter; + std::vector V; std::atomic count{0}; - V.push_back(new PropCount(count)); - V.push_back(new PropCount(count)); - deleter.AddObjects(V, Id(5)); + V.push_back(new DestrCountRec(count)); + V.push_back(new DestrCountRec(count)); + deleter.AddObjects(V, 5); - deleter.FreeExpiredObjects(Id(5)); + deleter.FreeExpiredObjects(5); EXPECT_EQ(deleter.Count(), 2); EXPECT_EQ(count, 0); - deleter.FreeExpiredObjects(Id(6)); + deleter.FreeExpiredObjects(6); EXPECT_EQ(deleter.Count(), 0); EXPECT_EQ(count, 2); } diff --git a/tests/unit/gc_common.hpp b/tests/unit/gc_common.hpp deleted file mode 100644 index a7e7ad0c5..000000000 --- a/tests/unit/gc_common.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#pragma once - -#include "mvcc/record.hpp" - -/** - * @brief - Empty class which inherits from mvcc:Record. - */ -class Prop : public mvcc::Record {}; - -/** - * @brief - Class which inherits from mvcc::Record and takes an atomic variable - * to count number of destructor calls (to test if the record is actually - * deleted). 
- */ -class PropCount : public mvcc::Record { - public: - PropCount(std::atomic &count) : count_(count) {} - ~PropCount() { ++count_; } - - private: - std::atomic &count_; -}; diff --git a/tests/unit/id.cpp b/tests/unit/id.cpp deleted file mode 100644 index a157462b6..000000000 --- a/tests/unit/id.cpp +++ /dev/null @@ -1,30 +0,0 @@ -#include "gtest/gtest.h" - -#include "mvcc/id.hpp" - -TEST(IdTest, BasicUsageAndTotalOrdering) { - Id id0(0); - Id id1(1); - Id id2(1); - Id id3(id2); - Id id4 = id3; - Id id5(5); - - ASSERT_EQ(id0 < id5, true); - ASSERT_EQ(id1 == id2, true); - ASSERT_EQ(id3 == id4, true); - ASSERT_EQ(id5 > id0, true); - ASSERT_EQ(id5 > id0, true); - ASSERT_EQ(id5 != id3, true); - ASSERT_EQ(id1 >= id2, true); - ASSERT_EQ(id3 <= id4, true); -} - -TEST(IdTest, MaxId) { - EXPECT_TRUE(Id(std::numeric_limits::max()) == Id::MaximalId()); -} - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp index 97068f57d..82e545ab7 100644 --- a/tests/unit/mvcc.cpp +++ b/tests/unit/mvcc.cpp @@ -1,8 +1,6 @@ #include -#include "gc_common.hpp" #include "gtest/gtest.h" -#include "mvcc/id.hpp" #include "mvcc/record.hpp" #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" @@ -10,18 +8,20 @@ #include "transactions/engine.hpp" #include "transactions/transaction.cpp" +#include "mvcc_gc_common.hpp" + class TestClass : public mvcc::Record {}; TEST(MVCC, Deadlock) { tx::Engine engine; - auto t0 = engine.begin(); + auto t0 = engine.Begin(); mvcc::VersionList version_list1(*t0); mvcc::VersionList version_list2(*t0); - t0->commit(); + t0->Commit(); - auto t1 = engine.begin(); - auto t2 = engine.begin(); + auto t1 = engine.Begin(); + auto t2 = engine.Begin(); version_list1.update(*t1); version_list2.update(*t2); @@ -34,23 +34,23 @@ TEST(MVCC, UpdateDontDelete) { std::atomic count{0}; { tx::Engine engine; - auto t1 = engine.begin(); - mvcc::VersionList version_list(*t1, 
count); - t1->commit(); + auto t1 = engine.Begin(); + mvcc::VersionList version_list(*t1, count); + t1->Commit(); - auto t2 = engine.begin(); + auto t2 = engine.Begin(); version_list.update(*t2); - t2->abort(); + t2->Abort(); EXPECT_EQ(count, 0); - auto t3 = engine.begin(); + auto t3 = engine.Begin(); // Update re-links the node and shouldn't clear it yet. version_list.update(*t3); EXPECT_EQ(count, 0); // TODO Gleich: why don't we also test that remove doesn't delete? - t3->commit(); + t3->Commit(); } EXPECT_EQ(count, 3); } @@ -58,13 +58,13 @@ TEST(MVCC, UpdateDontDelete) { // Check that we get the oldest record. TEST(MVCC, Oldest) { tx::Engine engine; - auto t1 = engine.begin(); + auto t1 = engine.Begin(); mvcc::VersionList version_list(*t1); auto first = version_list.Oldest(); EXPECT_NE(first, nullptr); // TODO Gleich: no need to do 10 checks of the same thing for (int i = 0; i < 10; ++i) { - engine.advance(t1->id); + engine.Advance(t1->id_); version_list.update(*t1); EXPECT_EQ(version_list.Oldest(), first); } diff --git a/tests/unit/mvcc_find.cpp b/tests/unit/mvcc_find.cpp index 6bbd291aa..a9a141950 100644 --- a/tests/unit/mvcc_find.cpp +++ b/tests/unit/mvcc_find.cpp @@ -84,7 +84,7 @@ TEST_F(Mvcc, ReadUncommitedUpdateFromSameTXSameCommand) { TEST_F(Mvcc, ReadUncommitedUpdateFromSameTXNotSameCommand) { T2_UPDATE; - engine.advance(t2->id); + engine.Advance(t2->id_); EXPECT_EQ(v2, version_list.find(*t2)); } @@ -101,6 +101,6 @@ TEST_F(Mvcc, ReadUncommitedRemoveFromSameTXNotSameCommand) { T2_COMMIT; T3_BEGIN; T3_REMOVE; - engine.advance(t3->id); + engine.Advance(t3->id_); EXPECT_NE(v2, version_list.find(*t3)); } diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index 0413db1ba..44ccf12a7 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -1,7 +1,6 @@ #include #include "gtest/gtest.h" -#include "mvcc/id.hpp" #include "mvcc/record.hpp" #include "mvcc/version.hpp" #include 
"mvcc/version_list.hpp" @@ -9,11 +8,6 @@ #include "transactions/engine.hpp" #include "transactions/transaction.cpp" -// make it easy to compare Id with int -bool operator==(const Id &left, const int right) { - return static_cast(left) == static_cast(right); -} - class TestClass : public mvcc::Record { public: // constructs first version, size should be 0 @@ -53,22 +47,22 @@ class TestClass : public mvcc::Record { class Mvcc : public ::testing::Test { protected: virtual void SetUp() { - id0 = Id{0}; - t1 = &engine.advance(t1->id); - id1 = t1->id; + id0 = 0; + t1 = &engine.Advance(t1->id_); + id1 = t1->id_; v1 = version_list.find(*t1); - t1->commit(); - t2 = engine.begin(); - id2 = t2->id; + t1->Commit(); + t2 = engine.Begin(); + id2 = t2->id_; } // variable where number of versions is stored int version_list_size = 0; tx::Engine engine; - tx::Transaction *t1 = engine.begin(); + tx::Transaction *t1 = engine.Begin(); mvcc::VersionList version_list{*t1, version_list_size}; TestClass *v1 = nullptr; tx::Transaction *t2 = nullptr; - int id0, id1, id2; + tx::transaction_id_t id0, id1, id2; }; // helper macros. 
important: @@ -79,12 +73,12 @@ class Mvcc : public ::testing::Test { #define T4_FIND __attribute__((unused)) auto v4 = version_list.find(*t4) #define T2_UPDATE __attribute__((unused)) auto v2 = version_list.update(*t2) #define T3_UPDATE __attribute__((unused)) auto v3 = version_list.update(*t3) -#define T2_COMMIT t2->commit(); -#define T3_COMMIT t3->commit(); -#define T2_ABORT t2->abort(); -#define T3_ABORT t3->abort(); -#define T3_BEGIN auto t3 = engine.begin(); __attribute__((unused)) int id3 = t3->id -#define T4_BEGIN auto t4 = engine.begin(); +#define T2_COMMIT t2->Commit(); +#define T3_COMMIT t3->Commit(); +#define T2_ABORT t2->Abort(); +#define T3_ABORT t3->Abort(); +#define T3_BEGIN auto t3 = engine.Begin(); __attribute__((unused)) int id3 = t3->id_ +#define T4_BEGIN auto t4 = engine.Begin(); #define T2_REMOVE version_list.remove(*t2) #define T3_REMOVE version_list.remove(*t3) #define EXPECT_CRE(record, expected) EXPECT_EQ(record->tx.cre(), id##expected) diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp index 07964921f..7119c0d0c 100644 --- a/tests/unit/mvcc_gc.cpp +++ b/tests/unit/mvcc_gc.cpp @@ -14,117 +14,141 @@ #include "storage/vertex.hpp" #include "transactions/engine.hpp" -#include "gc_common.hpp" +#include "mvcc_gc_common.hpp" -/** - * Test will the mvcc gc delete records inside the version list because they - * are not longer visible. 
- */ -TEST(VersionList, GcDeleted) { +class MvccGcTest : public ::testing::Test { + protected: tx::Engine engine; + private: + tx::Transaction *t0 = engine.Begin(); + protected: + std::atomic record_destruction_count{0}; + mvcc::VersionList version_list{*t0, record_destruction_count}; + std::vector transactions{t0}; - // create a version_list with one record - std::vector ids; - auto t1 = engine.begin(); - std::atomic count{0}; - mvcc::VersionList version_list(*t1, count); - ids.push_back(t1->id); - t1->commit(); + void SetUp() override { t0->Commit(); } - // create some updates - const int UPDATES = 10; - for (int i = 0; i < UPDATES; ++i) { - auto t2 = engine.begin(); - ids.push_back(t2->id); - version_list.update(*t2); - t2->commit(); + void MakeUpdates(int update_count, bool commit) { + for (int i = 0; i < update_count; i++) { + auto t = engine.Begin(); + version_list.update(*t); + if (commit) + t->Commit(); + else + t->Abort(); + } } - // deleting with the first transaction does nothing - { - auto ret = version_list.GcDeleted(ids[0], engine); - EXPECT_EQ(ret.first, false); - EXPECT_EQ(ret.second, nullptr); + auto GcDeleted(tx::Transaction *latest=nullptr) { + return version_list.GcDeleted(GcSnapshot(engine, latest), engine); } - // deleting with the last transaction + 1 deletes - // everything except the last update - { - auto ret = version_list.GcDeleted(ids.back() + 1, engine); - EXPECT_EQ(ret.first, false); - EXPECT_NE(ret.second, nullptr); - delete ret.second; - EXPECT_EQ(count, UPDATES); - } +}; - // remove and abort, nothing gets deleted - { - auto t = engine.begin(); - version_list.remove(*t); - auto id = t->id + 1; - t->abort(); - auto ret = version_list.GcDeleted(id, engine); - EXPECT_EQ(ret.first, false); - EXPECT_EQ(ret.second, nullptr); - } +TEST_F(MvccGcTest, RemoveAndAbort) { + auto t = engine.Begin(); + version_list.remove(*t); + t->Abort(); + auto ret = GcDeleted(); + EXPECT_EQ(ret.first, false); + EXPECT_EQ(ret.second, nullptr); + 
EXPECT_EQ(record_destruction_count, 0); +} - // update and abort, nothing gets deleted - { - auto t = engine.begin(); - version_list.update(*t); - auto id = t->id + 1; - t->abort(); - auto ret = version_list.GcDeleted(id, engine); - EXPECT_EQ(ret.first, false); - EXPECT_EQ(ret.second, nullptr); - } +TEST_F(MvccGcTest, UpdateAndAbort) { + MakeUpdates(1, false); + auto ret = GcDeleted(); + EXPECT_EQ(ret.first, false); + EXPECT_EQ(ret.second, nullptr); + EXPECT_EQ(record_destruction_count, 0); - // remove and commit, everything gets deleted - { - auto t = engine.begin(); - version_list.remove(*t); - auto id = t->id + 1; - t->commit(); - auto ret = version_list.GcDeleted(id, engine); - EXPECT_EQ(ret.first, true); - EXPECT_NE(ret.second, nullptr); - delete ret.second; - EXPECT_EQ(count, UPDATES + 2); - } + MakeUpdates(3, false); + ret = GcDeleted(); + EXPECT_EQ(ret.first, false); + EXPECT_EQ(ret.second, nullptr); + EXPECT_EQ(record_destruction_count, 0); +} + +TEST_F(MvccGcTest, RemoveAndCommit) { + auto t = engine.Begin(); + version_list.remove(*t); + t->Commit(); + auto ret = GcDeleted(); + EXPECT_EQ(ret.first, true); + EXPECT_NE(ret.second, nullptr); + delete ret.second; + EXPECT_EQ(record_destruction_count, 1); +} + +TEST_F(MvccGcTest, UpdateAndCommit) { + MakeUpdates(4, true); + auto ret = GcDeleted(); + EXPECT_EQ(ret.first, false); + EXPECT_NE(ret.second, nullptr); + delete ret.second; + EXPECT_EQ(record_destruction_count, 4); +} + +TEST_F(MvccGcTest, OldestTransactionSnapshot) { + // this test validates that we can't delete + // a record that has been expired by a transaction (t1) + // committed before GC starts (when t2 is oldest), + // if t1 is in t2's snapshot. 
+ // this is because there could exist transaction t3 + // that also has t1 in its snapshot, and consequently + // does not see the expiration and sees the record + auto t1 = engine.Begin(); + auto t2 = engine.Begin(); + version_list.remove(*t1); + t1->Commit(); + + auto ret = GcDeleted(t2); + EXPECT_EQ(ret.first, false); + EXPECT_EQ(ret.second, nullptr); + EXPECT_EQ(record_destruction_count, 0); } /** - * Test integration of garbage collector with MVCC GC. Delete mvcc's which are + * Test integration of garbage collector with MVCC GC. Delete version lists + * which are * empty (not visible from any future transaction) from the skiplist. */ TEST(GarbageCollector, GcClean) { - SkipList *> skiplist; + SkipList *> skiplist; tx::Engine engine; - DeferredDeleter deleter; - DeferredDeleter> vlist_deleter; - GarbageCollector gc(skiplist, deleter, vlist_deleter); - - auto t1 = engine.begin(); - std::atomic count{0}; - auto vl = new mvcc::VersionList(*t1, count); + DeferredDeleter deleter; + DeferredDeleter> vlist_deleter; + GarbageCollector gc(skiplist, deleter, vlist_deleter); + // create a version list in transaction t1 + auto t1 = engine.Begin(); + std::atomic record_destruction_count{0}; + auto vl = new mvcc::VersionList(*t1, record_destruction_count); auto access = skiplist.access(); access.insert(vl); - gc.Run(Id(2), engine); - t1->commit(); + t1->Commit(); - auto t2 = engine.begin(); - vl->remove(*t2); - t2->commit(); - gc.Run(Id(3), engine); - - EXPECT_EQ(deleter.Count(), 1); - deleter.FreeExpiredObjects(engine.count() + 1); + // run garbage collection that has nothing to collect + gc.Run(GcSnapshot(engine, nullptr), engine); EXPECT_EQ(deleter.Count(), 0); - EXPECT_EQ(count, 1); + EXPECT_EQ(vlist_deleter.Count(), 0); + EXPECT_EQ(record_destruction_count, 0); + // delete the only record in the version-list in transaction t2 + auto t2 = engine.Begin(); + vl->remove(*t2); + t2->Commit(); + gc.Run(GcSnapshot(engine, nullptr), engine); + + // check that we destroyed
the record + EXPECT_EQ(deleter.Count(), 1); + deleter.FreeExpiredObjects(engine.Count() + 1); + EXPECT_EQ(deleter.Count(), 0); + EXPECT_EQ(record_destruction_count, 1); + + // check that we destroyed the version list EXPECT_EQ(vlist_deleter.Count(), 1); - vlist_deleter.FreeExpiredObjects(engine.count() + 1); + vlist_deleter.FreeExpiredObjects(engine.Count() + 1); EXPECT_EQ(vlist_deleter.Count(), 0); EXPECT_EQ(access.size(), (size_t)0); diff --git a/tests/unit/mvcc_gc_common.hpp b/tests/unit/mvcc_gc_common.hpp new file mode 100644 index 000000000..19aa316c7 --- /dev/null +++ b/tests/unit/mvcc_gc_common.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include "mvcc/record.hpp" + +/** + * @brief - Empty class which inherits from mvcc::Record. + */ +class Prop : public mvcc::Record {}; + +/** + * @brief - Class which inherits from mvcc::Record and takes an atomic variable + * to count number of destructor calls (to test if the record is actually + * deleted). + */ +class DestrCountRec : public mvcc::Record { + public: + DestrCountRec(std::atomic &count) : count_(count) {} + ~DestrCountRec() { ++count_; } + + private: + std::atomic &count_; +}; + +// helper function for creating a GC snapshot +// if given a nullptr it makes a GC snapshot like there +// are no active transactions +auto GcSnapshot(tx::Engine &engine, tx::Transaction *t) { + if (t != nullptr) { + tx::Snapshot gc_snap = t->snapshot(); + gc_snap.insert(t->id_); + return gc_snap; + } else { + tx::Snapshot gc_snap; + gc_snap.insert(engine.Count() + 1); + return gc_snap; + } +} diff --git a/tests/unit/mvcc_one_transaction.cpp b/tests/unit/mvcc_one_transaction.cpp index 661e8e5c9..5b81c829d 100644 --- a/tests/unit/mvcc_one_transaction.cpp +++ b/tests/unit/mvcc_one_transaction.cpp @@ -65,7 +65,7 @@ TEST_F(Mvcc, RemoveNotAdvanceRemove) { TEST_F(Mvcc, UpdateAdvanceUpdate) { T2_UPDATE; EXPECT_EQ(T2_FIND, v1); - engine.advance(t2->id); + engine.Advance(t2->id_); EXPECT_EQ(T2_FIND, v2); auto v2_2 = version_list.update(*t2);
EXPECT_NXT(v2, v1); @@ -82,7 +82,7 @@ TEST_F(Mvcc, UpdateAdvanceUpdate) { TEST_F(Mvcc, UpdateAdvanceRemove) { T2_UPDATE; EXPECT_EQ(T2_FIND, v1); - engine.advance(t2->id); + engine.Advance(t2->id_); EXPECT_EQ(T2_FIND, v2); T2_REMOVE; EXPECT_NXT(v2, v1); @@ -96,7 +96,7 @@ TEST_F(Mvcc, UpdateAdvanceRemove) { TEST_F(Mvcc, RemoveAdvanceUpdate) { T2_REMOVE; EXPECT_EQ(T2_FIND, v1); - engine.advance(t2->id); + engine.Advance(t2->id_); EXPECT_EQ(T2_FIND, nullptr); EXPECT_DEATH(T2_UPDATE, ".*nullptr.*"); } @@ -104,7 +104,7 @@ TEST_F(Mvcc, RemoveAdvanceUpdate) { TEST_F(Mvcc, RemoveAdvanceRemove) { T2_REMOVE; EXPECT_EQ(T2_FIND, v1); - engine.advance(t2->id); + engine.Advance(t2->id_); EXPECT_EQ(T2_FIND, nullptr); EXPECT_DEATH(T2_REMOVE, ".*nullptr.*"); } diff --git a/tests/unit/transaction_engine.cpp b/tests/unit/transaction_engine.cpp index 4a5ed06e4..242dc0ffe 100644 --- a/tests/unit/transaction_engine.cpp +++ b/tests/unit/transaction_engine.cpp @@ -5,54 +5,75 @@ #include "transactions/engine.hpp" #include "transactions/transaction.hpp" +TEST(Engine, CountEmpty) { + tx::Engine engine; + EXPECT_EQ(engine.Count(), 0); +} + TEST(Engine, Count) { - tx::Engine eng; - EXPECT_EQ(eng.count(), 0); -} - -TEST(Engine, CountFive) { - tx::Engine eng; - EXPECT_EQ(eng.count(), (uint64_t)0); - std::vector V; + tx::Engine engine; + EXPECT_EQ(engine.Count(), (uint64_t)0); + std::vector transactions; for (int i = 0; i < 5; ++i) { - V.push_back(eng.begin()); - EXPECT_EQ(eng.count(), (uint64_t)(i + 1)); + transactions.push_back(engine.Begin()); + EXPECT_EQ(engine.Count(), (uint64_t)(i + 1)); } - EXPECT_EQ(eng.size(), (uint64_t)5); - for (int i = 0; i < 5; ++i) V[i]->commit(); - EXPECT_EQ(eng.count(), (uint64_t)5); + EXPECT_EQ(engine.ActiveCount(), (uint64_t)5); + for (int i = 0; i < 5; ++i) transactions[i]->Commit(); + EXPECT_EQ(engine.Count(), (uint64_t)5); } -TEST(Engine, LastKnownActiveEmpty) { - tx::Engine eng; - EXPECT_EQ(eng.oldest_active().is_present(), false); -} +TEST(Engine, 
GcSnapshot) { + tx::Engine engine; + ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); -TEST(Engine, LastKnownActive) { - tx::Engine eng; - std::vector V; + std::vector transactions; + // create transactions and check the GC snapshot for (int i = 0; i < 5; ++i) { - V.push_back(eng.begin()); - EXPECT_EQ(eng.size(), (size_t)i + 1); + transactions.push_back(engine.Begin()); + EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); } - for (int i = 0; i < 5; ++i) { - EXPECT_EQ(eng.oldest_active().get(), Id(i + 1)); - V[i]->commit(); - } - EXPECT_EQ(eng.oldest_active().is_present(), false); + + // commit transactions in the middle, expect + // the GcSnapshot did not change + transactions[1]->Commit(); + EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); + transactions[2]->Commit(); + EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1})); + + // have the first three transactions committed + transactions[0]->Commit(); + EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1, 2, 3, 4})); + + // commit all + transactions[3]->Commit(); + transactions[4]->Commit(); + EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6})); } -TEST(Engine, Size) { - tx::Engine eng; - std::vector V; +TEST(Engine, ActiveCount) { + tx::Engine engine; + std::vector transactions; for (int i = 0; i < 5; ++i) { - V.push_back(eng.begin()); - EXPECT_EQ(eng.size(), (size_t)i + 1); + transactions.push_back(engine.Begin()); + EXPECT_EQ(engine.ActiveCount(), (size_t)i + 1); + } + + for (int i = 0; i < 5; ++i) { + transactions[i]->Commit(); + EXPECT_EQ(engine.ActiveCount(), 4 - i); } - for (int i = 0; i < 5; ++i) V[i]->commit(); } -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); +TEST(Engine, Advance) { + tx::Engine engine; + + auto t0 = engine.Begin(); + auto t1 = engine.Begin(); + EXPECT_EQ(t0->cid(), 1); + engine.Advance(t0->id_); + EXPECT_EQ(t0->cid(), 2); + engine.Advance(t0->id_); + EXPECT_EQ(t0->cid(), 3); + EXPECT_EQ(t1->cid(), 1); }