GC bugfixes, MVCC and transaction refactoring
Summary: - GC changed to evaluate old records w.r.t. the oldest transaction's id AND snapshot, as opposed to only id - MVCC hints exp+aborted race condition prevented - minor MVCC refactors and cleanups - minor Transaction refactors and cleanups Reviewers: buda, dgleich Reviewed By: buda, dgleich Subscribers: dtomicevic, pullbot Differential Revision: https://phabricator.memgraph.io/D434
This commit is contained in:
parent
025e557204
commit
afbb940a05
@ -331,7 +331,6 @@ set(memgraph_src_files
|
||||
${src_dir}/io/network/network_endpoint.cpp
|
||||
${src_dir}/io/network/socket.cpp
|
||||
${src_dir}/threading/thread.cpp
|
||||
${src_dir}/mvcc/id.cpp
|
||||
${src_dir}/durability/snapshooter.cpp
|
||||
${src_dir}/durability/recovery.cpp
|
||||
${src_dir}/storage/property_value.cpp
|
||||
@ -339,7 +338,6 @@ set(memgraph_src_files
|
||||
${src_dir}/storage/record_accessor.cpp
|
||||
${src_dir}/storage/vertex_accessor.cpp
|
||||
${src_dir}/storage/edge_accessor.cpp
|
||||
${src_dir}/transactions/snapshot.cpp
|
||||
${src_dir}/transactions/transaction.cpp
|
||||
${src_dir}/template_engine/engine.cpp
|
||||
${src_dir}/logging/streams/stdout.cpp
|
||||
|
@ -34,25 +34,29 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir)
|
||||
gc_scheduler_.Run(std::chrono::seconds(FLAGS_gc_cycle_sec), [this]() {
|
||||
// main garbage collection logic
|
||||
// see wiki documentation for logic explanation
|
||||
const auto next_id = this->tx_engine.count() + 1;
|
||||
const auto id = this->tx_engine.oldest_active().get_or(next_id);
|
||||
const auto snapshot = this->tx_engine.GcSnapshot();
|
||||
{
|
||||
// This can be run concurrently
|
||||
this->gc_vertices_.Run(id, this->tx_engine);
|
||||
this->gc_edges_.Run(id, this->tx_engine);
|
||||
this->gc_vertices_.Run(snapshot, this->tx_engine);
|
||||
this->gc_edges_.Run(snapshot, this->tx_engine);
|
||||
}
|
||||
// This has to be run sequentially after gc because gc modifies
|
||||
// version_lists and changes the oldest visible record, on which Refresh
|
||||
// depends.
|
||||
{
|
||||
// This can be run concurrently
|
||||
this->labels_index_.Refresh(id, this->tx_engine);
|
||||
this->edge_types_index_.Refresh(id, this->tx_engine);
|
||||
this->labels_index_.Refresh(snapshot, this->tx_engine);
|
||||
this->edge_types_index_.Refresh(snapshot, this->tx_engine);
|
||||
}
|
||||
this->edge_record_deleter_.FreeExpiredObjects(id);
|
||||
this->vertex_record_deleter_.FreeExpiredObjects(id);
|
||||
this->edge_version_list_deleter_.FreeExpiredObjects(id);
|
||||
this->vertex_version_list_deleter_.FreeExpiredObjects(id);
|
||||
// we free expired objects with snapshot.back(), which is
|
||||
// the ID of the oldest active transaction (or next active, if there
|
||||
// are no currently active). that's legal because that was the
|
||||
// last possible transaction that could have obtained pointers
|
||||
// to those records
|
||||
this->edge_record_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
this->vertex_record_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
this->edge_version_list_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
this->vertex_version_list_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
});
|
||||
}
|
||||
|
||||
@ -122,8 +126,9 @@ GraphDb::~GraphDb() {
|
||||
for (auto &edge : this->edges_.access()) delete edge;
|
||||
|
||||
// Free expired records with the maximal possible id from all the deleters.
|
||||
this->edge_record_deleter_.FreeExpiredObjects(Id::MaximalId());
|
||||
this->vertex_record_deleter_.FreeExpiredObjects(Id::MaximalId());
|
||||
this->edge_version_list_deleter_.FreeExpiredObjects(Id::MaximalId());
|
||||
this->vertex_version_list_deleter_.FreeExpiredObjects(Id::MaximalId());
|
||||
this->edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
|
||||
this->vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
|
||||
this->edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
|
||||
this->vertex_version_list_deleter_.FreeExpiredObjects(
|
||||
tx::Transaction::MaxId());
|
||||
}
|
||||
|
@ -67,10 +67,6 @@ class GraphDb : public Loggable {
|
||||
/** transaction engine related to this database */
|
||||
tx::Engine tx_engine;
|
||||
|
||||
/** garbage collector related to this database*/
|
||||
// TODO bring back garbage collection
|
||||
// Garbage garbage = {tx_engine};
|
||||
|
||||
// database name
|
||||
// TODO consider if this is even necessary
|
||||
const std::string name_;
|
||||
|
@ -8,7 +8,7 @@
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
|
||||
: db_(db), transaction_(db.tx_engine.begin()) {}
|
||||
: db_(db), transaction_(db.tx_engine.Begin()) {}
|
||||
|
||||
GraphDbAccessor::~GraphDbAccessor() {
|
||||
if (!commited_ && !aborted_) {
|
||||
@ -19,20 +19,20 @@ GraphDbAccessor::~GraphDbAccessor() {
|
||||
const std::string &GraphDbAccessor::name() const { return db_.name_; }
|
||||
|
||||
void GraphDbAccessor::advance_command() {
|
||||
transaction_->engine.advance(transaction_->id);
|
||||
transaction_->engine_.Advance(transaction_->id_);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::commit() {
|
||||
debug_assert(!commited_ && !aborted_,
|
||||
"Already aborted or commited transaction.");
|
||||
transaction_->commit();
|
||||
transaction_->Commit();
|
||||
commited_ = true;
|
||||
}
|
||||
|
||||
void GraphDbAccessor::abort() {
|
||||
debug_assert(!commited_ && !aborted_,
|
||||
"Already aborted or commited transaction.");
|
||||
transaction_->abort();
|
||||
transaction_->Abort();
|
||||
aborted_ = true;
|
||||
}
|
||||
|
||||
|
@ -261,12 +261,19 @@ class GraphDbAccessor {
|
||||
// index automatically, but we still have to add to index everything that
|
||||
// happened earlier. We have to first wait for every transaction that
|
||||
// happend before, or a bit later than CreateIndex to end.
|
||||
auto wait_transaction = db_.tx_engine.begin();
|
||||
wait_transaction->wait_for_active_except(transaction_->id);
|
||||
wait_transaction->commit();
|
||||
{
|
||||
auto wait_transaction = db_.tx_engine.Begin();
|
||||
for (auto id : wait_transaction->snapshot()) {
|
||||
if (id == transaction_->id_) continue;
|
||||
while (wait_transaction->engine_.clog_.fetch_info(id).is_active())
|
||||
// TODO reconsider this constant, currently rule-of-thumb chosen
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
||||
}
|
||||
wait_transaction->Commit();
|
||||
}
|
||||
|
||||
// This transaction surely sees everything that happened before CreateIndex.
|
||||
auto transaction = db_.tx_engine.begin();
|
||||
auto transaction = db_.tx_engine.Begin();
|
||||
|
||||
for (auto vertex_vlist : db_.vertices_.access()) {
|
||||
auto vertex_record = vertex_vlist->find(*transaction);
|
||||
@ -277,7 +284,7 @@ class GraphDbAccessor {
|
||||
}
|
||||
// Commit transaction as we finished applying method on newest visible
|
||||
// records.
|
||||
transaction->commit();
|
||||
transaction->Commit();
|
||||
// After these two operations we are certain that everything is contained in
|
||||
// the index under the assumption that this transaction contained no
|
||||
// vertex/edge insert/update before this method was invoked.
|
||||
|
@ -116,12 +116,13 @@ static auto GetVlists(
|
||||
/**
|
||||
* @brief - Removes from the index all entries for which records don't contain
|
||||
* the given label/edge type/label + property anymore. Also update (remove)
|
||||
* all record which are not visible for any transaction with an id larger or
|
||||
* equal to `id`. This method assumes that the MVCC GC has been run with the
|
||||
* same 'id'.
|
||||
* all records which are not visible for any transaction in the given
|
||||
* 'snapshot'. This method assumes that the MVCC GC has been run with the
|
||||
* same 'snapshot'.
|
||||
*
|
||||
* @param indices - map of index entries (TIndexKey, skiplist<TIndexEntry>)
|
||||
* @param id - oldest active id, safe to remove everything deleted before this
|
||||
* id.
|
||||
* @param snapshot - the GC snapshot. Consists of the oldest active
|
||||
* transaction's snapshot, with that transaction's id appened as last.
|
||||
* @param engine - transaction engine to see which records are commited
|
||||
* @param exists - function which checks 'key' and 'entry' if the entry still
|
||||
* contains required properties (key + optional value (in case of label_property
|
||||
@ -132,45 +133,42 @@ static auto GetVlists(
|
||||
*/
|
||||
template <class TKey, class TIndexEntry, class TRecord>
|
||||
static void Refresh(
|
||||
ConcurrentMap<TKey, SkipList<TIndexEntry> *> &indices, const Id &id,
|
||||
tx::Engine &engine,
|
||||
ConcurrentMap<TKey, SkipList<TIndexEntry> *> &indices,
|
||||
const tx::Snapshot &snapshot, tx::Engine &engine,
|
||||
const std::function<bool(const TKey &, const TIndexEntry &)> exists) {
|
||||
// iterate over all the indices
|
||||
for (auto &key_indices_pair : indices.access()) {
|
||||
// iterate over index entries
|
||||
auto indices_entries_accessor = key_indices_pair.second->access();
|
||||
for (auto indices_entry : indices_entries_accessor) {
|
||||
// Remove it from index if it's deleted before the current id, or it
|
||||
// doesn't have that label/edge_type/label+property anymore. We need to
|
||||
// be careful when we are deleting the record which is not visible
|
||||
// anymore because even though that record is not visible, some other,
|
||||
// newer record could be visible, and might contain the label, and might
|
||||
// not be in the index - that's why we can't remove it completely, but
|
||||
// just update it's record value.
|
||||
// We also need to be extra careful when checking for existance of
|
||||
// label/edge_type because that record could still be under the update
|
||||
// operation and as such could be modified while we are reading the
|
||||
// label/edge_type - that's why we have to wait for it's creation id to
|
||||
// be lower than ours `id` as that means it's surely either commited or
|
||||
// aborted as we always refresh with the oldest active id.
|
||||
if (indices_entry.record_->is_not_visible_from(id, engine)) {
|
||||
// We first have to insert and then remove, because otherwise we might
|
||||
// have a situation where there exists a vlist with the record
|
||||
// containg the label/edge_type/label+property but it's not in our
|
||||
// index.
|
||||
// Get the oldest not deleted version for current 'id' (MVCC GC takes
|
||||
// care of this.)
|
||||
if (indices_entry.record_->is_not_visible_from(snapshot, engine)) {
|
||||
// be careful when deleting the record which is not visible anymore.
|
||||
// it's newer copy could be visible, and might still logically belong to
|
||||
// index (it satisfies the `exists` function). that's why we can't just
|
||||
// remove the index entry, but also re-insert the oldest visible record
|
||||
// to the index. if that record does not satisfy `exists`, it will be
|
||||
// cleaned up in the next Refresh first insert and then remove,
|
||||
// otherwise there is a timeframe during which the record is not present
|
||||
// in the index
|
||||
auto new_record = indices_entry.vlist_->Oldest();
|
||||
if (new_record != nullptr)
|
||||
indices_entries_accessor.insert(
|
||||
TIndexEntry(indices_entry, new_record));
|
||||
|
||||
auto success = indices_entries_accessor.remove(indices_entry);
|
||||
debug_assert(success == true, "Not able to delete entry.");
|
||||
} else if (indices_entry.record_->tx.cre() < id &&
|
||||
!exists(key_indices_pair.first, indices_entry)) {
|
||||
// Since id is the oldest active id, if the record has been created
|
||||
// before it we are sure that it won't be modified anymore and that
|
||||
// the creating transaction finished, that's why it's safe to check
|
||||
// it, and potentially remove it from index.
|
||||
debug_assert(success, "Unable to delete entry.");
|
||||
}
|
||||
|
||||
// if the record is still visible,
|
||||
// check if it satisfies the `exists` function. if not
|
||||
// it does not belong in index anymore.
|
||||
// be careful when using the `exists` function
|
||||
// because it's creator transaction could still be modifying it,
|
||||
// and modify+read is not thread-safe. for that reason we need to
|
||||
// first see if the the transaction that created it has ended
|
||||
// (tx.cre() < oldest active trancsation).
|
||||
else if (indices_entry.record_->tx.cre() < snapshot.back() &&
|
||||
!exists(key_indices_pair.first, indices_entry)) {
|
||||
indices_entries_accessor.remove(indices_entry);
|
||||
}
|
||||
}
|
||||
|
@ -83,13 +83,15 @@ class KeyIndex {
|
||||
* @brief - Removes from the index all entries for which records don't contain
|
||||
* the given label anymore. Update all record which are not visible for any
|
||||
* transaction with an id larger or equal to `id`.
|
||||
* @param id - oldest active id, safe to remove everything deleted before this
|
||||
* id.
|
||||
*
|
||||
* @param snapshot - the GC snapshot. Consists of the oldest active
|
||||
* transaction's snapshot, with that transaction's id appened as last.
|
||||
* @param engine - transaction engine to see which records are commited
|
||||
*/
|
||||
void Refresh(const Id &id, tx::Engine &engine) {
|
||||
void Refresh(const tx::Snapshot &snapshot, tx::Engine &engine) {
|
||||
return IndexUtils::Refresh<TKey, IndexEntry, TRecord>(
|
||||
indices_, id, engine, [](const TKey &key, const IndexEntry &entry) {
|
||||
indices_, snapshot, engine,
|
||||
[](const TKey &key, const IndexEntry &entry) {
|
||||
return KeyIndex::Exists(key, entry.record_);
|
||||
});
|
||||
}
|
||||
|
@ -243,12 +243,13 @@ class LabelPropertyIndex {
|
||||
* @brief - Removes from the index all entries for which records don't contain
|
||||
* the given label anymore, or the record was deleted before this transaction
|
||||
* id.
|
||||
* @param id - oldest active id, safe to remove everything deleted before this
|
||||
* id.
|
||||
*
|
||||
* @param snapshot - the GC snapshot. Consists of the oldest active
|
||||
* transaction's snapshot, with that transaction's id appened as last.
|
||||
*/
|
||||
void Refresh(const Id &id, tx::Engine &engine) {
|
||||
void Refresh(const tx::Snapshot &snapshot, tx::Engine &engine) {
|
||||
return IndexUtils::Refresh<Key, IndexEntry, Vertex>(
|
||||
indices_, id, engine, [](const Key &key, const IndexEntry &entry) {
|
||||
indices_, snapshot, engine, [](const Key &key, const IndexEntry &entry) {
|
||||
return LabelPropertyIndex::Exists(key, entry.value_, entry.record_);
|
||||
});
|
||||
}
|
||||
|
@ -1,13 +0,0 @@
|
||||
#include "mvcc/id.hpp"
|
||||
|
||||
Id::Id(uint64_t id) : id(id) {}
|
||||
|
||||
bool operator<(const Id& a, const Id& b) { return a.id < b.id; }
|
||||
|
||||
bool operator==(const Id& a, const Id& b) { return a.id == b.id; }
|
||||
|
||||
std::ostream& operator<<(std::ostream& stream, const Id& id) {
|
||||
return stream << id.id;
|
||||
}
|
||||
|
||||
Id::operator uint64_t() const { return id; }
|
@ -1,32 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <limits>
|
||||
#include <ostream>
|
||||
|
||||
#include "utils/total_ordering.hpp"
|
||||
|
||||
class Id : public TotalOrdering<Id> {
|
||||
public:
|
||||
Id() = default;
|
||||
|
||||
Id(uint64_t id);
|
||||
|
||||
friend bool operator<(const Id &a, const Id &b);
|
||||
|
||||
friend bool operator==(const Id &a, const Id &b);
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &stream, const Id &id);
|
||||
|
||||
operator uint64_t() const;
|
||||
|
||||
/**
|
||||
* @brief - Return maximal possible Id.
|
||||
*/
|
||||
static const Id MaximalId() {
|
||||
return Id(std::numeric_limits<uint64_t>::max());
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t id{0};
|
||||
};
|
@ -9,7 +9,6 @@
|
||||
|
||||
#include "mvcc/cre_exp.hpp"
|
||||
#include "mvcc/hints.hpp"
|
||||
#include "mvcc/id.hpp"
|
||||
#include "mvcc/version.hpp"
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
|
||||
@ -30,19 +29,19 @@ class Record : public Version<T> {
|
||||
// TODO maybe disable the copy-constructor and instead use a
|
||||
// data variable in the version_list update() function (and similar)
|
||||
// like it was in Dominik's implementation
|
||||
Record(const Record &other) {}
|
||||
Record(const Record &) {}
|
||||
|
||||
// tx.cre is the id of the transaction that created the record
|
||||
// and tx.exp is the id of the transaction that deleted the record
|
||||
// these values are used to determine the visibility of the record
|
||||
// to the current transaction
|
||||
CreExp<Id> tx;
|
||||
CreExp<tx::transaction_id_t> tx;
|
||||
|
||||
// cmd.cre is the id of the command in this transaction that created the
|
||||
// record and cmd.exp is the id of the command in this transaction that
|
||||
// deleted the record. these values are used to determine the visibility
|
||||
// of the record to the current command in the running transaction
|
||||
CreExp<uint8_t> cmd;
|
||||
CreExp<tx::command_id_t> cmd;
|
||||
|
||||
Hints hints;
|
||||
|
||||
@ -52,67 +51,86 @@ class Record : public Version<T> {
|
||||
|
||||
// check if this record is visible to the transaction t
|
||||
bool visible(const tx::Transaction &t) {
|
||||
// TODO check if the record was created by a transaction that has been
|
||||
// aborted. one might implement this by checking the hints in mvcc
|
||||
// anc/or consulting the commit log
|
||||
|
||||
// Mike Olson says 17 march 1993: the tests in this routine are correct;
|
||||
// if you think they're not, you're wrong, and you should think about it
|
||||
// again. i know, it happened to me.
|
||||
|
||||
return ((tx.cre() == t.id && // inserted by the current transaction
|
||||
cmd.cre() < t.cid && // before this command, and
|
||||
(tx.exp() == Id(0) || // the row has not been deleted, or
|
||||
(tx.exp() == t.id && // it was deleted by the current
|
||||
// fetch expiration info in a safe way (see fetch_exp for details)
|
||||
tx::transaction_id_t tx_exp;
|
||||
tx::command_id_t cmd_exp;
|
||||
std::tie(tx_exp, cmd_exp) = fetch_exp();
|
||||
|
||||
return ((tx.cre() == t.id_ && // inserted by the current transaction
|
||||
cmd.cre() < t.cid() && // before this command, and
|
||||
(tx_exp == 0 || // the row has not been deleted, or
|
||||
(tx_exp == t.id_ && // it was deleted by the current
|
||||
// transaction
|
||||
cmd.exp() >= t.cid))) // but not before this command,
|
||||
cmd_exp >= t.cid()))) // but not before this command,
|
||||
|| // or
|
||||
(cre_committed(tx.cre(), t) && // the record was inserted by a
|
||||
// committed transaction, and
|
||||
(tx.exp() == Id(0) || // the record has not been deleted, or
|
||||
(tx.exp() == t.id && // the row is being deleted by this
|
||||
// transaction
|
||||
cmd.exp() >= t.cid) || // but it's not deleted "yet", or
|
||||
(tx.exp() != t.id && // the row was deleted by another
|
||||
// transaction
|
||||
!exp_committed(tx.exp(), t) // that has not been committed
|
||||
(tx_exp == 0 || // the record has not been deleted, or
|
||||
(tx_exp == t.id_ && // the row is being deleted by this
|
||||
// transaction
|
||||
cmd_exp >= t.cid()) || // but it's not deleted "yet", or
|
||||
(tx_exp != t.id_ && // the row was deleted by another
|
||||
// transaction
|
||||
!exp_committed(tx_exp, t) // that has not been committed
|
||||
))));
|
||||
}
|
||||
|
||||
void mark_created(const tx::Transaction &t) {
|
||||
debug_assert(tx.cre() == Id(0), "Marking node as created twice.");
|
||||
tx.cre(t.id);
|
||||
cmd.cre(t.cid);
|
||||
debug_assert(tx.cre() == 0, "Marking node as created twice.");
|
||||
tx.cre(t.id_);
|
||||
cmd.cre(t.cid());
|
||||
}
|
||||
|
||||
void mark_deleted(const tx::Transaction &t) {
|
||||
if (tx.exp() != Id(0)) hints.exp.clear();
|
||||
tx.exp(t.id);
|
||||
cmd.exp(t.cid);
|
||||
if (tx.exp() != 0) hints.exp.clear();
|
||||
tx.exp(t.id_);
|
||||
cmd.exp(t.cid());
|
||||
}
|
||||
|
||||
bool exp_committed(const Id &id, const tx::Transaction &t) {
|
||||
bool exp_committed(tx::transaction_id_t id, const tx::Transaction &t) {
|
||||
return committed(hints.exp, id, t);
|
||||
}
|
||||
|
||||
bool exp_committed(const tx::Transaction &t) {
|
||||
return committed(hints.exp, tx.exp(), t.engine);
|
||||
bool exp_committed(tx::Engine &engine) {
|
||||
return committed(hints.exp, tx.exp(), engine);
|
||||
}
|
||||
|
||||
bool cre_committed(const Id &id, const tx::Transaction &t) {
|
||||
bool cre_committed(tx::transaction_id_t id, const tx::Transaction &t) {
|
||||
return committed(hints.cre, id, t);
|
||||
}
|
||||
|
||||
bool cre_committed(const tx::Transaction &t) {
|
||||
return committed(hints.cre, tx.cre(), t);
|
||||
}
|
||||
/**
|
||||
* Check if this record is visible w.r.t. to the given garbage collection
|
||||
* snapshot. See source comments for exact logic.
|
||||
*
|
||||
* @param snapshot - the GC snapshot. Consists of the oldest active
|
||||
* transaction's snapshot, with that transaction's id appened as last.
|
||||
*/
|
||||
bool is_not_visible_from(const tx::Snapshot &snapshot,
|
||||
tx::Engine &engine) const {
|
||||
// first get tx.exp so that all the subsequent checks operate on
|
||||
// the same id. otherwise there could be a race condition
|
||||
auto exp_id = tx.exp();
|
||||
|
||||
// Record won'te be visible to any transaction after or at `id` if it was
|
||||
// expired before id or it's creation was aborted.
|
||||
bool is_not_visible_from(const Id &id, tx::Engine &engine) const {
|
||||
return (tx.exp() != Id(0) && tx.exp() < id &&
|
||||
engine.clog.is_committed(tx.exp())) ||
|
||||
engine.clog.is_aborted(tx.cre());
|
||||
// a record is NOT visible if:
|
||||
// 1. it creating transaction aborted (last check)
|
||||
// OR
|
||||
// 2. a) it's expiration is not 0 (some transaction expired it)
|
||||
// AND
|
||||
// b) the expiring transaction is older than latest active
|
||||
// AND
|
||||
// c) that transaction committed (as opposed to aborted)
|
||||
// AND
|
||||
// d) that transaction is not in oldest active transaction's
|
||||
// snapshot (consequently also not in the snapshots of
|
||||
// newer transactions)
|
||||
return (exp_id != 0 && exp_id < snapshot.back() &&
|
||||
engine.clog_.is_committed(exp_id) && !snapshot.contains(exp_id)) ||
|
||||
engine.clog_.is_aborted(tx.cre());
|
||||
}
|
||||
|
||||
// TODO: Test this
|
||||
@ -122,12 +140,17 @@ class Record : public Version<T> {
|
||||
// OR DURING this command. this is done to support cypher's
|
||||
// queries which can match, update and return in the same query
|
||||
bool is_visible_write(const tx::Transaction &t) {
|
||||
return (tx.cre() == t.id && // inserted by the current transaction
|
||||
cmd.cre() <= t.cid && // before OR DURING this command, and
|
||||
(tx.exp() == Id(0) || // the row has not been deleted, or
|
||||
(tx.exp() == t.id && // it was deleted by the current
|
||||
// fetch expiration info in a safe way (see fetch_exp for details)
|
||||
tx::transaction_id_t tx_exp;
|
||||
tx::command_id_t cmd_exp;
|
||||
std::tie(tx_exp, cmd_exp) = fetch_exp();
|
||||
|
||||
return (tx.cre() == t.id_ && // inserted by the current transaction
|
||||
cmd.cre() <= t.cid() && // before OR DURING this command, and
|
||||
(tx_exp == 0 || // the row has not been deleted, or
|
||||
(tx_exp == t.id_ && // it was deleted by the current
|
||||
// transaction
|
||||
cmd.exp() >= t.cid))); // but not before this command,
|
||||
cmd_exp >= t.cid()))); // but not before this command,
|
||||
}
|
||||
|
||||
/**
|
||||
@ -135,7 +158,7 @@ class Record : public Version<T> {
|
||||
* of the given transaction.
|
||||
*/
|
||||
bool is_created_by(const tx::Transaction &t) {
|
||||
return tx.cre() == t.id && cmd.cre() == t.cid;
|
||||
return tx.cre() == t.id_ && cmd.cre() == t.cid();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -143,22 +166,38 @@ class Record : public Version<T> {
|
||||
* of the given transaction.
|
||||
*/
|
||||
bool is_deleted_by(const tx::Transaction &t) {
|
||||
return tx.exp() == t.id && cmd.exp() == t.cid;
|
||||
return tx.exp() == t.id_ && cmd.exp() == t.cid();
|
||||
}
|
||||
|
||||
private:
|
||||
/** Fetch the (transaction, command) expiration before the check
|
||||
* because they can be concurrently modified by multiple transactions.
|
||||
* Do it in a loop to ensure that command is consistent with transaction.
|
||||
*/
|
||||
auto fetch_exp() {
|
||||
tx::transaction_id_t tx_exp;
|
||||
tx::command_id_t cmd_exp;
|
||||
do {
|
||||
tx_exp = tx.exp();
|
||||
cmd_exp = cmd.exp();
|
||||
} while (tx_exp != tx.exp());
|
||||
return std::make_pair(tx_exp, cmd_exp);
|
||||
}
|
||||
|
||||
protected:
|
||||
/**
|
||||
* @brief - Check if the id is commited from the perspective of transactio,
|
||||
* i.e. transaction can see the transaction with that id (it happened before
|
||||
* the transaction, and is not in the snapshot). This method is used to test
|
||||
* for visibility of some record.
|
||||
* @brief - Check if the transaction with the given `id`
|
||||
* is commited from the perspective of transaction `t`.
|
||||
*
|
||||
* Evaluates to true if that transaction has committed,
|
||||
* it started before `t` and it's not in it's snapshot.
|
||||
*
|
||||
* @param hints - hints to use to determine commit/abort
|
||||
* about transactions commit/abort status
|
||||
* @param id - id to check if it's commited and visible
|
||||
* @return true if the id is commited and visible for the transaction t.
|
||||
*/
|
||||
template <class U>
|
||||
bool committed(U &hints, const Id &id, const tx::Transaction &t) {
|
||||
bool committed(U &hints, tx::transaction_id_t id, const tx::Transaction &t) {
|
||||
// Dominik Gleich says 4 april 2017: the tests in this routine are correct;
|
||||
// if you think they're not, you're wrong, and you should think about it
|
||||
// again. I know, it happened to me (and also to Matej Gradicek).
|
||||
@ -166,47 +205,47 @@ class Record : public Version<T> {
|
||||
// You certainly can't see the transaction with id greater than yours as
|
||||
// that means it started after this transaction and if it commited, it
|
||||
// commited after this transaction has started.
|
||||
if (id >= t.id) return false;
|
||||
if (id >= t.id_) return false;
|
||||
|
||||
// The creating transaction is still in progress (examine snapshot)
|
||||
if (t.in_snapshot(id)) return false;
|
||||
if (t.snapshot().contains(id)) return false;
|
||||
|
||||
auto hint_bits = hints.load();
|
||||
|
||||
// TODO: Validate if this position is valid for next if.
|
||||
// if hints are set, return if xid is committed
|
||||
if (!hint_bits.is_unknown()) return hint_bits.is_committed();
|
||||
|
||||
// if hints are not set:
|
||||
// - you are the first one to check since it ended, consult commit log
|
||||
auto info = t.engine.clog.fetch_info(id);
|
||||
|
||||
if (info.is_committed()) return hints.set_committed(), true;
|
||||
|
||||
debug_assert(info.is_aborted(),
|
||||
"Info isn't aborted, but function would return as aborted.");
|
||||
return hints.set_aborted(), false;
|
||||
return committed(hints, id, t.engine_);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief - Check if the id is commited.
|
||||
* @brief - Check if the transaction with the given `id`
|
||||
* is committed.
|
||||
*
|
||||
* @param hints - hints to use to determine commit/abort
|
||||
* @param id - id to check if commited
|
||||
* @param engine - engine instance with information about transactions
|
||||
* @param engine - engine instance with information about transaction
|
||||
* statuses
|
||||
* @return true if it's commited, false otherwise
|
||||
*/
|
||||
template <class U>
|
||||
bool committed(U &hints, const Id &id, tx::Engine &engine) {
|
||||
bool committed(U &hints, tx::transaction_id_t id, tx::Engine &engine) {
|
||||
auto hint_bits = hints.load();
|
||||
// if hints are set, return if xid is committed
|
||||
// if hints are set, return if id is committed
|
||||
if (!hint_bits.is_unknown()) return hint_bits.is_committed();
|
||||
|
||||
// if hints are not set:
|
||||
// - you are the first one to check since it ended, consult commit log
|
||||
auto info = engine.clog.fetch_info(id);
|
||||
// if hints are not set consult the commit log
|
||||
auto info = engine.clog_.fetch_info(id);
|
||||
|
||||
// committed
|
||||
if (info.is_committed()) return hints.set_committed(), true;
|
||||
if (info.is_aborted()) return hints.set_aborted(), false;
|
||||
|
||||
// we can't set_aborted hints because of a race-condition that
|
||||
// can occurr when tx.exp gets changed by some transaction.
|
||||
// to be correct, tx.exp and hints.exp.set_aborted should be
|
||||
// atomic.
|
||||
//
|
||||
// this is not a problem with hints.cre.X because
|
||||
// only one transaction ever creates a record
|
||||
//
|
||||
// it's also not a problem with hints.exp.set_committed
|
||||
// because only one transaction ever can expire a record
|
||||
// and commit
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
@ -72,19 +72,25 @@ class VersionList {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is NOT thread-safe. This should never be called with a
|
||||
* transaction id newer than the oldest active transaction id.
|
||||
* Re-links all records which are no longer visible for any
|
||||
* transaction with an id greater or equal to id.
|
||||
* @param id - transaction id from which to start garbage collection
|
||||
* that is not visible anymore. If none exists to_delete will rbecome nullptr.
|
||||
* Garbage collects records that are not reachable/visible anymore.
|
||||
*
|
||||
* Relinks this version-list so that garbage collected records are no
|
||||
* longer reachable through this version list.
|
||||
* Visibility is defined in mvcc::Record::is_not_visible_from,
|
||||
* to which the given `snapshot` is passed.
|
||||
*
|
||||
* This method is NOT thread-safe.
|
||||
*
|
||||
* @param snapshot - the GC snapshot. Consists of the oldest active
|
||||
* transaction's snapshot, with that transaction's id appened as last.
|
||||
* @param engine - transaction engine to use - we need it to check which
|
||||
* records were commited and which werent
|
||||
* @return pair<status, to_delete>; status is true - If version list is empty
|
||||
* after garbage collection. to_delete points to the newest record that is not
|
||||
* visible anymore. If none exists to_delete will point to nullptr.
|
||||
*/
|
||||
std::pair<bool, T *> GcDeleted(const Id &id, tx::Engine &engine) {
|
||||
std::pair<bool, T *> GcDeleted(const tx::Snapshot &snapshot,
|
||||
tx::Engine &engine) {
|
||||
// nullptr
|
||||
// |
|
||||
// [v1] ... all of this gets deleted!
|
||||
@ -100,7 +106,7 @@ class VersionList {
|
||||
T *head_of_deletable_records = current;
|
||||
T *oldest_visible_record = nullptr;
|
||||
while (current) {
|
||||
if (!current->is_not_visible_from(id, engine))
|
||||
if (!current->is_not_visible_from(snapshot, engine))
|
||||
oldest_visible_record = current;
|
||||
current = current->next();
|
||||
}
|
||||
@ -242,15 +248,13 @@ class VersionList {
|
||||
"Record is nullptr on lock and validation.");
|
||||
|
||||
// take a lock on this node
|
||||
t.take_lock(lock);
|
||||
t.TakeLock(lock);
|
||||
|
||||
// if the record hasn't been deleted yet or the deleting transaction
|
||||
// has aborted, it's ok to modify it
|
||||
if (!record->tx.exp() || !record->exp_committed(t)) return;
|
||||
if (!record->tx.exp() || !record->exp_committed(t.engine_)) return;
|
||||
|
||||
// if it committed, then we have a serialization conflict
|
||||
debug_assert(record->hints.load().exp.is_committed(),
|
||||
"Serialization conflict.");
|
||||
throw SerializationError();
|
||||
}
|
||||
|
||||
|
@ -4,8 +4,8 @@
|
||||
|
||||
#include <list>
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "mvcc/record.hpp"
|
||||
#include "transactions/transaction.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
/**
|
||||
@ -31,7 +31,8 @@ class DeferredDeleter {
|
||||
* @param last_transaction - nothing newer or equal to it can see these
|
||||
* objects
|
||||
*/
|
||||
void AddObjects(const std::vector<T *> &objects, const Id &last_transaction) {
|
||||
void AddObjects(const std::vector<T *> &objects,
|
||||
tx::transaction_id_t last_transaction) {
|
||||
debug_assert(
|
||||
objects_.size() == 0 || objects_.back().deleted_at <= last_transaction,
|
||||
"Transaction ids are not non-decreasing.");
|
||||
@ -43,7 +44,7 @@ class DeferredDeleter {
|
||||
* @brief - Free memory of objects deleted before the id.
|
||||
* @param id - delete before this id
|
||||
*/
|
||||
void FreeExpiredObjects(const Id &id) {
|
||||
void FreeExpiredObjects(tx::transaction_id_t id) {
|
||||
auto it = objects_.begin();
|
||||
while (it != objects_.end() && it->deleted_at < id) {
|
||||
delete it->object;
|
||||
@ -69,8 +70,8 @@ class DeferredDeleter {
|
||||
*/
|
||||
struct DeletedObject {
|
||||
const T *object;
|
||||
const Id deleted_at;
|
||||
DeletedObject(T *object, const Id &deleted_at)
|
||||
const tx::transaction_id_t deleted_at;
|
||||
DeletedObject(T *object, tx::transaction_id_t deleted_at)
|
||||
: object(object), deleted_at(deleted_at) {}
|
||||
};
|
||||
|
||||
|
@ -1,8 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
// Base class for all classes which need to be safely disposed. Main usage is
|
||||
// for garbage class operations.
|
||||
class DeleteSensitive {
|
||||
public:
|
||||
virtual ~DeleteSensitive() {}
|
||||
};
|
@ -1,16 +0,0 @@
|
||||
#include "storage/garbage/garbage.hpp"
|
||||
|
||||
void Garbage::dispose(tx::Snapshot<Id> &&snapshot, DeleteSensitive *data) {
|
||||
// If this fails it's better to leak memory than to cause read after free.
|
||||
gar.begin().push(std::make_pair(snapshot, data));
|
||||
}
|
||||
|
||||
void Garbage::clean() {
|
||||
for (auto it = gar.begin(); it != gar.end(); it++) {
|
||||
if (it->first.all_finished(engine) && it.remove()) {
|
||||
// All transactions who could have seen data are finished and this
|
||||
// thread successfull removed item from list.
|
||||
it->second->~DeleteSensitive();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "data_structures/concurrent/concurrent_list.hpp"
|
||||
#include "mvcc/id.hpp"
|
||||
#include "storage/garbage/delete_sensitive.hpp"
|
||||
#include "transactions/snapshot.hpp"
|
||||
|
||||
namespace tx {
|
||||
class Engine;
|
||||
}
|
||||
|
||||
// Collection of delete sensitive data which need to be safely deleted. That
|
||||
// meens that all transactions that may have pointer to it must finish before
|
||||
// the sensitive data can be safely destroyed.
|
||||
class Garbage {
|
||||
public:
|
||||
Garbage(tx::Engine &e) : engine(e) {}
|
||||
|
||||
// Will safely dispose of data.
|
||||
void dispose(tx::Snapshot<Id> &&snapshot, DeleteSensitive *data);
|
||||
|
||||
// Cleaner thread should call this method every some time. Removes data
|
||||
// which is safe to be deleted.
|
||||
void clean();
|
||||
|
||||
private:
|
||||
ConcurrentList<std::pair<tx::Snapshot<Id>, DeleteSensitive *>> gar;
|
||||
tx::Engine &engine;
|
||||
};
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include "data_structures/concurrent/skiplist.hpp"
|
||||
#include "logging/loggable.hpp"
|
||||
#include "mvcc/id.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
#include "storage/deferred_deleter.hpp"
|
||||
#include "transactions/engine.hpp"
|
||||
@ -25,20 +24,22 @@ class GarbageCollector : public Loggable {
|
||||
/**
|
||||
* @brief - Runs garbage collector. Populates deferred deleters with version
|
||||
* lists and records.
|
||||
* @param id - oldest active transaction id
|
||||
*
|
||||
* @param snapshot - the GC snapshot. Consists of the oldest active
|
||||
* transaction's snapshot, with that transaction's id appened as last.
|
||||
* @param engine - reference to engine object
|
||||
*/
|
||||
void Run(const Id &id, tx::Engine &engine) {
|
||||
void Run(const tx::Snapshot &snapshot, tx::Engine &engine) {
|
||||
auto collection_accessor = this->skiplist_.access();
|
||||
uint64_t count = 0;
|
||||
std::vector<T *> deleted_records;
|
||||
std::vector<mvcc::VersionList<T> *> deleted_version_lists;
|
||||
if (logger.Initialized())
|
||||
logger.trace("Gc started cleaning everything deleted before {}", id);
|
||||
logger.trace("GC started cleaning with snapshot: ", snapshot);
|
||||
for (auto version_list : collection_accessor) {
|
||||
// If the version_list is empty, i.e. there is nothing else to be read
|
||||
// from it we can delete it.
|
||||
auto ret = version_list->GcDeleted(id, engine);
|
||||
auto ret = version_list->GcDeleted(snapshot, engine);
|
||||
if (ret.first) {
|
||||
deleted_version_lists.push_back(version_list);
|
||||
count += collection_accessor.remove(version_list);
|
||||
@ -49,10 +50,10 @@ class GarbageCollector : public Loggable {
|
||||
|
||||
// Add records to deleter, with the id larger or equal than the last active
|
||||
// transaction.
|
||||
record_deleter_.AddObjects(deleted_records, engine.count());
|
||||
record_deleter_.AddObjects(deleted_records, engine.Count());
|
||||
// Add version_lists to deleter, with the id larger or equal than the last
|
||||
// active transaction.
|
||||
version_list_deleter_.AddObjects(deleted_version_lists, engine.count());
|
||||
version_list_deleter_.AddObjects(deleted_version_lists, engine.Count());
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
void RecordLock::lock() { mutex.lock(&timeout); }
|
||||
|
||||
LockStatus RecordLock::lock(const Id& id) {
|
||||
LockStatus RecordLock::lock(tx::transaction_id_t id) {
|
||||
if (mutex.try_lock()) return owner = id, LockStatus::Acquired;
|
||||
|
||||
if (owner == id) return LockStatus::AlreadyHeld;
|
||||
@ -11,9 +11,7 @@ LockStatus RecordLock::lock(const Id& id) {
|
||||
}
|
||||
|
||||
void RecordLock::unlock() {
|
||||
owner = INVALID;
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
constexpr struct timespec RecordLock::timeout;
|
||||
constexpr Id RecordLock::INVALID;
|
||||
|
@ -1,19 +1,19 @@
|
||||
#pragma once
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
#include "storage/locking/lock_status.hpp"
|
||||
#include "threading/sync/futex.hpp"
|
||||
|
||||
class RecordLock {
|
||||
// TODO arbitrary constant, reconsider
|
||||
static constexpr struct timespec timeout { 2, 0 };
|
||||
static constexpr Id INVALID = Id();
|
||||
|
||||
public:
|
||||
LockStatus lock(const Id& id);
|
||||
LockStatus lock(tx::transaction_id_t id);
|
||||
void lock();
|
||||
void unlock();
|
||||
|
||||
private:
|
||||
Futex mutex;
|
||||
Id owner;
|
||||
tx::transaction_id_t owner;
|
||||
};
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "data_structures/bitset/dynamic_bitset.hpp"
|
||||
#include "mvcc/id.hpp"
|
||||
#include "type.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
@ -31,17 +31,19 @@ class CommitLog {
|
||||
|
||||
CommitLog operator=(CommitLog) = delete;
|
||||
|
||||
Info fetch_info(const Id &id) { return Info{log.at(2 * id, 2)}; }
|
||||
Info fetch_info(transaction_id_t id) { return Info{log.at(2 * id, 2)}; }
|
||||
|
||||
bool is_active(const Id &id) { return fetch_info(id).is_active(); }
|
||||
bool is_active(transaction_id_t id) { return fetch_info(id).is_active(); }
|
||||
|
||||
bool is_committed(const Id &id) { return fetch_info(id).is_committed(); }
|
||||
bool is_committed(transaction_id_t id) {
|
||||
return fetch_info(id).is_committed();
|
||||
}
|
||||
|
||||
void set_committed(const Id &id) { log.set(2 * id); }
|
||||
void set_committed(transaction_id_t id) { log.set(2 * id); }
|
||||
|
||||
bool is_aborted(const Id &id) { return fetch_info(id).is_aborted(); }
|
||||
bool is_aborted(transaction_id_t id) { return fetch_info(id).is_aborted(); }
|
||||
|
||||
void set_aborted(const Id &id) { log.set(2 * id + 1); }
|
||||
void set_aborted(transaction_id_t id) { log.set(2 * id + 1); }
|
||||
|
||||
private:
|
||||
// TODO: Searching the log will take more and more time the more and more
|
||||
|
@ -14,123 +14,153 @@
|
||||
|
||||
namespace tx {
|
||||
|
||||
/** Indicates an error in transaction handling (currently
|
||||
* only command id overflow). */
|
||||
class TransactionError : public utils::BasicException {
|
||||
public:
|
||||
using utils::BasicException::BasicException;
|
||||
};
|
||||
|
||||
// max value that could be stored as a command id
|
||||
static constexpr auto kMaxCommandId =
|
||||
std::numeric_limits<decltype(std::declval<Transaction>().cid)>::max();
|
||||
|
||||
/** Database transaction egine.
|
||||
*
|
||||
* Used for managing transactions and the related information
|
||||
* such as transaction snapshots and the commit log.
|
||||
*/
|
||||
class Engine : Lockable<SpinLock> {
|
||||
// limit for the command id, used for checking if we're about
|
||||
// to overflow. slightly unneccessary since command id should
|
||||
// be a 64-bit int
|
||||
static constexpr auto kMaxCommandId =
|
||||
std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max();
|
||||
|
||||
public:
|
||||
using sptr = std::shared_ptr<Engine>;
|
||||
|
||||
Engine() : counter(0) {}
|
||||
|
||||
// Begins transaction and runs given functions in same atomic step.
|
||||
// Functions will be given Transaction&
|
||||
template <class... F>
|
||||
Transaction *begin(F... fun) {
|
||||
/** Begins a transaction and returns a pointer to
|
||||
* it's object.
|
||||
*
|
||||
* The transaction object is owned by this engine.
|
||||
* It will be released when the transaction gets
|
||||
* committted or aborted.
|
||||
*/
|
||||
Transaction *Begin() {
|
||||
auto guard = this->acquire_unique();
|
||||
|
||||
auto id = Id(counter.next());
|
||||
auto t = new Transaction(id, active, *this);
|
||||
transaction_id_t id{counter_.next()};
|
||||
auto t = new Transaction(id, active_, *this);
|
||||
|
||||
active.insert(id);
|
||||
store.put(id, t);
|
||||
|
||||
call(*t, fun...);
|
||||
active_.insert(id);
|
||||
store_.put(id, t);
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
Transaction &advance(const Id &id) {
|
||||
/** Advances the command on the transaction with the
|
||||
* given id.
|
||||
*
|
||||
* @param id - Transation id. That transaction must
|
||||
* be currently active.
|
||||
* @return Pointer to the transaction object for id.
|
||||
*/
|
||||
Transaction &Advance(transaction_id_t id) {
|
||||
auto guard = this->acquire_unique();
|
||||
|
||||
auto *t = store.get(id);
|
||||
auto *t = store_.get(id);
|
||||
debug_assert(t != nullptr,
|
||||
"Transaction::advance on non-existing transaction");
|
||||
|
||||
if (t == nullptr) throw TransactionError("Transaction does not exist.");
|
||||
if (t->cid == kMaxCommandId)
|
||||
if (t->cid_ == kMaxCommandId)
|
||||
throw TransactionError(
|
||||
"Reached maximum number of commands in this transaction.");
|
||||
|
||||
// this is a new command
|
||||
t->cid++;
|
||||
|
||||
t->cid_++;
|
||||
return *t;
|
||||
}
|
||||
|
||||
// Returns copy of current snapshot
|
||||
Snapshot<Id> snapshot() {
|
||||
auto guard = this->acquire_unique();
|
||||
|
||||
return active;
|
||||
}
|
||||
|
||||
void commit(const Transaction &t) {
|
||||
auto guard = this->acquire_unique();
|
||||
clog.set_committed(t.id);
|
||||
|
||||
finalize(t);
|
||||
}
|
||||
|
||||
void abort(const Transaction &t) {
|
||||
auto guard = this->acquire_unique();
|
||||
clog.set_aborted(t.id);
|
||||
|
||||
finalize(t);
|
||||
}
|
||||
|
||||
/*
|
||||
*@brief Return oldest active transaction in the active transaction pool. In
|
||||
*case none exist return None.
|
||||
*@return Id of transaction
|
||||
/** Returns the snapshot relevant to garbage collection
|
||||
* of database records.
|
||||
*
|
||||
* If there are no active transactions that means
|
||||
* a snapshot containing only the next transaction ID.
|
||||
* If there are active transactions, that means the
|
||||
* oldest active transaction's snapshot, with that
|
||||
* transaction's ID appened as last.
|
||||
*
|
||||
* The idea is that data records can only be deleted
|
||||
* if they were expired (and that was committed) by
|
||||
* a transaction older then the older currently active.
|
||||
* We need the full snapshot to prevent overlaps (see
|
||||
* general GC documentation).
|
||||
*/
|
||||
Option<Id> oldest_active() {
|
||||
Snapshot GcSnapshot() {
|
||||
auto guard = this->acquire_unique();
|
||||
if (active.size() == 0) return Option<Id>();
|
||||
return Option<Id>(active.front());
|
||||
|
||||
// no active transactions
|
||||
if (active_.size() == 0) {
|
||||
auto snapshot_copy = active_;
|
||||
snapshot_copy.insert(counter_.count() + 1);
|
||||
return snapshot_copy;
|
||||
}
|
||||
|
||||
// there are active transactions
|
||||
auto snapshot_copy = store_.get(active_.front())->snapshot();
|
||||
snapshot_copy.insert(active_.front());
|
||||
return snapshot_copy;
|
||||
}
|
||||
|
||||
// total number of transactions started from the beginning of time
|
||||
uint64_t count() {
|
||||
/** Comits the given transaction. Deletes the transaction
|
||||
* object, it's not valid after this function executes. */
|
||||
void Commit(const Transaction &t) {
|
||||
auto guard = this->acquire_unique();
|
||||
return counter.count();
|
||||
clog_.set_committed(t.id_);
|
||||
|
||||
Finalize(t);
|
||||
}
|
||||
|
||||
// the number of currently active transactions
|
||||
size_t size() {
|
||||
/** Aborts the given transaction. Deletes the transaction
|
||||
* object, it's not valid after this function executes. */
|
||||
void Abort(const Transaction &t) {
|
||||
auto guard = this->acquire_unique();
|
||||
return active.size();
|
||||
clog_.set_aborted(t.id_);
|
||||
|
||||
Finalize(t);
|
||||
}
|
||||
|
||||
CommitLog clog;
|
||||
/** The total number of transactions that have
|
||||
* executed since the creation of this engine */
|
||||
auto Count() {
|
||||
auto guard = this->acquire_unique();
|
||||
return counter_.count();
|
||||
}
|
||||
|
||||
/** The count of currently active transactions */
|
||||
size_t ActiveCount() {
|
||||
auto guard = this->acquire_unique();
|
||||
return active_.size();
|
||||
}
|
||||
|
||||
// TODO make this private and expose "const CommitLog"
|
||||
// through a getter. To do that you need to make the
|
||||
// appropriate CommitLog functions const. To do THAT,
|
||||
// you need to make appropriate DynamicBitset functions
|
||||
// const. While doing that, clean the DynamicBitset up.
|
||||
/** Commit log of this engine */
|
||||
CommitLog clog_;
|
||||
|
||||
private:
|
||||
template <class T, class... F>
|
||||
void call(Transaction &t, T fun, F... funs) {
|
||||
call(t, fun);
|
||||
call(t, funs...);
|
||||
// Performs cleanup common to ending the transaction
|
||||
// with either commit or abort
|
||||
void Finalize(const Transaction &t) {
|
||||
active_.remove(t.id_);
|
||||
store_.del(t.id_);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
void call(Transaction &t, T fun) {
|
||||
fun(t);
|
||||
}
|
||||
// transaction counter. contains the number of transactions
|
||||
// ever created till now
|
||||
SimpleCounter<transaction_id_t> counter_{0};
|
||||
|
||||
void call(Transaction &t) {}
|
||||
// a snapshot of currently active transactions
|
||||
Snapshot active_;
|
||||
|
||||
void finalize(const Transaction &t) {
|
||||
active.remove(t.id);
|
||||
|
||||
// remove transaction from store
|
||||
store.del(t.id);
|
||||
}
|
||||
|
||||
SimpleCounter<uint64_t> counter;
|
||||
Snapshot<Id> active;
|
||||
TransactionStore<uint64_t> store;
|
||||
// storage for the transactions
|
||||
TransactionStore<transaction_id_t> store_;
|
||||
};
|
||||
}
|
||||
|
@ -1,16 +0,0 @@
|
||||
#include "transactions/snapshot.hpp"
|
||||
|
||||
#include "transactions/engine.hpp"
|
||||
|
||||
template <class id_t>
|
||||
bool tx::Snapshot<id_t>::all_finished(Engine &engine) const {
|
||||
for (auto &sid : active) {
|
||||
if (engine.clog.is_active(sid)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template class tx::Snapshot<Id>;
|
@ -1,66 +1,90 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "transaction.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
#include "utils/option.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
class Engine;
|
||||
|
||||
template <class id_t>
|
||||
/** Ascendingly sorted collection of transaction ids.
|
||||
*
|
||||
* Represents the transactions that were active at
|
||||
* some point in the discrete transaction time.
|
||||
*/
|
||||
class Snapshot {
|
||||
public:
|
||||
Snapshot() = default;
|
||||
Snapshot(std::vector<transaction_id_t> &&active)
|
||||
: transaction_ids_(std::move(active)) {}
|
||||
// all the copy/move constructors/assignments act naturally
|
||||
|
||||
Snapshot(std::vector<id_t> active) : active(std::move(active)) {}
|
||||
|
||||
Snapshot(const Snapshot &other) { active = other.active; }
|
||||
|
||||
Snapshot(Snapshot &&other) { active = std::move(other.active); }
|
||||
|
||||
// True if all transaction from snapshot have finished.
|
||||
bool all_finished(Engine &engine) const;
|
||||
|
||||
bool is_active(id_t xid) const {
|
||||
return std::binary_search(active.begin(), active.end(), xid);
|
||||
/** Returns true if this snapshot contains the given
|
||||
* transaction id.
|
||||
*
|
||||
* @param xid - The transcation id in question
|
||||
*/
|
||||
bool contains(transaction_id_t id) const {
|
||||
return std::binary_search(transaction_ids_.begin(), transaction_ids_.end(),
|
||||
id);
|
||||
}
|
||||
|
||||
// Return id of oldest transaction. None if there is no transactions in
|
||||
// snapshot.
|
||||
Option<Id> oldest_active() const {
|
||||
auto n = active.size();
|
||||
if (n > 0) {
|
||||
Id min = active[0];
|
||||
for (auto i = 1; i < n; i++) {
|
||||
if (active[i] < min) {
|
||||
min = active[i];
|
||||
}
|
||||
}
|
||||
return Option<Id>(min);
|
||||
|
||||
} else {
|
||||
return Option<Id>();
|
||||
}
|
||||
/** Adds the given transaction id to the end of this Snapshot.
|
||||
* The given id must be greater then all the existing ones,
|
||||
* to maintain ascending sort order.
|
||||
*
|
||||
* @param id - the transaction id to add
|
||||
*/
|
||||
void insert(transaction_id_t id) {
|
||||
transaction_ids_.push_back(id);
|
||||
debug_assert(
|
||||
std::is_sorted(transaction_ids_.begin(), transaction_ids_.end()),
|
||||
"Snapshot must be sorted");
|
||||
}
|
||||
|
||||
void insert(const id_t &id) { active.push_back(id); }
|
||||
|
||||
void remove(const id_t &id) {
|
||||
// remove transaction from the active transactions list
|
||||
auto last = std::remove(active.begin(), active.end(), id);
|
||||
active.erase(last, active.end());
|
||||
/** Removes the given transaction id from this Snapshot.
|
||||
*
|
||||
* @param id - the transaction id to remove
|
||||
*/
|
||||
void remove(transaction_id_t id) {
|
||||
auto last =
|
||||
std::remove(transaction_ids_.begin(), transaction_ids_.end(), id);
|
||||
transaction_ids_.erase(last, transaction_ids_.end());
|
||||
}
|
||||
|
||||
const id_t &front() const { return active.front(); }
|
||||
transaction_id_t front() const {
|
||||
debug_assert(transaction_ids_.size(), "Snapshot.front() on empty Snapshot");
|
||||
return transaction_ids_.front();
|
||||
}
|
||||
|
||||
const id_t &back() const { return active.back(); }
|
||||
transaction_id_t back() const {
|
||||
debug_assert(transaction_ids_.size(), "Snapshot.back() on empty Snapshot");
|
||||
return transaction_ids_.back();
|
||||
}
|
||||
|
||||
size_t size() { return active.size(); }
|
||||
size_t size() const { return transaction_ids_.size(); }
|
||||
bool empty() const { return transaction_ids_.empty(); }
|
||||
bool operator==(const Snapshot &other) const {
|
||||
return transaction_ids_ == other.transaction_ids_;
|
||||
}
|
||||
auto begin() const { return transaction_ids_.begin(); }
|
||||
auto end() const { return transaction_ids_.end(); }
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &stream,
|
||||
const Snapshot &snapshot) {
|
||||
stream << "Snapshot(";
|
||||
PrintIterable(stream, snapshot.transaction_ids_);
|
||||
stream << ")";
|
||||
return stream;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<id_t> active;
|
||||
std::vector<transaction_id_t> transaction_ids_;
|
||||
};
|
||||
}
|
||||
|
@ -7,44 +7,13 @@
|
||||
#include "transactions/engine.hpp"
|
||||
|
||||
namespace tx {
|
||||
Transaction::Transaction(Engine &engine)
|
||||
: Transaction(Id(), Snapshot<Id>(), engine) {}
|
||||
|
||||
Transaction::Transaction(const Id &&id, const Snapshot<Id> &&snapshot,
|
||||
Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot,
|
||||
Engine &engine)
|
||||
: id(id), cid(1), engine(engine), snapshot(std::move(snapshot)) {}
|
||||
: id_(id), engine_(engine), snapshot_(snapshot) {}
|
||||
|
||||
Transaction::Transaction(const Id &id, const Snapshot<Id> &snapshot,
|
||||
Engine &engine)
|
||||
: id(id), cid(1), engine(engine), snapshot(snapshot) {}
|
||||
void Transaction::TakeLock(RecordLock &lock) { locks_.take(&lock, id_); }
|
||||
|
||||
void Transaction::wait_for_active_except(const Id &id) const {
|
||||
Snapshot<Id> local_snapshot = snapshot;
|
||||
local_snapshot.remove(id);
|
||||
while (local_snapshot.size() > 0) {
|
||||
auto sid = local_snapshot.front();
|
||||
while (engine.clog.fetch_info(sid).is_active()) {
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
||||
}
|
||||
local_snapshot.remove(sid);
|
||||
}
|
||||
}
|
||||
|
||||
void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); }
|
||||
|
||||
void Transaction::commit() { engine.commit(*this); }
|
||||
|
||||
void Transaction::abort() { engine.abort(*this); }
|
||||
|
||||
bool Transaction::all_finished() {
|
||||
return !engine.clog.is_active(id) && snapshot.all_finished(engine);
|
||||
}
|
||||
|
||||
bool Transaction::in_snapshot(const Id &id) const {
|
||||
return snapshot.is_active(id);
|
||||
}
|
||||
|
||||
Id Transaction::oldest_active() {
|
||||
return snapshot.oldest_active().take_or(Id(id));
|
||||
}
|
||||
void Transaction::Commit() { engine_.Commit(*this); }
|
||||
|
||||
void Transaction::Abort() { engine_.Abort(*this); }
|
||||
}
|
||||
|
@ -1,59 +1,73 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <vector>
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
#include "transactions/lock_store.hpp"
|
||||
#include "transactions/snapshot.hpp"
|
||||
#include "type.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
class Engine;
|
||||
|
||||
/** A database transaction. Encapsulates an atomic,
|
||||
* abortable unit of work. Also defines that all db
|
||||
* ops are single-threaded within a single transaction */
|
||||
class Transaction {
|
||||
friend class Engine;
|
||||
|
||||
public:
|
||||
explicit Transaction(Engine &engine);
|
||||
Transaction(const Id &&id, const Snapshot<Id> &&snapshot, Engine &engine);
|
||||
Transaction(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
|
||||
Transaction(const Transaction &) = delete;
|
||||
Transaction(Transaction &&) = default;
|
||||
|
||||
// Blocks until all transactions from snapshot finish, except the 'id' one.
|
||||
// After this method, snapshot will be either empty or contain transaction
|
||||
// with Id 'id'.
|
||||
void wait_for_active_except(const Id &id) const;
|
||||
|
||||
void take_lock(RecordLock &lock);
|
||||
void commit();
|
||||
void abort();
|
||||
|
||||
// True if this transaction and every transaction from snapshot have
|
||||
// finished.
|
||||
bool all_finished();
|
||||
|
||||
// Return id of oldest transaction from snapshot.
|
||||
Id oldest_active();
|
||||
|
||||
// True if id is in snapshot.
|
||||
bool in_snapshot(const Id &id) const;
|
||||
|
||||
// index of this transaction
|
||||
const Id id;
|
||||
|
||||
// index of the current command in the current transaction;
|
||||
uint64_t cid;
|
||||
|
||||
Engine &engine;
|
||||
/** Returns the maximum possible transcation id */
|
||||
static transaction_id_t MaxId() {
|
||||
return std::numeric_limits<transaction_id_t>::max();
|
||||
}
|
||||
|
||||
private:
|
||||
friend class Engine;
|
||||
|
||||
// the constructor is private, only the Engine ever uses it
|
||||
Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine);
|
||||
|
||||
// a transaction can't be moved nor copied. it's owned by the transaction
|
||||
// engine, and it's lifetime is managed by it
|
||||
Transaction(const Transaction &) = delete;
|
||||
Transaction(Transaction &&) = delete;
|
||||
Transaction &operator=(const Transaction &) = delete;
|
||||
Transaction &operator=(Transaction &&) = delete;
|
||||
|
||||
public:
|
||||
/** Acquires the lock over the given RecordLock, preventing
|
||||
* other transactions from doing the same */
|
||||
void TakeLock(RecordLock &lock);
|
||||
|
||||
/** Commits this transaction. After this call this transaction
|
||||
* object is no longer valid for use (it gets deleted by the
|
||||
* engine that owns it). */
|
||||
void Commit();
|
||||
|
||||
/** Aborts this transaction. After this call this transaction
|
||||
* object is no longer valid for use (it gets deleted by the
|
||||
* engine that owns it). */
|
||||
void Abort();
|
||||
|
||||
/** Transaction's id. Unique in the engine that owns it */
|
||||
const transaction_id_t id_;
|
||||
|
||||
/** The transaction engine to which this transaction belongs */
|
||||
Engine &engine_;
|
||||
|
||||
/** Returns the current transaction's current command id */
|
||||
// TODO rename to cmd_id (variable and function
|
||||
auto cid() const { return cid_; }
|
||||
|
||||
/** Returns this transaction's snapshot. */
|
||||
const Snapshot &snapshot() const { return snapshot_; }
|
||||
|
||||
private:
|
||||
// index of the current command in the current transaction;
|
||||
command_id_t cid_{1};
|
||||
// a snapshot of currently active transactions
|
||||
const Snapshot<Id> snapshot;
|
||||
LockStore<RecordLock> locks;
|
||||
const Snapshot snapshot_;
|
||||
// locks
|
||||
LockStore<RecordLock> locks_;
|
||||
};
|
||||
}
|
||||
|
12
src/transactions/type.hpp
Normal file
12
src/transactions/type.hpp
Normal file
@ -0,0 +1,12 @@
|
||||
#include <cstdint>
|
||||
|
||||
// transcation and command types defined
|
||||
// in a separate header to avoid cyclic dependencies
|
||||
namespace tx {
|
||||
|
||||
/** Type of a tx::Transcation's id member */
|
||||
using transaction_id_t = uint64_t;
|
||||
|
||||
/** Type of a tx::Transcation's command id member */
|
||||
using command_id_t = uint64_t;
|
||||
}
|
@ -15,11 +15,11 @@ void MvccMix(benchmark::State &state) {
|
||||
while (state.KeepRunning()) {
|
||||
state.PauseTiming();
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Prop> version_list(*t1);
|
||||
|
||||
t1->commit();
|
||||
auto t2 = engine.begin();
|
||||
t1->Commit();
|
||||
auto t2 = engine.Begin();
|
||||
|
||||
state.ResumeTiming();
|
||||
version_list.update(*t2);
|
||||
@ -29,13 +29,13 @@ void MvccMix(benchmark::State &state) {
|
||||
version_list.find(*t2);
|
||||
state.PauseTiming();
|
||||
|
||||
t2->abort();
|
||||
t2->Abort();
|
||||
|
||||
auto t3 = engine.begin();
|
||||
auto t3 = engine.Begin();
|
||||
state.ResumeTiming();
|
||||
version_list.update(*t3);
|
||||
state.PauseTiming();
|
||||
auto t4 = engine.begin();
|
||||
auto t4 = engine.Begin();
|
||||
|
||||
// Repeat find state.range(0) number of times.
|
||||
state.ResumeTiming();
|
||||
@ -44,8 +44,8 @@ void MvccMix(benchmark::State &state) {
|
||||
}
|
||||
state.PauseTiming();
|
||||
|
||||
t3->commit();
|
||||
t4->commit();
|
||||
t3->Commit();
|
||||
t4->Commit();
|
||||
state.ResumeTiming();
|
||||
}
|
||||
}
|
||||
|
@ -18,9 +18,9 @@ int main() {
|
||||
uint64_t sum = 0;
|
||||
|
||||
for (int i = 0; i < n; ++i) {
|
||||
auto t = engine.begin();
|
||||
sum += t->id;
|
||||
engine.commit(*t);
|
||||
auto t = engine.Begin();
|
||||
sum += t->id_;
|
||||
engine.Commit(*t);
|
||||
}
|
||||
|
||||
sums[idx] = sum;
|
||||
|
@ -7,6 +7,8 @@
|
||||
#include "dbms/dbms.hpp"
|
||||
#include "storage/vertex.hpp"
|
||||
|
||||
#include "mvcc_gc_common.hpp"
|
||||
|
||||
using testing::UnorderedElementsAreArray;
|
||||
|
||||
// Test index does it insert everything uniquely
|
||||
@ -15,10 +17,10 @@ TEST(LabelsIndex, UniqueInsert) {
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Vertex> vlist(*t1);
|
||||
t1->commit();
|
||||
auto t2 = engine.begin();
|
||||
t1->Commit();
|
||||
auto t2 = engine.Begin();
|
||||
|
||||
vlist.find(*t2)->labels_.push_back(dba->label("1"));
|
||||
index.Update(dba->label("1"), &vlist, vlist.find(*t2));
|
||||
@ -30,7 +32,7 @@ TEST(LabelsIndex, UniqueInsert) {
|
||||
|
||||
vlist.find(*t2)->labels_.push_back(dba->label("3"));
|
||||
index.Update(dba->label("3"), &vlist, vlist.find(*t2));
|
||||
t2->commit();
|
||||
t2->Commit();
|
||||
|
||||
EXPECT_EQ(index.Count(dba->label("1")), 1);
|
||||
EXPECT_EQ(index.Count(dba->label("2")), 1);
|
||||
@ -44,11 +46,10 @@ TEST(LabelsIndex, UniqueFilter) {
|
||||
auto dba = dbms.active();
|
||||
tx::Engine engine;
|
||||
|
||||
auto t1 = engine.begin();
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Vertex> vlist1(*t1);
|
||||
mvcc::VersionList<Vertex> vlist2(*t1);
|
||||
t1->engine.advance(
|
||||
t1->id); // advance command so we can see our inserted version
|
||||
engine.Advance(t1->id_);
|
||||
auto r1v1 = vlist1.find(*t1);
|
||||
auto r1v2 = vlist2.find(*t1);
|
||||
EXPECT_NE(vlist1.find(*t1), nullptr);
|
||||
@ -58,16 +59,16 @@ TEST(LabelsIndex, UniqueFilter) {
|
||||
vlist2.find(*t1)->labels_.push_back(label1);
|
||||
index.Update(label1, &vlist1, r1v1);
|
||||
index.Update(label1, &vlist2, r1v2);
|
||||
t1->commit();
|
||||
t1->Commit();
|
||||
|
||||
auto t2 = engine.begin();
|
||||
auto t2 = engine.Begin();
|
||||
auto r2v1 = vlist1.update(*t2);
|
||||
auto r2v2 = vlist2.update(*t2);
|
||||
index.Update(label1, &vlist1, r2v1);
|
||||
index.Update(label1, &vlist2, r2v2);
|
||||
t2->commit();
|
||||
t2->Commit();
|
||||
|
||||
auto t3 = engine.begin();
|
||||
auto t3 = engine.Begin();
|
||||
std::vector<mvcc::VersionList<Vertex> *> expected = {&vlist1, &vlist2};
|
||||
sort(expected.begin(),
|
||||
expected.end()); // Entries will be sorted by vlist pointers.
|
||||
@ -85,36 +86,37 @@ TEST(LabelsIndex, Refresh) {
|
||||
auto access = dbms.active();
|
||||
tx::Engine engine;
|
||||
|
||||
auto t1 = engine.begin();
|
||||
// add two vertices to database
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Vertex> vlist1(*t1);
|
||||
mvcc::VersionList<Vertex> vlist2(*t1);
|
||||
t1->engine.advance(
|
||||
t1->id); // advance command so we can see our inserted version
|
||||
auto r1v1 = vlist1.find(*t1);
|
||||
auto r1v2 = vlist2.find(*t1);
|
||||
EXPECT_NE(vlist1.find(*t1), nullptr);
|
||||
engine.Advance(t1->id_);
|
||||
|
||||
auto label1 = access->label("1");
|
||||
vlist1.find(*t1)->labels_.push_back(label1);
|
||||
vlist2.find(*t1)->labels_.push_back(label1);
|
||||
index.Update(label1, &vlist1, r1v1);
|
||||
index.Update(label1, &vlist2, r1v2);
|
||||
t1->commit();
|
||||
auto v1r1 = vlist1.find(*t1);
|
||||
auto v2r1 = vlist2.find(*t1);
|
||||
EXPECT_NE(v1r1, nullptr);
|
||||
EXPECT_NE(v2r1, nullptr);
|
||||
|
||||
auto t2 = engine.begin();
|
||||
auto r2v1 = vlist1.update(*t2);
|
||||
auto r2v2 = vlist2.update(*t2);
|
||||
index.Update(label1, &vlist1, r2v1);
|
||||
index.Update(label1, &vlist2, r2v2);
|
||||
int last_id = t2->id;
|
||||
t2->commit();
|
||||
EXPECT_EQ(index.Count(label1), 4);
|
||||
auto label = access->label("label");
|
||||
v1r1->labels_.push_back(label);
|
||||
v2r1->labels_.push_back(label);
|
||||
index.Update(label, &vlist1, v1r1);
|
||||
index.Update(label, &vlist2, v2r1);
|
||||
t1->Commit();
|
||||
|
||||
index.Refresh(last_id, engine);
|
||||
EXPECT_EQ(index.Count(label1), 4);
|
||||
auto t2 = engine.Begin();
|
||||
auto v1r2 = vlist1.update(*t2);
|
||||
auto v2r2 = vlist2.update(*t2);
|
||||
index.Update(label, &vlist1, v1r2);
|
||||
index.Update(label, &vlist2, v2r2);
|
||||
|
||||
index.Refresh(last_id + 1, engine);
|
||||
EXPECT_EQ(index.Count(label1), 2);
|
||||
index.Refresh(GcSnapshot(engine, t2), engine);
|
||||
EXPECT_EQ(index.Count(label), 4);
|
||||
|
||||
t2->Commit();
|
||||
EXPECT_EQ(index.Count(label), 4);
|
||||
index.Refresh(GcSnapshot(engine, nullptr), engine);
|
||||
EXPECT_EQ(index.Count(label), 2);
|
||||
}
|
||||
|
||||
// Transaction hasn't ended and so the vertex is not visible.
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include "database/indexes/label_property_index.hpp"
|
||||
#include "dbms/dbms.hpp"
|
||||
|
||||
#include "mvcc_gc_common.hpp"
|
||||
|
||||
class LabelPropertyIndexComplexTest : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
@ -19,9 +21,9 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
|
||||
EXPECT_EQ(index.CreateIndex(*key), true);
|
||||
index.IndexFinishedBuilding(*key);
|
||||
|
||||
t = engine.begin();
|
||||
t = engine.Begin();
|
||||
vlist = new mvcc::VersionList<Vertex>(*t);
|
||||
engine.advance(t->id);
|
||||
engine.Advance(t->id_);
|
||||
|
||||
vertex = vlist->find(*t);
|
||||
ASSERT_NE(vertex, nullptr);
|
||||
@ -140,19 +142,19 @@ TEST_F(LabelPropertyIndexComplexTest, UniqueInsert) {
|
||||
// Check if index filters duplicates.
|
||||
TEST_F(LabelPropertyIndexComplexTest, UniqueFilter) {
|
||||
index.UpdateOnLabelProperty(vlist, vertex);
|
||||
t->commit();
|
||||
t->Commit();
|
||||
|
||||
auto t2 = engine.begin();
|
||||
auto t2 = engine.Begin();
|
||||
auto vertex2 = vlist->update(*t2);
|
||||
t2->commit();
|
||||
t2->Commit();
|
||||
|
||||
index.UpdateOnLabelProperty(vlist, vertex2);
|
||||
EXPECT_EQ(index.Count(*key), 2);
|
||||
|
||||
auto t3 = engine.begin();
|
||||
auto t3 = engine.Begin();
|
||||
auto iter = index.GetVlists(*key, *t3, false);
|
||||
EXPECT_EQ(std::distance(iter.begin(), iter.end()), 1);
|
||||
t3->commit();
|
||||
t3->Commit();
|
||||
}
|
||||
|
||||
// Remove label and check if index vertex is not returned now.
|
||||
@ -182,11 +184,11 @@ TEST_F(LabelPropertyIndexComplexTest, RemoveProperty) {
|
||||
// Refresh with a vertex that looses its labels and properties.
|
||||
TEST_F(LabelPropertyIndexComplexTest, Refresh) {
|
||||
index.UpdateOnLabelProperty(vlist, vertex);
|
||||
t->commit();
|
||||
t->Commit();
|
||||
EXPECT_EQ(index.Count(*key), 1);
|
||||
vertex->labels_.clear();
|
||||
vertex->properties_.clear();
|
||||
index.Refresh(engine.count() + 1, engine);
|
||||
index.Refresh(GcSnapshot(engine, nullptr), engine);
|
||||
auto iter = index.GetVlists(*key, *t, false);
|
||||
EXPECT_EQ(std::distance(iter.begin(), iter.end()), 0);
|
||||
}
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include "storage/deferred_deleter.hpp"
|
||||
#include "storage/vertex.hpp"
|
||||
|
||||
#include "gc_common.hpp"
|
||||
#include "mvcc_gc_common.hpp"
|
||||
|
||||
// Add and count objects.
|
||||
TEST(DeferredDeleter, AddObjects) {
|
||||
@ -13,44 +13,44 @@ TEST(DeferredDeleter, AddObjects) {
|
||||
std::vector<Vertex *> V;
|
||||
V.push_back(new Vertex());
|
||||
V.push_back(new Vertex());
|
||||
deleter.AddObjects(V, Id(5));
|
||||
EXPECT_EQ(deleter.Count(), (i + 1) * 2);
|
||||
}
|
||||
deleter.FreeExpiredObjects(Id::MaximalId());
|
||||
deleter.AddObjects(V, 5);
|
||||
EXPECT_EQ(deleter.Count(), (i + 1) * 2);
|
||||
}
|
||||
deleter.FreeExpiredObjects(tx::Transaction::MaxId());
|
||||
}
|
||||
|
||||
// Check that the deleter can't be destroyed while it still has objects.
|
||||
TEST(DeferredDeleter, Destructor) {
|
||||
std::atomic<int> count{0};
|
||||
DeferredDeleter<PropCount> *deleter = new DeferredDeleter<PropCount>;
|
||||
DeferredDeleter<DestrCountRec> *deleter = new DeferredDeleter<DestrCountRec>;
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
std::vector<PropCount *> V;
|
||||
V.push_back(new PropCount(count));
|
||||
V.push_back(new PropCount(count));
|
||||
deleter->AddObjects(V, Id(5));
|
||||
std::vector<DestrCountRec *> V;
|
||||
V.push_back(new DestrCountRec(count));
|
||||
V.push_back(new DestrCountRec(count));
|
||||
deleter->AddObjects(V, 5);
|
||||
EXPECT_EQ(deleter->Count(), (i + 1) * 2);
|
||||
}
|
||||
EXPECT_EQ(0, count);
|
||||
EXPECT_DEATH(delete deleter, "");
|
||||
// We shouldn't leak memory.
|
||||
deleter->FreeExpiredObjects(Id::MaximalId());
|
||||
deleter->FreeExpiredObjects(tx::Transaction::MaxId());
|
||||
delete deleter;
|
||||
}
|
||||
|
||||
// Check if deleter frees objects.
|
||||
TEST(DeferredDeleter, FreeExpiredObjects) {
|
||||
DeferredDeleter<PropCount> deleter;
|
||||
std::vector<PropCount *> V;
|
||||
DeferredDeleter<DestrCountRec> deleter;
|
||||
std::vector<DestrCountRec *> V;
|
||||
std::atomic<int> count{0};
|
||||
V.push_back(new PropCount(count));
|
||||
V.push_back(new PropCount(count));
|
||||
deleter.AddObjects(V, Id(5));
|
||||
V.push_back(new DestrCountRec(count));
|
||||
V.push_back(new DestrCountRec(count));
|
||||
deleter.AddObjects(V, 5);
|
||||
|
||||
deleter.FreeExpiredObjects(Id(5));
|
||||
deleter.FreeExpiredObjects(5);
|
||||
EXPECT_EQ(deleter.Count(), 2);
|
||||
EXPECT_EQ(count, 0);
|
||||
|
||||
deleter.FreeExpiredObjects(Id(6));
|
||||
deleter.FreeExpiredObjects(6);
|
||||
EXPECT_EQ(deleter.Count(), 0);
|
||||
EXPECT_EQ(count, 2);
|
||||
}
|
||||
|
@ -1,22 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "mvcc/record.hpp"
|
||||
|
||||
/**
|
||||
* @brief - Empty class which inherits from mvcc:Record.
|
||||
*/
|
||||
class Prop : public mvcc::Record<Prop> {};
|
||||
|
||||
/**
|
||||
* @brief - Class which inherits from mvcc::Record and takes an atomic variable
|
||||
* to count number of destructor calls (to test if the record is actually
|
||||
* deleted).
|
||||
*/
|
||||
class PropCount : public mvcc::Record<PropCount> {
|
||||
public:
|
||||
PropCount(std::atomic<int> &count) : count_(count) {}
|
||||
~PropCount() { ++count_; }
|
||||
|
||||
private:
|
||||
std::atomic<int> &count_;
|
||||
};
|
@ -1,30 +0,0 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
|
||||
TEST(IdTest, BasicUsageAndTotalOrdering) {
|
||||
Id id0(0);
|
||||
Id id1(1);
|
||||
Id id2(1);
|
||||
Id id3(id2);
|
||||
Id id4 = id3;
|
||||
Id id5(5);
|
||||
|
||||
ASSERT_EQ(id0 < id5, true);
|
||||
ASSERT_EQ(id1 == id2, true);
|
||||
ASSERT_EQ(id3 == id4, true);
|
||||
ASSERT_EQ(id5 > id0, true);
|
||||
ASSERT_EQ(id5 > id0, true);
|
||||
ASSERT_EQ(id5 != id3, true);
|
||||
ASSERT_EQ(id1 >= id2, true);
|
||||
ASSERT_EQ(id3 <= id4, true);
|
||||
}
|
||||
|
||||
TEST(IdTest, MaxId) {
|
||||
EXPECT_TRUE(Id(std::numeric_limits<uint64_t>::max()) == Id::MaximalId());
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
#include <vector>
|
||||
#include "gc_common.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "mvcc/record.hpp"
|
||||
#include "mvcc/version.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
@ -10,18 +8,20 @@
|
||||
#include "transactions/engine.hpp"
|
||||
#include "transactions/transaction.cpp"
|
||||
|
||||
#include "mvcc_gc_common.hpp"
|
||||
|
||||
class TestClass : public mvcc::Record<TestClass> {};
|
||||
|
||||
TEST(MVCC, Deadlock) {
|
||||
tx::Engine engine;
|
||||
|
||||
auto t0 = engine.begin();
|
||||
auto t0 = engine.Begin();
|
||||
mvcc::VersionList<TestClass> version_list1(*t0);
|
||||
mvcc::VersionList<TestClass> version_list2(*t0);
|
||||
t0->commit();
|
||||
t0->Commit();
|
||||
|
||||
auto t1 = engine.begin();
|
||||
auto t2 = engine.begin();
|
||||
auto t1 = engine.Begin();
|
||||
auto t2 = engine.Begin();
|
||||
|
||||
version_list1.update(*t1);
|
||||
version_list2.update(*t2);
|
||||
@ -34,23 +34,23 @@ TEST(MVCC, UpdateDontDelete) {
|
||||
std::atomic<int> count{0};
|
||||
{
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
mvcc::VersionList<PropCount> version_list(*t1, count);
|
||||
t1->commit();
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<DestrCountRec> version_list(*t1, count);
|
||||
t1->Commit();
|
||||
|
||||
auto t2 = engine.begin();
|
||||
auto t2 = engine.Begin();
|
||||
version_list.update(*t2);
|
||||
t2->abort();
|
||||
t2->Abort();
|
||||
EXPECT_EQ(count, 0);
|
||||
|
||||
auto t3 = engine.begin();
|
||||
auto t3 = engine.Begin();
|
||||
|
||||
// Update re-links the node and shouldn't clear it yet.
|
||||
version_list.update(*t3);
|
||||
EXPECT_EQ(count, 0);
|
||||
|
||||
// TODO Gleich: why don't we also test that remove doesn't delete?
|
||||
t3->commit();
|
||||
t3->Commit();
|
||||
}
|
||||
EXPECT_EQ(count, 3);
|
||||
}
|
||||
@ -58,13 +58,13 @@ TEST(MVCC, UpdateDontDelete) {
|
||||
// Check that we get the oldest record.
|
||||
TEST(MVCC, Oldest) {
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<TestClass> version_list(*t1);
|
||||
auto first = version_list.Oldest();
|
||||
EXPECT_NE(first, nullptr);
|
||||
// TODO Gleich: no need to do 10 checks of the same thing
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
engine.advance(t1->id);
|
||||
engine.Advance(t1->id_);
|
||||
version_list.update(*t1);
|
||||
EXPECT_EQ(version_list.Oldest(), first);
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ TEST_F(Mvcc, ReadUncommitedUpdateFromSameTXSameCommand) {
|
||||
|
||||
TEST_F(Mvcc, ReadUncommitedUpdateFromSameTXNotSameCommand) {
|
||||
T2_UPDATE;
|
||||
engine.advance(t2->id);
|
||||
engine.Advance(t2->id_);
|
||||
EXPECT_EQ(v2, version_list.find(*t2));
|
||||
}
|
||||
|
||||
@ -101,6 +101,6 @@ TEST_F(Mvcc, ReadUncommitedRemoveFromSameTXNotSameCommand) {
|
||||
T2_COMMIT;
|
||||
T3_BEGIN;
|
||||
T3_REMOVE;
|
||||
engine.advance(t3->id);
|
||||
engine.Advance(t3->id_);
|
||||
EXPECT_NE(v2, version_list.find(*t3));
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
#include <vector>
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "mvcc/record.hpp"
|
||||
#include "mvcc/version.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
@ -9,11 +8,6 @@
|
||||
#include "transactions/engine.hpp"
|
||||
#include "transactions/transaction.cpp"
|
||||
|
||||
// make it easy to compare Id with int
|
||||
bool operator==(const Id &left, const int right) {
|
||||
return static_cast<uint64_t>(left) == static_cast<uint64_t>(right);
|
||||
}
|
||||
|
||||
class TestClass : public mvcc::Record<TestClass> {
|
||||
public:
|
||||
// constructs first version, size should be 0
|
||||
@ -53,22 +47,22 @@ class TestClass : public mvcc::Record<TestClass> {
|
||||
class Mvcc : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
id0 = Id{0};
|
||||
t1 = &engine.advance(t1->id);
|
||||
id1 = t1->id;
|
||||
id0 = 0;
|
||||
t1 = &engine.Advance(t1->id_);
|
||||
id1 = t1->id_;
|
||||
v1 = version_list.find(*t1);
|
||||
t1->commit();
|
||||
t2 = engine.begin();
|
||||
id2 = t2->id;
|
||||
t1->Commit();
|
||||
t2 = engine.Begin();
|
||||
id2 = t2->id_;
|
||||
}
|
||||
// variable where number of versions is stored
|
||||
int version_list_size = 0;
|
||||
tx::Engine engine;
|
||||
tx::Transaction *t1 = engine.begin();
|
||||
tx::Transaction *t1 = engine.Begin();
|
||||
mvcc::VersionList<TestClass> version_list{*t1, version_list_size};
|
||||
TestClass *v1 = nullptr;
|
||||
tx::Transaction *t2 = nullptr;
|
||||
int id0, id1, id2;
|
||||
tx::transaction_id_t id0, id1, id2;
|
||||
};
|
||||
|
||||
// helper macros. important:
|
||||
@ -79,12 +73,12 @@ class Mvcc : public ::testing::Test {
|
||||
#define T4_FIND __attribute__((unused)) auto v4 = version_list.find(*t4)
|
||||
#define T2_UPDATE __attribute__((unused)) auto v2 = version_list.update(*t2)
|
||||
#define T3_UPDATE __attribute__((unused)) auto v3 = version_list.update(*t3)
|
||||
#define T2_COMMIT t2->commit();
|
||||
#define T3_COMMIT t3->commit();
|
||||
#define T2_ABORT t2->abort();
|
||||
#define T3_ABORT t3->abort();
|
||||
#define T3_BEGIN auto t3 = engine.begin(); __attribute__((unused)) int id3 = t3->id
|
||||
#define T4_BEGIN auto t4 = engine.begin();
|
||||
#define T2_COMMIT t2->Commit();
|
||||
#define T3_COMMIT t3->Commit();
|
||||
#define T2_ABORT t2->Abort();
|
||||
#define T3_ABORT t3->Abort();
|
||||
#define T3_BEGIN auto t3 = engine.Begin(); __attribute__((unused)) int id3 = t3->id_
|
||||
#define T4_BEGIN auto t4 = engine.Begin();
|
||||
#define T2_REMOVE version_list.remove(*t2)
|
||||
#define T3_REMOVE version_list.remove(*t3)
|
||||
#define EXPECT_CRE(record, expected) EXPECT_EQ(record->tx.cre(), id##expected)
|
||||
|
@ -14,117 +14,141 @@
|
||||
#include "storage/vertex.hpp"
|
||||
#include "transactions/engine.hpp"
|
||||
|
||||
#include "gc_common.hpp"
|
||||
#include "mvcc_gc_common.hpp"
|
||||
|
||||
/**
|
||||
* Test will the mvcc gc delete records inside the version list because they
|
||||
* are not longer visible.
|
||||
*/
|
||||
TEST(VersionList, GcDeleted) {
|
||||
class MvccGcTest : public ::testing::Test {
|
||||
protected:
|
||||
tx::Engine engine;
|
||||
private:
|
||||
tx::Transaction *t0 = engine.Begin();
|
||||
protected:
|
||||
std::atomic<int> record_destruction_count{0};
|
||||
mvcc::VersionList<DestrCountRec> version_list{*t0, record_destruction_count};
|
||||
std::vector<tx::Transaction *> transactions{t0};
|
||||
|
||||
// create a version_list with one record
|
||||
std::vector<uint64_t> ids;
|
||||
auto t1 = engine.begin();
|
||||
std::atomic<int> count{0};
|
||||
mvcc::VersionList<PropCount> version_list(*t1, count);
|
||||
ids.push_back(t1->id);
|
||||
t1->commit();
|
||||
void SetUp() override { t0->Commit(); }
|
||||
|
||||
// create some updates
|
||||
const int UPDATES = 10;
|
||||
for (int i = 0; i < UPDATES; ++i) {
|
||||
auto t2 = engine.begin();
|
||||
ids.push_back(t2->id);
|
||||
version_list.update(*t2);
|
||||
t2->commit();
|
||||
void MakeUpdates(int update_count, bool commit) {
|
||||
for (int i = 0; i < update_count; i++) {
|
||||
auto t = engine.Begin();
|
||||
version_list.update(*t);
|
||||
if (commit)
|
||||
t->Commit();
|
||||
else
|
||||
t->Abort();
|
||||
}
|
||||
}
|
||||
|
||||
// deleting with the first transaction does nothing
|
||||
{
|
||||
auto ret = version_list.GcDeleted(ids[0], engine);
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
auto GcDeleted(tx::Transaction *latest=nullptr) {
|
||||
return version_list.GcDeleted(GcSnapshot(engine, latest), engine);
|
||||
}
|
||||
|
||||
// deleting with the last transaction + 1 deletes
|
||||
// everything except the last update
|
||||
{
|
||||
auto ret = version_list.GcDeleted(ids.back() + 1, engine);
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_NE(ret.second, nullptr);
|
||||
delete ret.second;
|
||||
EXPECT_EQ(count, UPDATES);
|
||||
}
|
||||
};
|
||||
|
||||
// remove and abort, nothing gets deleted
|
||||
{
|
||||
auto t = engine.begin();
|
||||
version_list.remove(*t);
|
||||
auto id = t->id + 1;
|
||||
t->abort();
|
||||
auto ret = version_list.GcDeleted(id, engine);
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
}
|
||||
TEST_F(MvccGcTest, RemoveAndAbort) {
|
||||
auto t = engine.Begin();
|
||||
version_list.remove(*t);
|
||||
t->Abort();
|
||||
auto ret = GcDeleted();
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
EXPECT_EQ(record_destruction_count, 0);
|
||||
}
|
||||
|
||||
// update and abort, nothing gets deleted
|
||||
{
|
||||
auto t = engine.begin();
|
||||
version_list.update(*t);
|
||||
auto id = t->id + 1;
|
||||
t->abort();
|
||||
auto ret = version_list.GcDeleted(id, engine);
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
}
|
||||
TEST_F(MvccGcTest, UpdateAndAbort) {
|
||||
MakeUpdates(1, false);
|
||||
auto ret = GcDeleted();
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
EXPECT_EQ(record_destruction_count, 0);
|
||||
|
||||
// remove and commit, everything gets deleted
|
||||
{
|
||||
auto t = engine.begin();
|
||||
version_list.remove(*t);
|
||||
auto id = t->id + 1;
|
||||
t->commit();
|
||||
auto ret = version_list.GcDeleted(id, engine);
|
||||
EXPECT_EQ(ret.first, true);
|
||||
EXPECT_NE(ret.second, nullptr);
|
||||
delete ret.second;
|
||||
EXPECT_EQ(count, UPDATES + 2);
|
||||
}
|
||||
MakeUpdates(3, false);
|
||||
ret = GcDeleted();
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
EXPECT_EQ(record_destruction_count, 0);
|
||||
}
|
||||
|
||||
TEST_F(MvccGcTest, RemoveAndCommit) {
|
||||
auto t = engine.Begin();
|
||||
version_list.remove(*t);
|
||||
t->Commit();
|
||||
auto ret = GcDeleted();
|
||||
EXPECT_EQ(ret.first, true);
|
||||
EXPECT_NE(ret.second, nullptr);
|
||||
delete ret.second;
|
||||
EXPECT_EQ(record_destruction_count, 1);
|
||||
}
|
||||
|
||||
TEST_F(MvccGcTest, UpdateAndCommit) {
|
||||
MakeUpdates(4, true);
|
||||
auto ret = GcDeleted();
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_NE(ret.second, nullptr);
|
||||
delete ret.second;
|
||||
EXPECT_EQ(record_destruction_count, 4);
|
||||
}
|
||||
|
||||
TEST_F(MvccGcTest, OldestTransactionSnapshot) {
|
||||
// this test validates that we can't delete
|
||||
// a record that has been expired by a transaction (t1)
|
||||
// committed before GC starts (when t2 is oldest),
|
||||
// if t1 is in t2's snapshot.
|
||||
// this is because there could exist transcation t3
|
||||
// that also has t1 in it's snapshot, and consequently
|
||||
// does not see the expiration and sees the record
|
||||
auto t1 = engine.Begin();
|
||||
auto t2 = engine.Begin();
|
||||
version_list.remove(*t1);
|
||||
t1->Commit();
|
||||
|
||||
auto ret = GcDeleted(t2);
|
||||
EXPECT_EQ(ret.first, false);
|
||||
EXPECT_EQ(ret.second, nullptr);
|
||||
EXPECT_EQ(record_destruction_count, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test integration of garbage collector with MVCC GC. Delete mvcc's which are
|
||||
* Test integration of garbage collector with MVCC GC. Delete version lists
|
||||
* which are
|
||||
* empty (not visible from any future transaction) from the skiplist.
|
||||
*/
|
||||
TEST(GarbageCollector, GcClean) {
|
||||
SkipList<mvcc::VersionList<PropCount> *> skiplist;
|
||||
SkipList<mvcc::VersionList<DestrCountRec> *> skiplist;
|
||||
tx::Engine engine;
|
||||
DeferredDeleter<PropCount> deleter;
|
||||
DeferredDeleter<mvcc::VersionList<PropCount>> vlist_deleter;
|
||||
GarbageCollector<PropCount> gc(skiplist, deleter, vlist_deleter);
|
||||
|
||||
auto t1 = engine.begin();
|
||||
std::atomic<int> count{0};
|
||||
auto vl = new mvcc::VersionList<PropCount>(*t1, count);
|
||||
DeferredDeleter<DestrCountRec> deleter;
|
||||
DeferredDeleter<mvcc::VersionList<DestrCountRec>> vlist_deleter;
|
||||
GarbageCollector<DestrCountRec> gc(skiplist, deleter, vlist_deleter);
|
||||
|
||||
// create a version list in transaction t1
|
||||
auto t1 = engine.Begin();
|
||||
std::atomic<int> record_destruction_count{0};
|
||||
auto vl = new mvcc::VersionList<DestrCountRec>(*t1, record_destruction_count);
|
||||
auto access = skiplist.access();
|
||||
access.insert(vl);
|
||||
gc.Run(Id(2), engine);
|
||||
t1->commit();
|
||||
t1->Commit();
|
||||
|
||||
auto t2 = engine.begin();
|
||||
vl->remove(*t2);
|
||||
t2->commit();
|
||||
gc.Run(Id(3), engine);
|
||||
|
||||
EXPECT_EQ(deleter.Count(), 1);
|
||||
deleter.FreeExpiredObjects(engine.count() + 1);
|
||||
// run garbage collection that has nothing co collect
|
||||
gc.Run(GcSnapshot(engine, nullptr), engine);
|
||||
EXPECT_EQ(deleter.Count(), 0);
|
||||
EXPECT_EQ(count, 1);
|
||||
EXPECT_EQ(vlist_deleter.Count(), 0);
|
||||
EXPECT_EQ(record_destruction_count, 0);
|
||||
|
||||
// delete the only record in the version-list in transaction t2
|
||||
auto t2 = engine.Begin();
|
||||
vl->remove(*t2);
|
||||
t2->Commit();
|
||||
gc.Run(GcSnapshot(engine, nullptr), engine);
|
||||
|
||||
// check that we destroyed the record
|
||||
EXPECT_EQ(deleter.Count(), 1);
|
||||
deleter.FreeExpiredObjects(engine.Count() + 1);
|
||||
EXPECT_EQ(deleter.Count(), 0);
|
||||
EXPECT_EQ(record_destruction_count, 1);
|
||||
|
||||
// check that we destroyed the version list
|
||||
EXPECT_EQ(vlist_deleter.Count(), 1);
|
||||
vlist_deleter.FreeExpiredObjects(engine.count() + 1);
|
||||
vlist_deleter.FreeExpiredObjects(engine.Count() + 1);
|
||||
EXPECT_EQ(vlist_deleter.Count(), 0);
|
||||
|
||||
EXPECT_EQ(access.size(), (size_t)0);
|
||||
|
37
tests/unit/mvcc_gc_common.hpp
Normal file
37
tests/unit/mvcc_gc_common.hpp
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
#include "mvcc/record.hpp"
|
||||
|
||||
/**
|
||||
* @brief - Empty class which inherits from mvcc:Record.
|
||||
*/
|
||||
class Prop : public mvcc::Record<Prop> {};
|
||||
|
||||
/**
|
||||
* @brief - Class which inherits from mvcc::Record and takes an atomic variable
|
||||
* to count number of destructor calls (to test if the record is actually
|
||||
* deleted).
|
||||
*/
|
||||
class DestrCountRec : public mvcc::Record<DestrCountRec> {
|
||||
public:
|
||||
DestrCountRec(std::atomic<int> &count) : count_(count) {}
|
||||
~DestrCountRec() { ++count_; }
|
||||
|
||||
private:
|
||||
std::atomic<int> &count_;
|
||||
};
|
||||
|
||||
// helper function for creating a GC snapshot
|
||||
// if given a nullptr it makes a GC snapshot like there
|
||||
// are no active transactions
|
||||
auto GcSnapshot(tx::Engine &engine, tx::Transaction *t) {
|
||||
if (t != nullptr) {
|
||||
tx::Snapshot gc_snap = t->snapshot();
|
||||
gc_snap.insert(t->id_);
|
||||
return gc_snap;
|
||||
} else {
|
||||
tx::Snapshot gc_snap;
|
||||
gc_snap.insert(engine.Count() + 1);
|
||||
return gc_snap;
|
||||
}
|
||||
}
|
@ -65,7 +65,7 @@ TEST_F(Mvcc, RemoveNotAdvanceRemove) {
|
||||
TEST_F(Mvcc, UpdateAdvanceUpdate) {
|
||||
T2_UPDATE;
|
||||
EXPECT_EQ(T2_FIND, v1);
|
||||
engine.advance(t2->id);
|
||||
engine.Advance(t2->id_);
|
||||
EXPECT_EQ(T2_FIND, v2);
|
||||
auto v2_2 = version_list.update(*t2);
|
||||
EXPECT_NXT(v2, v1);
|
||||
@ -82,7 +82,7 @@ TEST_F(Mvcc, UpdateAdvanceUpdate) {
|
||||
TEST_F(Mvcc, UpdateAdvanceRemove) {
|
||||
T2_UPDATE;
|
||||
EXPECT_EQ(T2_FIND, v1);
|
||||
engine.advance(t2->id);
|
||||
engine.Advance(t2->id_);
|
||||
EXPECT_EQ(T2_FIND, v2);
|
||||
T2_REMOVE;
|
||||
EXPECT_NXT(v2, v1);
|
||||
@ -96,7 +96,7 @@ TEST_F(Mvcc, UpdateAdvanceRemove) {
|
||||
TEST_F(Mvcc, RemoveAdvanceUpdate) {
|
||||
T2_REMOVE;
|
||||
EXPECT_EQ(T2_FIND, v1);
|
||||
engine.advance(t2->id);
|
||||
engine.Advance(t2->id_);
|
||||
EXPECT_EQ(T2_FIND, nullptr);
|
||||
EXPECT_DEATH(T2_UPDATE, ".*nullptr.*");
|
||||
}
|
||||
@ -104,7 +104,7 @@ TEST_F(Mvcc, RemoveAdvanceUpdate) {
|
||||
TEST_F(Mvcc, RemoveAdvanceRemove) {
|
||||
T2_REMOVE;
|
||||
EXPECT_EQ(T2_FIND, v1);
|
||||
engine.advance(t2->id);
|
||||
engine.Advance(t2->id_);
|
||||
EXPECT_EQ(T2_FIND, nullptr);
|
||||
EXPECT_DEATH(T2_REMOVE, ".*nullptr.*");
|
||||
}
|
||||
|
@ -5,54 +5,75 @@
|
||||
#include "transactions/engine.hpp"
|
||||
#include "transactions/transaction.hpp"
|
||||
|
||||
TEST(Engine, CountEmpty) {
|
||||
tx::Engine engine;
|
||||
EXPECT_EQ(engine.Count(), 0);
|
||||
}
|
||||
|
||||
TEST(Engine, Count) {
|
||||
tx::Engine eng;
|
||||
EXPECT_EQ(eng.count(), 0);
|
||||
}
|
||||
|
||||
TEST(Engine, CountFive) {
|
||||
tx::Engine eng;
|
||||
EXPECT_EQ(eng.count(), (uint64_t)0);
|
||||
std::vector<tx::Transaction *> V;
|
||||
tx::Engine engine;
|
||||
EXPECT_EQ(engine.Count(), (uint64_t)0);
|
||||
std::vector<tx::Transaction *> transactions;
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
V.push_back(eng.begin());
|
||||
EXPECT_EQ(eng.count(), (uint64_t)(i + 1));
|
||||
transactions.push_back(engine.Begin());
|
||||
EXPECT_EQ(engine.Count(), (uint64_t)(i + 1));
|
||||
}
|
||||
EXPECT_EQ(eng.size(), (uint64_t)5);
|
||||
for (int i = 0; i < 5; ++i) V[i]->commit();
|
||||
EXPECT_EQ(eng.count(), (uint64_t)5);
|
||||
EXPECT_EQ(engine.ActiveCount(), (uint64_t)5);
|
||||
for (int i = 0; i < 5; ++i) transactions[i]->Commit();
|
||||
EXPECT_EQ(engine.Count(), (uint64_t)5);
|
||||
}
|
||||
|
||||
TEST(Engine, LastKnownActiveEmpty) {
|
||||
tx::Engine eng;
|
||||
EXPECT_EQ(eng.oldest_active().is_present(), false);
|
||||
}
|
||||
TEST(Engine, GcSnapshot) {
|
||||
tx::Engine engine;
|
||||
ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
|
||||
|
||||
TEST(Engine, LastKnownActive) {
|
||||
tx::Engine eng;
|
||||
std::vector<tx::Transaction *> V;
|
||||
std::vector<tx::Transaction *> transactions;
|
||||
// create transactions and check the GC snapshot
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
V.push_back(eng.begin());
|
||||
EXPECT_EQ(eng.size(), (size_t)i + 1);
|
||||
transactions.push_back(engine.Begin());
|
||||
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
|
||||
}
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
EXPECT_EQ(eng.oldest_active().get(), Id(i + 1));
|
||||
V[i]->commit();
|
||||
}
|
||||
EXPECT_EQ(eng.oldest_active().is_present(), false);
|
||||
|
||||
// commit transactions in the middle, expect
|
||||
// the GcSnapshot did not change
|
||||
transactions[1]->Commit();
|
||||
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
|
||||
transactions[2]->Commit();
|
||||
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
|
||||
|
||||
// have the first three transactions committed
|
||||
transactions[0]->Commit();
|
||||
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1, 2, 3, 4}));
|
||||
|
||||
// commit all
|
||||
transactions[3]->Commit();
|
||||
transactions[4]->Commit();
|
||||
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6}));
|
||||
}
|
||||
|
||||
TEST(Engine, Size) {
|
||||
tx::Engine eng;
|
||||
std::vector<tx::Transaction *> V;
|
||||
TEST(Engine, ActiveCount) {
|
||||
tx::Engine engine;
|
||||
std::vector<tx::Transaction *> transactions;
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
V.push_back(eng.begin());
|
||||
EXPECT_EQ(eng.size(), (size_t)i + 1);
|
||||
transactions.push_back(engine.Begin());
|
||||
EXPECT_EQ(engine.ActiveCount(), (size_t)i + 1);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
transactions[i]->Commit();
|
||||
EXPECT_EQ(engine.ActiveCount(), 4 - i);
|
||||
}
|
||||
for (int i = 0; i < 5; ++i) V[i]->commit();
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
TEST(Engine, Advance) {
|
||||
tx::Engine engine;
|
||||
|
||||
auto t0 = engine.Begin();
|
||||
auto t1 = engine.Begin();
|
||||
EXPECT_EQ(t0->cid(), 1);
|
||||
engine.Advance(t0->id_);
|
||||
EXPECT_EQ(t0->cid(), 2);
|
||||
engine.Advance(t0->id_);
|
||||
EXPECT_EQ(t0->cid(), 3);
|
||||
EXPECT_EQ(t1->cid(), 1);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user