Fix memory leak. Refactor mvcc. Revert commit function to old and introduce new one.
Summary: Update tests. Documentation: https://phabricator.memgraph.io/w/memgraph_implementation/mvcc/ Reviewers: mislav.bradac, florijan, buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D227
This commit is contained in:
parent
64d0163aad
commit
e8fd479bbc
@ -94,7 +94,7 @@ class Record : public Version<T> {
|
||||
}
|
||||
|
||||
bool exp_committed(const tx::Transaction &t) {
|
||||
return committed(hints.exp, tx.exp(), t);
|
||||
return committed(hints.exp, tx.exp(), t.engine);
|
||||
}
|
||||
|
||||
bool cre_committed(const Id &id, const tx::Transaction &t) {
|
||||
@ -143,20 +143,28 @@ class Record : public Version<T> {
|
||||
|
||||
protected:
|
||||
template <class U>
|
||||
/**
|
||||
* @brief - Check if the id is commited from the perspective of transactio,
|
||||
* i.e. transaction can see the transaction with that id (it happened before
|
||||
* the transaction, and is not in the snapshot). This method is used to test
|
||||
* for visibility of some record.
|
||||
* @param hints - hints to use to determine commit/abort
|
||||
* about transactions commit/abort status
|
||||
* @param id - id to check if it's commited and visible
|
||||
* @return true if the id is commited and visible for the transaction t.
|
||||
*/
|
||||
bool committed(U &hints, const Id &id, const tx::Transaction &t) {
|
||||
// This whole section below is commented out because even though you can't
|
||||
// see something with greater id you should still able to check if it's
|
||||
// commited - consult the MVCC tests to better understand this behaviour.
|
||||
// you certainly can't see the transaction with id greater than yours
|
||||
// as that means it started after this transaction and if it committed,
|
||||
// it committed after this transaction had started.
|
||||
// if (id >= t.id) return false;
|
||||
// Dominik Gleich says 4 april 2017: the tests in this routine are correct;
|
||||
// if you think they're not, you're wrong, and you should think about it
|
||||
// again. I know, it happened to me (and also to Matej Gradicek).
|
||||
|
||||
// You certainly can't see the transaction with id greater than yours as
|
||||
// that means it started after this transaction and if it commited, it
|
||||
// commited after this transaction has started.
|
||||
if (id >= t.id) return false;
|
||||
|
||||
// This is commented out because something even though was started before
|
||||
// and not ended yet could have ended in the meantime and data should now be
|
||||
// visible if it's commited.
|
||||
// The creating transaction is still in progress (examine snapshot)
|
||||
// if (t.in_snapshot(id)) return false;
|
||||
if (t.in_snapshot(id)) return false;
|
||||
|
||||
auto hint_bits = hints.load();
|
||||
|
||||
@ -169,18 +177,32 @@ class Record : public Version<T> {
|
||||
auto info = t.engine.clog.fetch_info(id);
|
||||
|
||||
if (info.is_committed()) return hints.set_committed(), true;
|
||||
if (info.is_aborted()) return hints.set_aborted(), false;
|
||||
|
||||
debug_assert(info.is_aborted(),
|
||||
"Info isn't aborted, but function would return as aborted.");
|
||||
return hints.set_aborted(), false;
|
||||
}
|
||||
|
||||
template <class U>
|
||||
/**
|
||||
* @brief - Check if the id is commited.
|
||||
* @param hints - hints to use to determine commit/abort
|
||||
* @param id - id to check if commited
|
||||
* @param engine - engine instance with information about transactions
|
||||
* statuses
|
||||
* @return true if it's commited, false otherwise
|
||||
*/
|
||||
bool committed(U &hints, const Id &id, tx::Engine &engine) {
|
||||
auto hint_bits = hints.load();
|
||||
// if hints are set, return if xid is committed
|
||||
if (!hint_bits.is_unknown()) return hint_bits.is_committed();
|
||||
|
||||
// if hints are not set:
|
||||
// - you are the first one to check since it ended, consult commit log
|
||||
auto info = engine.clog.fetch_info(id);
|
||||
if (info.is_committed()) return hints.set_committed(), true;
|
||||
if (info.is_aborted()) return hints.set_aborted(), false;
|
||||
return false;
|
||||
/*
|
||||
BLAME dgleich and matej.gradicek. This check shouldn't be done here since
|
||||
this function should only check if transaction commited or not, and not
|
||||
actually assume that transaction associated with this record either
|
||||
aborted or commited.
|
||||
debug_assert(info.is_aborted(),
|
||||
"Info isn't aborted, but function would return as aborted.");
|
||||
return hints.set_aborted(), false;
|
||||
*/
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
#include "threading/sync/lockable.hpp"
|
||||
#include "transactions/transaction.hpp"
|
||||
|
||||
@ -16,10 +18,11 @@ class VersionList {
|
||||
using uptr = std::unique_ptr<VersionList<T>>;
|
||||
using item_t = T;
|
||||
|
||||
/* @brief Constructor that is used to insert one item into VersionList.
|
||||
@param t - transaction
|
||||
@param args - args forwarded to constructor of item T (for
|
||||
creating the first Record (Version) in this VersionList.
|
||||
/**
|
||||
* @brief Constructor that is used to insert one item into VersionList.
|
||||
* @param t - transaction
|
||||
* @param args - args forwarded to constructor of item T (for
|
||||
* creating the first Record (Version) in this VersionList.
|
||||
*/
|
||||
template <typename... Args>
|
||||
VersionList(tx::Transaction &t, Args &&... args) {
|
||||
@ -88,16 +91,30 @@ class VersionList {
|
||||
newest_deleted_record->next(std::memory_order_seq_cst);
|
||||
}
|
||||
|
||||
// This can happen only if the head already points to a deleted record or
|
||||
// the version list is empty. This means that the version_list is ready
|
||||
// for complete destruction.
|
||||
if (oldest_not_deleted_record == nullptr) {
|
||||
// This can happen only if the head already points to a deleted record or
|
||||
// the version list is empty. This means that the version_list is ready
|
||||
// for complete destruction.
|
||||
if (newest_deleted_record != nullptr) delete newest_deleted_record;
|
||||
head.store(nullptr, std::memory_order_seq_cst);
|
||||
// Head points to a deleted record.
|
||||
if (newest_deleted_record != nullptr) {
|
||||
{
|
||||
// We need to take an exclusive lock here since if some thread is
|
||||
// currently doing the find operation we could free something to which
|
||||
// find method points.
|
||||
std::unique_lock<std::shared_timed_mutex> lock(head_mutex_);
|
||||
head.store(nullptr, std::memory_order_seq_cst);
|
||||
}
|
||||
// This is safe to do since we unlinked head and now no record* in
|
||||
// another thread which is executing the find function can see any of
|
||||
// the nodes we are going to delete.
|
||||
delete newest_deleted_record;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
// oldest_not_deleted_record might be visible to some transaction but
|
||||
// newest_deleted_record is not.
|
||||
// newest_deleted_record is not and will never be visted by the find
|
||||
// function and as such doesn't represent pointer invalidation
|
||||
// race-condition risk.
|
||||
oldest_not_deleted_record->next(
|
||||
nullptr, std::memory_order_seq_cst); // No transaction will look
|
||||
// further than this record and
|
||||
@ -111,7 +128,11 @@ class VersionList {
|
||||
return false;
|
||||
}
|
||||
|
||||
T *find(const tx::Transaction &t) const {
|
||||
T *find(const tx::Transaction &t) {
|
||||
// We need to take a shared_lock here because GC could delete the first
|
||||
// entry pointed by head, or some later entry when following head->next
|
||||
// pointers.
|
||||
std::shared_lock<std::shared_timed_mutex> slock(head_mutex_);
|
||||
auto r = head.load(std::memory_order_seq_cst);
|
||||
|
||||
// nullptr
|
||||
@ -145,8 +166,10 @@ class VersionList {
|
||||
*
|
||||
* @param t The transaction
|
||||
*/
|
||||
void find_set_new_old(const tx::Transaction &t, T *&old_ref,
|
||||
T *&new_ref) const {
|
||||
void find_set_new_old(const tx::Transaction &t, T *&old_ref, T *&new_ref) {
|
||||
// Take a look in find to understand why this is needed.
|
||||
std::shared_lock<std::shared_timed_mutex> slock(head_mutex_);
|
||||
|
||||
// assume that the sought old record is further down the list
|
||||
// from new record, so that if we found old we can stop looking
|
||||
new_ref = nullptr;
|
||||
@ -167,24 +190,8 @@ class VersionList {
|
||||
return update(record, t);
|
||||
}
|
||||
|
||||
T *update(T *record, tx::Transaction &t) {
|
||||
debug_assert(record != nullptr, "Record is nullptr on update.");
|
||||
lock_and_validate(record, t);
|
||||
|
||||
// It could be done with unique_ptr but while this could mean memory
|
||||
// leak on exception, unique_ptr could mean use after free. Memory
|
||||
// leak is less dangerous.
|
||||
auto updated = new T(*record);
|
||||
|
||||
updated->mark_created(t);
|
||||
record->mark_deleted(t);
|
||||
|
||||
updated->next(record, std::memory_order_seq_cst);
|
||||
head.store(updated, std::memory_order_seq_cst);
|
||||
|
||||
return updated;
|
||||
}
|
||||
|
||||
// TODO(flor): This should also be private but can't be right now because of
|
||||
// the way graph_db_accessor works.
|
||||
bool remove(tx::Transaction &t) {
|
||||
debug_assert(head != nullptr, "Head is nullptr on removal.");
|
||||
auto record = find(t);
|
||||
@ -220,7 +227,36 @@ class VersionList {
|
||||
throw SerializationError();
|
||||
}
|
||||
|
||||
T *update(T *record, tx::Transaction &t) {
|
||||
debug_assert(record != nullptr, "Record is nullptr on update.");
|
||||
lock_and_validate(record, t);
|
||||
|
||||
// It could be done with unique_ptr but while this could mean memory
|
||||
// leak on exception, unique_ptr could mean use after free. Memory
|
||||
// leak is less dangerous.
|
||||
auto updated = new T(*record);
|
||||
|
||||
updated->mark_created(t);
|
||||
record->mark_deleted(t);
|
||||
|
||||
// Updated version should point to the latest available version. Older
|
||||
// versions that can be deleted will be removed during the GC phase.
|
||||
updated->next(head.load(), std::memory_order_seq_cst);
|
||||
|
||||
// Store the updated version as the first version point to by head.
|
||||
head.store(updated, std::memory_order_seq_cst);
|
||||
|
||||
return updated;
|
||||
}
|
||||
|
||||
std::atomic<T *> head{nullptr};
|
||||
RecordLock lock;
|
||||
// We need this mutex to make the nodes deletion operation (to avoid memory
|
||||
// leak) atomic with regards to the find operation. Otherwise we might have a
|
||||
// pointer to a version that is currently being deleted, and accessing it will
|
||||
// throw a SEGFAULT.
|
||||
// TODO(C++17)
|
||||
std::shared_timed_mutex head_mutex_; // This should be changed
|
||||
// when we migrate to C++17.
|
||||
};
|
||||
}
|
||||
|
@ -13,7 +13,6 @@
|
||||
/**
|
||||
@template T type of underlying record in mvcc
|
||||
*/
|
||||
|
||||
template <typename T>
|
||||
class GarbageCollector : public Loggable {
|
||||
public:
|
||||
|
@ -18,7 +18,7 @@ class Transaction {
|
||||
friend class Engine;
|
||||
|
||||
public:
|
||||
Transaction(Engine &engine);
|
||||
explicit Transaction(Engine &engine);
|
||||
Transaction(const Id &&id, const Snapshot<Id> &&snapshot, Engine &engine);
|
||||
Transaction(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
|
||||
Transaction(const Transaction &) = delete;
|
||||
|
65
tests/benchmark/mvcc.cpp
Normal file
65
tests/benchmark/mvcc.cpp
Normal file
@ -0,0 +1,65 @@
|
||||
#include "benchmark/benchmark.h"
|
||||
#include "benchmark/benchmark_api.h"
|
||||
#include "logging/default.hpp"
|
||||
#include "logging/streams/stderr.hpp"
|
||||
#include "mvcc/record.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
|
||||
class Prop : public mvcc::Record<Prop> {};
|
||||
|
||||
// Benchmark multiple updates, and finds, focused on finds.
|
||||
// This a rather weak test, but I'm not sure what's the better way to test this
|
||||
// in the future.
|
||||
// TODO(dgleich): Refresh this.
|
||||
void MvccMix(benchmark::State &state) {
|
||||
while (state.KeepRunning()) {
|
||||
state.PauseTiming();
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
mvcc::VersionList<Prop> version_list(*t1);
|
||||
|
||||
auto t2 = engine.begin();
|
||||
t1->commit();
|
||||
|
||||
state.ResumeTiming();
|
||||
version_list.update(*t2);
|
||||
state.PauseTiming();
|
||||
|
||||
state.ResumeTiming();
|
||||
version_list.find(*t2);
|
||||
state.PauseTiming();
|
||||
|
||||
t2->abort();
|
||||
|
||||
auto t3 = engine.begin();
|
||||
state.ResumeTiming();
|
||||
version_list.update(*t3);
|
||||
state.PauseTiming();
|
||||
auto t4 = engine.begin();
|
||||
|
||||
// Repeat find state.range(0) number of times.
|
||||
state.ResumeTiming();
|
||||
for (int i = 0; i < state.range(0); ++i) {
|
||||
version_list.find(*t4);
|
||||
}
|
||||
state.PauseTiming();
|
||||
|
||||
t3->commit();
|
||||
t4->commit();
|
||||
state.ResumeTiming();
|
||||
}
|
||||
}
|
||||
|
||||
BENCHMARK(MvccMix)
|
||||
->RangeMultiplier(2) // Multiply next range testdata size by 2
|
||||
->Range(1 << 14, 1 << 23) // 1<<14, 1<<15, 1<<16, ...
|
||||
->Unit(benchmark::kMillisecond);
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
logging::init_async();
|
||||
logging::log->pipe(std::make_unique<Stderr>());
|
||||
|
||||
::benchmark::Initialize(&argc, argv);
|
||||
::benchmark::RunSpecifiedBenchmarks();
|
||||
return 0;
|
||||
}
|
@ -7,6 +7,14 @@
|
||||
#include "transactions/engine.hpp"
|
||||
|
||||
class Prop : public mvcc::Record<Prop> {};
|
||||
class PropCount : public mvcc::Record<PropCount> {
|
||||
public:
|
||||
PropCount(std::atomic<int> &count) : count_(count) {}
|
||||
~PropCount() { ++count_; }
|
||||
|
||||
private:
|
||||
std::atomic<int> &count_;
|
||||
};
|
||||
|
||||
TEST(MVCC, Case1Test3) {
|
||||
tx::Engine engine;
|
||||
@ -15,17 +23,17 @@ TEST(MVCC, Case1Test3) {
|
||||
t1->commit();
|
||||
|
||||
auto t2 = engine.begin();
|
||||
auto v2 = version_list.update(*t2);
|
||||
version_list.update(*t2);
|
||||
t2->commit();
|
||||
|
||||
auto t3 = engine.begin();
|
||||
auto t4 = engine.begin();
|
||||
version_list.update(*t4);
|
||||
t4->commit();
|
||||
EXPECT_THROW(version_list.remove(v2, *t3), SerializationError);
|
||||
EXPECT_THROW(version_list.remove(*t3), SerializationError);
|
||||
}
|
||||
|
||||
TEST(MVCC, InSnapshot) {
|
||||
TEST(MVCC, InSnapshotSerializationError) {
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
mvcc::VersionList<Prop> version_list(*t1);
|
||||
@ -34,10 +42,34 @@ TEST(MVCC, InSnapshot) {
|
||||
auto t2 = engine.begin();
|
||||
version_list.update(*t2);
|
||||
auto t3 = engine.begin(); // t2 is in snapshot of t3
|
||||
auto v = version_list.find(*t3);
|
||||
t2->commit();
|
||||
|
||||
EXPECT_THROW(version_list.update(v, *t3), SerializationError);
|
||||
EXPECT_THROW(version_list.update(*t3), SerializationError);
|
||||
}
|
||||
|
||||
// Check that we don't delete records when we re-link.
|
||||
TEST(MVCC, UpdateDontDelete) {
|
||||
std::atomic<int> count{0};
|
||||
{
|
||||
tx::Engine engine;
|
||||
auto t1 = engine.begin();
|
||||
mvcc::VersionList<PropCount> version_list(*t1, count);
|
||||
t1->commit();
|
||||
|
||||
auto t2 = engine.begin();
|
||||
version_list.update(*t2);
|
||||
t2->abort();
|
||||
EXPECT_EQ(count, 0);
|
||||
|
||||
auto t3 = engine.begin();
|
||||
|
||||
// Update re-links the node and shouldn't clear it yet.
|
||||
version_list.update(*t3);
|
||||
EXPECT_EQ(count, 0);
|
||||
|
||||
t3->commit();
|
||||
}
|
||||
EXPECT_EQ(count, 3);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
Loading…
Reference in New Issue
Block a user