Deferred deleter added.

Summary:
Add tests.

Merge branch 'dev' into remove_locks

Everything works. Refactor complete.

Documentation:
https://phabricator.memgraph.io/w/memgraph_implementation/indexing/

Reviewers: buda, mislav.bradac, florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D258
This commit is contained in:
Dominik Gleich 2017-04-14 17:32:59 +02:00
parent 0a09a6ac64
commit 461ea65ad4
18 changed files with 607 additions and 148 deletions

View File

@ -12,19 +12,54 @@ const int DEFAULT_CLEANING_CYCLE_SEC = 30; // 30 seconds
GraphDb::GraphDb(const std::string &name, bool import_snapshot)
: name_(name),
gc_vertices_(&vertices_, &tx_engine),
gc_edges_(&edges_, &tx_engine) {
gc_vertices_(vertices_, vertex_record_deleter_,
vertex_version_list_deleter_),
gc_edges_(edges_, edge_record_deleter_, edge_version_list_deleter_) {
const std::string time_str = CONFIG(config::CLEANING_CYCLE_SEC);
int pause = DEFAULT_CLEANING_CYCLE_SEC;
if (!time_str.empty()) pause = CONFIG_INTEGER(config::CLEANING_CYCLE_SEC);
// Pause of -1 means we shouldn't run the GC.
if (pause != -1) {
gc_vertices_scheduler_.Run(
std::chrono::seconds(pause),
std::bind(&GarbageCollector<Vertex>::Run, gc_vertices_));
gc_edges_scheduler_.Run(std::chrono::seconds(pause),
std::bind(&GarbageCollector<Edge>::Run, gc_edges_));
gc_scheduler_.Run(std::chrono::seconds(pause), [this]() {
// main garbage collection logic
// see wiki documentation for logic explanation
const auto next_id = this->tx_engine.count() + 1;
const auto id = this->tx_engine.oldest_active().get_or(next_id);
{
// This can be run concurrently
this->gc_vertices_.Run(id, this->tx_engine);
this->gc_edges_.Run(id, this->tx_engine);
}
// This has to be run sequentially after gc because gc modifies
// version_lists and changes the oldest visible record, on which Refresh
// depends.
{
// This can be run concurrently
this->labels_index_.Refresh(id, this->tx_engine);
this->edge_types_index_.Refresh(id, this->tx_engine);
}
this->edge_record_deleter_.FreeExpiredObjects(id);
this->vertex_record_deleter_.FreeExpiredObjects(id);
this->edge_version_list_deleter_.FreeExpiredObjects(id);
this->vertex_version_list_deleter_.FreeExpiredObjects(id);
});
}
// if (import_snapshot)
// snap_engine.import();
}
GraphDb::~GraphDb() {
// Stop the gc scheduler to not run into race conditions for deletions.
gc_scheduler_.Stop();
// Delete vertices and edges which weren't collected before, also deletes
// records inside version list
for (auto &vertex : this->vertices_.access()) delete vertex;
for (auto &edge : this->edges_.access()) delete edge;
// Free expired records with the maximal possible id from all the deleters.
this->edge_record_deleter_.FreeExpiredObjects(Id::MaximalId());
this->vertex_record_deleter_.FreeExpiredObjects(Id::MaximalId());
this->edge_version_list_deleter_.FreeExpiredObjects(Id::MaximalId());
this->vertex_version_list_deleter_.FreeExpiredObjects(Id::MaximalId());
}

View File

@ -7,6 +7,7 @@
#include "database/graph_db_datatypes.hpp"
#include "database/indexes/key_index.hpp"
#include "mvcc/version_list.hpp"
#include "storage/deferred_deleter.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
#include "storage/unique_object_store.hpp"
@ -34,6 +35,11 @@ class GraphDb {
* into the db.
*/
GraphDb(const std::string &name, bool import_snapshot = true);
/**
* @brief - Destruct database object. Delete all vertices and edges and free
* all deferred deleters.
*/
~GraphDb();
/**
* Database object can't be copied.
@ -57,9 +63,19 @@ class GraphDb {
// main storage for the graph
SkipList<mvcc::VersionList<Vertex> *> vertices_;
SkipList<mvcc::VersionList<Edge> *> edges_;
// Garbage collectors
GarbageCollector<Vertex> gc_vertices_;
GarbageCollector<Edge> gc_edges_;
// Deleters for not relevant records
DeferredDeleter<Vertex> vertex_record_deleter_;
DeferredDeleter<Edge> edge_record_deleter_;
// Deleters for not relevant version_lists
DeferredDeleter<mvcc::VersionList<Vertex>> vertex_version_list_deleter_;
DeferredDeleter<mvcc::VersionList<Edge>> edge_version_list_deleter_;
// unique object stores
// TODO this should be also garbage collected
ConcurrentSet<std::string> labels_;
@ -71,6 +87,5 @@ class GraphDb {
KeyIndex<GraphDbTypes::EdgeType, Edge> edge_types_index_;
// Schedulers
Scheduler<std::mutex> gc_vertices_scheduler_;
Scheduler<std::mutex> gc_edges_scheduler_;
Scheduler<std::mutex> gc_scheduler_;
};

View File

@ -45,9 +45,10 @@ VertexAccessor GraphDbAccessor::insert_vertex() {
throw CreationException("Unable to create a Vertex.");
}
void GraphDbAccessor::update_label_index(
const GraphDbTypes::Label &label, const VertexAccessor &vertex_accessor) {
this->db_.labels_index_.Update(label, vertex_accessor.vlist_);
void GraphDbAccessor::update_label_index(const GraphDbTypes::Label &label,
const VertexAccessor &vertex_accessor,
const Vertex *vertex) {
this->db_.labels_index_.Update(label, vertex_accessor.vlist_, vertex);
}
size_t GraphDbAccessor::vertices_count(const GraphDbTypes::Label &label) {
@ -95,9 +96,9 @@ EdgeAccessor GraphDbAccessor::insert_edge(VertexAccessor &from,
bool success = db_.edges_.access().insert(edge_vlist).second;
const auto edge_accessor = EdgeAccessor(*edge_vlist, *this);
if (success) {
// This has to be here because there is no single method called for
// type seting. It's set here, and sometimes in set_edge_type method.
update_edge_type_index(edge_type, edge_accessor);
// This has to be here because there is no additional method for setting
// edge type.
update_edge_type_index(edge_type, edge_accessor, &edge_accessor.current());
return edge_accessor;
}
@ -105,9 +106,9 @@ EdgeAccessor GraphDbAccessor::insert_edge(VertexAccessor &from,
}
void GraphDbAccessor::update_edge_type_index(
const GraphDbTypes::EdgeType &edge_type,
const EdgeAccessor &edge_accessor) {
this->db_.edge_types_index_.Update(edge_type, edge_accessor.vlist_);
const GraphDbTypes::EdgeType &edge_type, const EdgeAccessor &edge_accessor,
const Edge *edge) {
this->db_.edge_types_index_.Update(edge_type, edge_accessor.vlist_, edge);
}
size_t GraphDbAccessor::edges_count(const GraphDbTypes::EdgeType &edge_type) {

View File

@ -102,7 +102,7 @@ class GraphDbAccessor {
auto vertices(const GraphDbTypes::Label &label) {
return iter::imap(
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
db_.labels_index_.Acquire(label, *transaction_));
db_.labels_index_.GetVlists(label, *transaction_));
}
/**
@ -150,24 +150,28 @@ class GraphDbAccessor {
auto edges(const GraphDbTypes::EdgeType &edge_type) {
return iter::imap(
[this](auto vlist) { return EdgeAccessor(*vlist, *this); },
db_.edge_types_index_.Acquire(edge_type, *transaction_));
db_.edge_types_index_.GetVlists(edge_type, *transaction_));
}
/**
* Insert this record into corresponding label index.
* @param label - label index into which to insert record
* @param record - record which to insert
* Insert this vertex into corresponding label index.
* @param label - label index into which to insert vertex label record
* @param vertex_accessor - vertex_accessor to insert
* @param vertex - vertex record to insert
*/
void update_label_index(const GraphDbTypes::Label &label,
const VertexAccessor &vertex_accessor);
const VertexAccessor &vertex_accessor,
const Vertex *vertex);
/**
* Insert this record into corresponding edge_type index.
* Insert this edge into corresponding edge_type index.
* @param edge_type - edge_type index into which to insert record
* @param record - record which to insert
* @param edge_accessor - edge_accessor to insert
* @param edge - edge record to insert
*/
void update_edge_type_index(const GraphDbTypes::EdgeType &edge_type,
const EdgeAccessor &edge_accessor);
const EdgeAccessor &edge_accessor,
const Edge *edge);
/**
* Return approximate number of vertices under indexes with the given label.

View File

@ -10,6 +10,7 @@
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "transactions/transaction.hpp"
#include "utils/total_ordering.hpp"
/**
* @brief Implements index update and acquire.
@ -20,31 +21,46 @@ template <typename TKey, typename TRecord>
class KeyIndex {
public:
/**
* @brief - Add vlist, if new, to TKey specific storage.
* @param key - TKey index to update.
* @param vlist - pointer to vlist entry to add.
* @brief - Clear all indexes so that we don't leak memory.
*/
void Update(const TKey &key, mvcc::VersionList<TRecord> *vlist) {
GetKeyStorage(key)->access().insert(vlist);
~KeyIndex() {
for (auto key_indices_pair : indices_.access())
// Delete skiplist because we created it with a new operator.
delete key_indices_pair.second;
}
/**
* @brief - Add record, vlist, if new, to TKey specific storage.
* @param key - TKey index to update.
* @param vlist - pointer to vlist entry to add
* @param record - pointer to record entry to add (contained in vlist)
*/
void Update(const TKey &key, mvcc::VersionList<TRecord> *vlist,
const TRecord *record) {
GetKeyStorage(key)->access().insert(IndexEntry(vlist, record));
}
/**
* @brief - Acquire all the inserted vlists in TKey specific storage which
* @brief - Get all the inserted vlists in TKey specific storage which
* still have that label visible in this transaction.
* @param key - key to query.
* @param t - current transaction, which determines visibility.
* @return iterable collection of vlists records<TRecord> with the requested
* TKey.
*/
auto Acquire(const TKey &key, const tx::Transaction &t) {
auto GetVlists(const TKey &key, tx::Transaction &t) {
auto index = GetKeyStorage(key);
return iter::filter(
[this, &key, &t](auto vlist) {
auto version = vlist->find(t);
mvcc::VersionList<TRecord> *prev = nullptr;
auto filtered = iter::filter(
[this, &key, &t, prev](auto entry) mutable {
if (entry.vlist_ == prev) return false;
auto version = entry.vlist_->find(t);
prev = entry.vlist_;
if (version == nullptr) return false;
return Exists(key, version);
},
index->access());
return iter::imap([this](auto entry) { return entry.vlist_; },
std::move(filtered));
}
/**
@ -59,19 +75,88 @@ class KeyIndex {
*/
auto Count(const TKey &key) { return GetKeyStorage(key)->access().size(); }
/**
* @brief - Removes from the index all entries for which records don't contain
* the given label anymore. Update all record which are not visible for any
* transaction with an id larger or equal to `id`.
* @param id - oldest active id, safe to remove everything deleted before this
* id.
* @param engine - transaction engine to see which records are commited
*/
void Refresh(const Id &id, tx::Engine &engine) {
for (auto &key_indices_pair : indices_.access()) {
auto indices_entries_accessor = key_indices_pair.second->access();
for (auto indices_entry : indices_entries_accessor) {
// Remove it from index if it's deleted before the current id, or it
// doesn't have that label/edge_type anymore. We need to be careful when
// we are deleting the record which is not visible anymore because even
// though that record is not visible, some other, newer record could be
// visible, and might contain the label, and might not be in the index -
// that's why we can't remove it completely, but just update it's record
// value.
// We also need to be extra careful when checking for existance of
// label/edge_type because that record could still be under the update
// operation and as such could be modified while we are reading the
// label/edge_type - that's why we have to wait for it's creation id to
// be lower than ours `id` as that means it's surely either commited or
// aborted as we always refresh with the oldest active id.
if (indices_entry.record_->is_not_visible_from(id, engine)) {
// We first have to insert and then remove, because otherwise we might
// have a situation where there exists a vlist with the record
// containg the label/edge_type but it's not in our index.
auto new_record = indices_entry.vlist_->Oldest();
if (new_record != nullptr)
indices_entries_accessor.insert(
IndexEntry(indices_entry.vlist_, new_record));
auto success = indices_entries_accessor.remove(indices_entry);
debug_assert(success == true, "Not able to delete entry.");
} else if (indices_entry.record_->tx.cre() < id &&
!Exists(key_indices_pair.first, indices_entry.record_)) {
indices_entries_accessor.remove(indices_entry);
}
}
}
}
private:
/**
* @brief - Contains vlist and record pointers.
*/
class IndexEntry : public TotalOrdering<IndexEntry> {
public:
IndexEntry(mvcc::VersionList<TRecord> *vlist, const TRecord *record)
: vlist_(vlist), record_(record) {}
// Comparision operators - we need them to keep this sorted inside
// skiplist.
// This needs to be sorted first by vlist and second record because we
// want to keep same vlists close together since we need to filter them to
// get only the unique ones.
bool operator<(const IndexEntry &other) const {
if (this->vlist_ != other.vlist_) return this->vlist_ < other.vlist_;
return this->record_ < other.record_;
}
bool operator==(const IndexEntry &other) const {
return this->vlist_ == other.vlist_ && this->record_ == other.record_;
}
mvcc::VersionList<TRecord> *vlist_;
const TRecord *record_;
};
/**
* @brief - Get storage for this label. Creates new
* storage if this label is not yet indexed.
* @param label - Label for which to access storage.
* storage if this key is not yet indexed.
* @param key - key for which to access storage.
* @return pointer to skiplist of version list records<T>.
*/
auto GetKeyStorage(const TKey &key) {
auto access = index_.access();
auto access = indices_.access();
// Avoid excessive new/delete by first checking if it exists.
auto iter = access.find(key);
if (iter == access.end()) {
auto skiplist = new SkipList<mvcc::VersionList<TRecord> *>;
auto skiplist = new SkipList<IndexEntry>;
auto ret = access.insert(key, skiplist);
// In case some other insert managed to create new skiplist we shouldn't
// leak memory and should delete this one accordingly.
@ -87,6 +172,7 @@ class KeyIndex {
* @return true if it contains, false otherwise.
*/
bool Exists(const GraphDbTypes::Label &label, const Vertex *v) const {
debug_assert(v != nullptr, "Vertex is nullptr.");
// We have to check for existance of label because the transaction
// might not see the label, or the label was deleted and not yet
// removed from the index.
@ -100,11 +186,12 @@ class KeyIndex {
* @return true if it has that edge_type, false otherwise.
*/
bool Exists(const GraphDbTypes::EdgeType &edge_type, const Edge *e) const {
debug_assert(e != nullptr, "Edge is nullptr.");
// We have to check for equality of edge types because the transaction
// might not see the edge type, or the edge type was deleted and not yet
// removed from the index.
return e->edge_type_ == edge_type;
}
ConcurrentMap<TKey, SkipList<mvcc::VersionList<TRecord> *> *> index_;
ConcurrentMap<TKey, SkipList<IndexEntry> *> indices_;
};

View File

@ -1,6 +1,7 @@
#pragma once
#include <stdint.h>
#include <limits>
#include <ostream>
#include "utils/total_ordering.hpp"
@ -11,14 +12,21 @@ class Id : public TotalOrdering<Id> {
Id(uint64_t id);
friend bool operator<(const Id& a, const Id& b);
friend bool operator<(const Id &a, const Id &b);
friend bool operator==(const Id& a, const Id& b);
friend bool operator==(const Id &a, const Id &b);
friend std::ostream& operator<<(std::ostream& stream, const Id& id);
friend std::ostream &operator<<(std::ostream &stream, const Id &id);
operator uint64_t() const;
/**
* @brief - Return maximal possible Id.
*/
static const Id MaximalId() {
return Id(std::numeric_limits<uint64_t>::max());
}
private:
uint64_t id{0};
};

View File

@ -105,9 +105,12 @@ class Record : public Version<T> {
return committed(hints.cre, tx.cre(), t);
}
// True if record was deleted before id.
bool is_deleted_before(const Id &id) {
return tx.exp() != Id(0) && tx.exp() < id;
// Record won'te be visible to any transaction after or at `id` if it was
// expired before id or it's creation was aborted.
bool is_not_visible_from(const Id &id, tx::Engine &engine) const {
return (tx.exp() != Id(0) && tx.exp() < id &&
engine.clog.is_committed(tx.exp())) ||
engine.clog.is_aborted(tx.cre());
}
// TODO: Test this
@ -142,7 +145,6 @@ class Record : public Version<T> {
}
protected:
template <class U>
/**
* @brief - Check if the id is commited from the perspective of transactio,
* i.e. transaction can see the transaction with that id (it happened before
@ -153,6 +155,7 @@ class Record : public Version<T> {
* @param id - id to check if it's commited and visible
* @return true if the id is commited and visible for the transaction t.
*/
template <class U>
bool committed(U &hints, const Id &id, const tx::Transaction &t) {
// Dominik Gleich says 4 april 2017: the tests in this routine are correct;
// if you think they're not, you're wrong, and you should think about it
@ -183,7 +186,6 @@ class Record : public Version<T> {
return hints.set_aborted(), false;
}
template <class U>
/**
* @brief - Check if the id is commited.
* @param hints - hints to use to determine commit/abort
@ -192,6 +194,7 @@ class Record : public Version<T> {
* statuses
* @return true if it's commited, false otherwise
*/
template <class U>
bool committed(U &hints, const Id &id, tx::Engine &engine) {
auto hint_bits = hints.load();
// if hints are set, return if xid is committed

View File

@ -65,12 +65,17 @@ class VersionList {
/**
* This method is NOT thread-safe. This should never be called with a
* transaction id newer than the oldest active transaction id.
* Garbage collect (delete) all records which are no longer visible for any
* Re-links all records which are no longer visible for any
* transaction with an id greater or equal to id.
* @param id - transaction id from which to start garbage collection
* @return true - If version list is empty after garbage collection.
* that is not visible anymore. If none exists to_delete will rbecome nullptr.
* @param engine - transaction engine to use - we need it to check which
* records were commited and which werent
* @return pair<status, to_delete>; status is true - If version list is empty
* after garbage collection. to_delete points to the newest record that is not
* visible anymore. If none exists to_delete will point to nullptr.
*/
bool GcDeleted(const Id &id) {
std::pair<bool, T *> GcDeleted(const Id &id, tx::Engine &engine) {
auto newest_deleted_record = head.load(std::memory_order_seq_cst);
T *oldest_not_deleted_record = nullptr;
@ -85,7 +90,7 @@ class VersionList {
// [VerList] ----+ record, or you reach the end of the list
//
while (newest_deleted_record != nullptr &&
!newest_deleted_record->is_deleted_before(id)) {
!newest_deleted_record->is_not_visible_from(id, engine)) {
oldest_not_deleted_record = newest_deleted_record;
newest_deleted_record =
newest_deleted_record->next(std::memory_order_seq_cst);
@ -97,19 +102,13 @@ class VersionList {
if (oldest_not_deleted_record == nullptr) {
// Head points to a deleted record.
if (newest_deleted_record != nullptr) {
{
// We need to take an exclusive lock here since if some thread is
// currently doing the find operation we could free something to which
// find method points.
std::unique_lock<std::shared_timed_mutex> lock(head_mutex_);
head.store(nullptr, std::memory_order_seq_cst);
}
// This is safe to do since we unlinked head and now no record* in
// another thread which is executing the find function can see any of
// the nodes we are going to delete.
delete newest_deleted_record;
head.store(nullptr, std::memory_order_seq_cst);
// This is safe to return as ready for deletion since we unlinked head
// above and this will only be deleted after the last active transaction
// ends.
return std::make_pair(true, newest_deleted_record);
}
return true;
return std::make_pair(true, nullptr);
}
// oldest_not_deleted_record might be visible to some transaction but
// newest_deleted_record is not and will never be visted by the find
@ -120,19 +119,23 @@ class VersionList {
// further than this record and
// that's why it's safe to set
// next to nullptr.
// Call destructor which will clean everything older than this record since
// they are called recursively.
if (newest_deleted_record != nullptr)
delete newest_deleted_record; // THIS IS ISSUE IF MULTIPLE THREADS TRY TO
// DO THIS
return false;
// Calling destructor of newest_deleted_record will clean everything older
// than this record since they are called recursively.
return std::make_pair(false, newest_deleted_record);
}
/**
* @brief - returns oldest record
* @return nullptr if none exist
*/
T *Oldest() {
auto r = head.load(std::memory_order_seq_cst);
while (r && r->next(std::memory_order_seq_cst))
r = r->next(std::memory_order_seq_cst);
return r;
}
T *find(const tx::Transaction &t) {
// We need to take a shared_lock here because GC could delete the first
// entry pointed by head, or some later entry when following head->next
// pointers.
std::shared_lock<std::shared_timed_mutex> slock(head_mutex_);
auto r = head.load(std::memory_order_seq_cst);
// nullptr
@ -167,9 +170,6 @@ class VersionList {
* @param t The transaction
*/
void find_set_new_old(const tx::Transaction &t, T *&old_ref, T *&new_ref) {
// Take a look in find to understand why this is needed.
std::shared_lock<std::shared_timed_mutex> slock(head_mutex_);
// assume that the sought old record is further down the list
// from new record, so that if we found old we can stop looking
new_ref = nullptr;
@ -190,8 +190,6 @@ class VersionList {
return update(record, t);
}
// TODO(flor): This should also be private but can't be right now because of
// the way graph_db_accessor works.
bool remove(tx::Transaction &t) {
debug_assert(head != nullptr, "Head is nullptr on removal.");
auto record = find(t);
@ -203,6 +201,8 @@ class VersionList {
return remove(record, t), true;
}
// TODO(flor): This should also be private but can't be right now because of
// the way graph_db_accessor works.
void remove(T *record, tx::Transaction &t) {
debug_assert(record != nullptr, "Record is nullptr on removal.");
lock_and_validate(record, t);
@ -251,12 +251,5 @@ class VersionList {
std::atomic<T *> head{nullptr};
RecordLock lock;
// We need this mutex to make the nodes deletion operation (to avoid memory
// leak) atomic with regards to the find operation. Otherwise we might have a
// pointer to a version that is currently being deleted, and accessing it will
// throw a SEGFAULT.
// TODO(C++17)
std::shared_timed_mutex head_mutex_; // This should be changed
// when we migrate to C++17.
};
}

View File

@ -0,0 +1,71 @@
#pragma once
#include <list>
#include "mvcc/id.hpp"
#include "mvcc/record.hpp"
#include "utils/assert.hpp"
/**
* @brief - Implements deferred deletion.
* @Tparam T - type of object to delete (Vertex/Edge/VersionList...)
* This is NOT a thread-safe class.
*/
template <typename T>
class DeferredDeleter {
public:
/**
* @brief - check if everything is freed
*/
~DeferredDeleter() {
permanent_assert(objects_.size() == 0,
"Objects are not freed when calling the destructor.");
}
/**
* @brief - Add objects to this deleter. This method assumes that it will
* always be called with a non-decreasing sequence of `last_transaction`.
* @param objects - vector of objects to add
* @param last_transaction - nothing newer or equal to it can see these
* objects
*/
void AddObjects(const std::vector<T *> &objects, const Id &last_transaction) {
debug_assert(
objects_.size() == 0 || objects_.back().deleted_at <= last_transaction,
"Transaction ids are not non-decreasing.");
for (auto object : objects)
objects_.emplace_back(DeletedObject(object, last_transaction));
}
/**
* @brief - Free memory of objects deleted before the id.
* @param id - delete before this id
*/
void FreeExpiredObjects(const Id &id) {
auto it = objects_.begin();
while (it != objects_.end() && it->deleted_at < id) {
delete it->object;
++it;
}
objects_.erase(objects_.begin(), it);
}
/**
* @brief - Return number of stored objects.
*/
size_t Count() { return objects_.size(); }
private:
/**
* @brief - keep track of what object was deleted at which time.
*/
struct DeletedObject {
const T *object;
const Id deleted_at;
DeletedObject(T *object, const Id &deleted_at)
: object(object), deleted_at(deleted_at) {}
};
// Ascendingly sorted list of deleted objects by `deleted_at`.
std::list<DeletedObject> objects_;
};

View File

@ -4,45 +4,59 @@
#include "logging/loggable.hpp"
#include "mvcc/id.hpp"
#include "mvcc/version_list.hpp"
#include "storage/deferred_deleter.hpp"
#include "transactions/engine.hpp"
/**
@template T type of underlying record in mvcc
* @brief - Garbage collects deleted records.
* @Tparam T type of underlying record in mvcc
*/
template <typename T>
class GarbageCollector : public Loggable {
public:
GarbageCollector(SkipList<mvcc::VersionList<T> *> *skiplist,
tx::Engine *engine)
: Loggable("MvccGc"), skiplist_(skiplist), engine_(engine) {
permanent_assert(skiplist != nullptr, "Skiplist can't be nullptr.");
permanent_assert(engine != nullptr, "Engine can't be nullptr.");
};
GarbageCollector(SkipList<mvcc::VersionList<T> *> &skiplist,
DeferredDeleter<T> &record_deleter,
DeferredDeleter<mvcc::VersionList<T>> &version_list_deleter)
: Loggable("MvccGc"),
skiplist_(skiplist),
record_deleter_(record_deleter),
version_list_deleter_(version_list_deleter){};
/**
*@brief - Runs garbage collector.
* @brief - Runs garbage collector. Populates deferred deleters with version
* lists and records.
* @param id - oldest active transaction id
* @param engine - reference to engine object
*/
void Run() {
auto accessor = this->skiplist_->access();
void Run(const Id &id, tx::Engine &engine) {
auto collection_accessor = this->skiplist_.access();
uint64_t count = 0;
// Acquire id of either the oldest active transaction, or the id of a
// transaction that will be assigned next. We should make sure that we
// get count before we ask for active transactions since some
// transaction could possibly increase the count while we ask for
// oldest_active transaction.
const auto next_id = engine_->count() + 1;
const auto id = this->engine_->oldest_active().get_or(next_id);
std::vector<T *> deleted_records;
std::vector<mvcc::VersionList<T> *> deleted_version_lists;
if (logger.Initialized())
logger.trace("Gc started cleaning everything deleted before {}", id);
for (auto x : accessor) {
// If the mvcc is empty, i.e. there is nothing else to be read from it
// we can delete it.
if (x->GcDeleted(id)) count += accessor.remove(x);
for (auto version_list : collection_accessor) {
// If the version_list is empty, i.e. there is nothing else to be read
// from it we can delete it.
auto ret = version_list->GcDeleted(id, engine);
if (ret.first) {
deleted_version_lists.push_back(version_list);
count += collection_accessor.remove(version_list);
}
if (ret.second != nullptr) deleted_records.push_back(ret.second);
}
if (logger.Initialized()) logger.trace("Destroyed: {}", count);
// Add records to deleter, with the id larger or equal than the last active
// transaction.
record_deleter_.AddObjects(deleted_records, engine.count());
// Add version_lists to deleter, with the id larger or equal than the last
// active transaction.
version_list_deleter_.AddObjects(deleted_version_lists, engine.count());
}
private:
SkipList<mvcc::VersionList<T> *> *skiplist_{nullptr}; // Not owned.
tx::Engine *engine_{nullptr}; // Not owned.
SkipList<mvcc::VersionList<T> *> &skiplist_;
DeferredDeleter<T> &record_deleter_;
DeferredDeleter<mvcc::VersionList<T>> &version_list_deleter_;
};

View File

@ -16,8 +16,9 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) {
if (found != labels_view.end()) return false;
// not a duplicate label, add it
update().labels_.emplace_back(label);
this->db_accessor().update_label_index(label, *this);
Vertex &vertex = update();
vertex.labels_.emplace_back(label);
this->db_accessor().update_label_index(label, *this, &vertex);
return true;
}

View File

@ -99,6 +99,9 @@ class DynamicLib : public Loggable {
// http://stackoverflow.com/questions/6450828/segmentation-fault-when-using-dlclose-on-android-platform
// // for now it is not crucial so I've created a task for that
// // ! 0 is success
// Return early because dlclose seems to be casuing the problem again. So
// strange.
return;
int closing_status = dlclose(dynamic_lib);
if (closing_status != 0) throw DynamicLibException(dlerror());
} else {

View File

@ -0,0 +1,61 @@
#include "gtest/gtest.h"
#include "mvcc/record.hpp"
#include "storage/deferred_deleter.hpp"
#include "storage/vertex.hpp"
#include "gc_common.hpp"
// Add and count objects.
TEST(DeferredDeleter, AddObjects) {
DeferredDeleter<Vertex> deleter;
for (int i = 0; i < 10; ++i) {
std::vector<Vertex *> V;
V.push_back(new Vertex());
V.push_back(new Vertex());
deleter.AddObjects(V, Id(5));
EXPECT_EQ(deleter.Count(), (i + 1) * 2);
}
deleter.FreeExpiredObjects(Id::MaximalId());
}
// Check that the deleter can't be destroyed while it still has objects.
TEST(DeferredDeleter, Destructor) {
std::atomic<int> count{0};
DeferredDeleter<PropCount> *deleter = new DeferredDeleter<PropCount>;
for (int i = 0; i < 10; ++i) {
std::vector<PropCount *> V;
V.push_back(new PropCount(count));
V.push_back(new PropCount(count));
deleter->AddObjects(V, Id(5));
EXPECT_EQ(deleter->Count(), (i + 1) * 2);
}
EXPECT_EQ(0, count);
EXPECT_DEATH(delete deleter, "");
// We shouldn't leak memory.
deleter->FreeExpiredObjects(Id::MaximalId());
delete deleter;
}
// Check if deleter frees objects.
TEST(DeferredDeleter, FreeExpiredObjects) {
DeferredDeleter<PropCount> deleter;
std::vector<PropCount *> V;
std::atomic<int> count{0};
V.push_back(new PropCount(count));
V.push_back(new PropCount(count));
deleter.AddObjects(V, Id(5));
deleter.FreeExpiredObjects(Id(5));
EXPECT_EQ(deleter.Count(), 2);
EXPECT_EQ(count, 0);
deleter.FreeExpiredObjects(Id(6));
EXPECT_EQ(deleter.Count(), 0);
EXPECT_EQ(count, 2);
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

22
tests/unit/gc_common.hpp Normal file
View File

@ -0,0 +1,22 @@
#pragma once
#include "mvcc/record.hpp"
/**
* @brief - Empty class which inherits from mvcc:Record.
*/
class Prop : public mvcc::Record<Prop> {};
/**
* @brief - Class which inherits from mvcc::Record and takes an atomic variable
* to count number of destructor calls (to test if the record is actually
* deleted).
*/
class PropCount : public mvcc::Record<PropCount> {
public:
PropCount(std::atomic<int> &count) : count_(count) {}
~PropCount() { ++count_; }
private:
std::atomic<int> &count_;
};

View File

@ -20,6 +20,10 @@ TEST(IdTest, BasicUsageAndTotalOrdering) {
ASSERT_EQ(id3 <= id4, true);
}
TEST(IdTest, MaxId) {
EXPECT_TRUE(Id(std::numeric_limits<uint64_t>::max()) == Id::MaximalId());
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();

View File

@ -3,7 +3,9 @@
#include "data_structures/ptr_int.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/graph_db_datatypes.hpp"
#include "dbms/dbms.hpp"
#include "storage/vertex.hpp"
using testing::UnorderedElementsAreArray;
@ -27,8 +29,116 @@ TEST(LabelsIndex, Count) {
}
}
// Test index does it insert everything uniquely
TEST(LabelsIndex, UniqueInsert) {
KeyIndex<GraphDbTypes::Label, Vertex> index;
Dbms dbms;
auto access = dbms.active();
tx::Engine engine;
auto t1 = engine.begin();
mvcc::VersionList<Vertex> vlist(*t1);
t1->commit();
auto t2 = engine.begin();
vlist.find(*t2)->labels_.push_back(access->label("1"));
index.Update(access->label("1"), &vlist, vlist.find(*t2));
// Try multiple inserts
index.Update(access->label("1"), &vlist, vlist.find(*t2));
vlist.find(*t2)->labels_.push_back(access->label("2"));
index.Update(access->label("2"), &vlist, vlist.find(*t2));
vlist.find(*t2)->labels_.push_back(access->label("3"));
index.Update(access->label("3"), &vlist, vlist.find(*t2));
t2->commit();
EXPECT_EQ(index.Count(access->label("1")), 1);
EXPECT_EQ(index.Count(access->label("2")), 1);
EXPECT_EQ(index.Count(access->label("3")), 1);
}
// Check if index filters duplicates.
TEST(LabelsIndex, UniqueFilter) {
KeyIndex<GraphDbTypes::Label, Vertex> index;
Dbms dbms;
auto access = dbms.active();
tx::Engine engine;
auto t1 = engine.begin();
mvcc::VersionList<Vertex> vlist1(*t1);
mvcc::VersionList<Vertex> vlist2(*t1);
t1->engine.advance(
t1->id); // advance command so we can see our inserted version
auto r1v1 = vlist1.find(*t1);
auto r1v2 = vlist1.find(*t1);
EXPECT_NE(vlist1.find(*t1), nullptr);
auto label1 = access->label("1");
vlist1.find(*t1)->labels_.push_back(label1);
vlist2.find(*t1)->labels_.push_back(label1);
index.Update(label1, &vlist1, r1v1);
index.Update(label1, &vlist2, r1v2);
t1->commit();
auto t2 = engine.begin();
auto r2v1 = vlist1.update(*t2);
auto r2v2 = vlist2.update(*t2);
index.Update(label1, &vlist1, r2v1);
index.Update(label1, &vlist2, r2v2);
t2->commit();
auto t3 = engine.begin();
std::vector<mvcc::VersionList<Vertex> *> expected = {&vlist1, &vlist2};
sort(expected.begin(),
expected.end()); // Entries will be sorted by vlist pointers.
int cnt = 0;
for (auto vlist : index.GetVlists(label1, *t3)) {
EXPECT_LT(cnt, expected.size());
EXPECT_EQ(vlist, expected[cnt++]);
}
}
// Delete not anymore relevant recods from index.
TEST(LabelsIndex, Refresh) {
KeyIndex<GraphDbTypes::Label, Vertex> index;
Dbms dbms;
auto access = dbms.active();
tx::Engine engine;
auto t1 = engine.begin();
mvcc::VersionList<Vertex> vlist1(*t1);
mvcc::VersionList<Vertex> vlist2(*t1);
t1->engine.advance(
t1->id); // advance command so we can see our inserted version
auto r1v1 = vlist1.find(*t1);
auto r1v2 = vlist2.find(*t1);
EXPECT_NE(vlist1.find(*t1), nullptr);
auto label1 = access->label("1");
vlist1.find(*t1)->labels_.push_back(label1);
vlist2.find(*t1)->labels_.push_back(label1);
index.Update(label1, &vlist1, r1v1);
index.Update(label1, &vlist2, r1v2);
t1->commit();
auto t2 = engine.begin();
auto r2v1 = vlist1.update(*t2);
auto r2v2 = vlist2.update(*t2);
index.Update(label1, &vlist1, r2v1);
index.Update(label1, &vlist2, r2v2);
int last_id = t2->id;
t2->commit();
EXPECT_EQ(index.Count(label1), 4);
index.Refresh(last_id, engine);
EXPECT_EQ(index.Count(label1), 4);
index.Refresh(last_id + 1, engine);
EXPECT_EQ(index.Count(label1), 2);
}
// Transaction hasn't ended and so the vertex is not visible.
TEST(LabelsIndex, AddGetZeroLabels) {
TEST(LabelsIndexDb, AddGetZeroLabels) {
Dbms dbms;
auto accessor = dbms.active();
auto vertex = accessor->insert_vertex();
@ -41,7 +151,7 @@ TEST(LabelsIndex, AddGetZeroLabels) {
// Test label index by adding and removing one vertex, and removing label from
// another, while the third one with an irrelevant label exists.
TEST(LabelsIndex, AddGetRemoveLabel) {
TEST(LabelsIndexDb, AddGetRemoveLabel) {
Dbms dbms;
{
auto accessor = dbms.active();

View File

@ -1,20 +1,13 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "mvcc/id.hpp"
#include "mvcc/record.hpp"
#include "mvcc/version_list.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine.hpp"
class Prop : public mvcc::Record<Prop> {};
class PropCount : public mvcc::Record<PropCount> {
public:
PropCount(std::atomic<int> &count) : count_(count) {}
~PropCount() { ++count_; }
private:
std::atomic<int> &count_;
};
#include "gc_common.hpp"
TEST(MVCC, Case1Test3) {
tx::Engine engine;
@ -72,6 +65,20 @@ TEST(MVCC, UpdateDontDelete) {
EXPECT_EQ(count, 3);
}
// Check that we get the oldest record.
TEST(MVCC, Oldest) {
tx::Engine engine;
auto t1 = engine.begin();
mvcc::VersionList<Prop> version_list(*t1);
auto first = version_list.Oldest();
EXPECT_NE(first, nullptr);
for (int i = 0; i < 10; ++i) {
engine.advance(t1->id);
version_list.update(*t1);
EXPECT_EQ(version_list.Oldest(), first);
}
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();

View File

@ -3,6 +3,7 @@
#include <chrono>
#include <memory>
#include <thread>
#include "config/config.hpp"
#include "data_structures/concurrent/skiplist.hpp"
@ -14,18 +15,7 @@
#include "storage/vertex.hpp"
#include "transactions/engine.hpp"
/**
* Class which takes an atomic variable to count number of destructor calls (to
* test if GC is actually deleting records).
*/
class Prop : public mvcc::Record<Prop> {
public:
Prop(std::atomic<int> &count) : count_(count) {}
~Prop() { ++count_; }
private:
std::atomic<int> &count_;
};
#include "gc_common.hpp"
/**
* Test will the mvcc gc delete records inside the version list because they
@ -37,7 +27,7 @@ TEST(VersionList, GcDeleted) {
std::vector<uint64_t> ids;
auto t1 = engine.begin();
std::atomic<int> count{0};
mvcc::VersionList<Prop> version_list(*t1, count);
mvcc::VersionList<PropCount> version_list(*t1, count);
ids.push_back(t1->id);
t1->commit();
@ -48,16 +38,36 @@ TEST(VersionList, GcDeleted) {
t2->commit();
}
EXPECT_EQ(version_list.GcDeleted(ids[0]), false);
EXPECT_EQ(version_list.GcDeleted(ids[0], engine),
std::make_pair(false, (PropCount *)nullptr));
EXPECT_EQ(count, 0);
EXPECT_EQ(version_list.GcDeleted(ids.back() + 1), false);
auto ret = version_list.GcDeleted(ids.back() + 1, engine);
EXPECT_EQ(ret.first, false);
EXPECT_NE(ret.second, nullptr);
delete ret.second;
EXPECT_EQ(count, UPDATES);
auto tl = engine.begin();
version_list.remove(*tl);
EXPECT_EQ(version_list.GcDeleted(tl->id + 1), true);
auto id = tl->id + 1;
tl->abort();
auto ret2 = version_list.GcDeleted(id, engine);
EXPECT_EQ(ret2.first, false);
EXPECT_EQ(ret2.second, nullptr);
auto tk = engine.begin();
version_list.remove(*tk);
auto id2 = tk->id + 1;
tk->commit();
auto ret3 = version_list.GcDeleted(id2, engine);
EXPECT_EQ(ret3.first, true);
EXPECT_NE(ret3.second, nullptr);
delete ret3.second;
EXPECT_EQ(count, UPDATES + 1);
tl->commit();
}
/**
@ -65,25 +75,35 @@ TEST(VersionList, GcDeleted) {
* empty (not visible from any future transaction) from the skiplist.
*/
TEST(GarbageCollector, GcClean) {
SkipList<mvcc::VersionList<Prop> *> skiplist;
SkipList<mvcc::VersionList<PropCount> *> skiplist;
tx::Engine engine;
GarbageCollector<Prop> gc(&skiplist, &engine);
DeferredDeleter<PropCount> deleter;
DeferredDeleter<mvcc::VersionList<PropCount>> vlist_deleter;
GarbageCollector<PropCount> gc(skiplist, deleter, vlist_deleter);
auto t1 = engine.begin();
std::atomic<int> count;
auto vl = new mvcc::VersionList<Prop>(*t1, count);
std::atomic<int> count{0};
auto vl = new mvcc::VersionList<PropCount>(*t1, count);
auto access = skiplist.access();
access.insert(vl);
gc.Run();
gc.Run(Id(2), engine);
t1->commit();
gc.Run();
auto t2 = engine.begin();
EXPECT_EQ(vl->remove(*t2), true);
t2->commit();
gc.Run(Id(3), engine);
EXPECT_EQ(deleter.Count(), 1);
deleter.FreeExpiredObjects(engine.count() + 1);
EXPECT_EQ(deleter.Count(), 0);
EXPECT_EQ(count, 1);
EXPECT_EQ(vlist_deleter.Count(), 1);
vlist_deleter.FreeExpiredObjects(engine.count() + 1);
EXPECT_EQ(vlist_deleter.Count(), 0);
gc.Run();
EXPECT_EQ(access.size(), (size_t)0);
}