Compare commits
1 Commits
master
...
free-memor
Author | SHA1 | Date | |
---|---|---|---|
|
23474fd6cc |
@ -12,6 +12,7 @@
|
|||||||
#include "storage/v2/storage.hpp"
|
#include "storage/v2/storage.hpp"
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <limits>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
@ -1439,11 +1440,17 @@ void Storage::CollectGarbage() {
|
|||||||
if constexpr (force) {
|
if constexpr (force) {
|
||||||
// We take the unique lock on the main storage lock so we can forcefully clean
|
// We take the unique lock on the main storage lock so we can forcefully clean
|
||||||
// everything we can
|
// everything we can
|
||||||
|
|
||||||
|
// TODO(gvolfing) Verify this! This should not just try, it should lock() always.
|
||||||
if (!main_lock_.try_lock()) {
|
if (!main_lock_.try_lock()) {
|
||||||
CollectGarbage<false>();
|
CollectGarbage<false>();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
// DEBUG
|
||||||
|
// return;
|
||||||
|
|
||||||
// Because the garbage collector iterates through the indices and constraints
|
// Because the garbage collector iterates through the indices and constraints
|
||||||
// to clean them up, it must take the main lock for reading to make sure that
|
// to clean them up, it must take the main lock for reading to make sure that
|
||||||
// the indices and constraints aren't concurrently being modified.
|
// the indices and constraints aren't concurrently being modified.
|
||||||
@ -1484,142 +1491,169 @@ void Storage::CollectGarbage() {
|
|||||||
deleted_vertices_->swap(current_deleted_vertices);
|
deleted_vertices_->swap(current_deleted_vertices);
|
||||||
deleted_edges_->swap(current_deleted_edges);
|
deleted_edges_->swap(current_deleted_edges);
|
||||||
|
|
||||||
|
const bool is_analytical_storage_mode = storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL;
|
||||||
|
|
||||||
// Flag that will be used to determine whether the Index GC should be run. It
|
// Flag that will be used to determine whether the Index GC should be run. It
|
||||||
// should be run when there were any items that were cleaned up (there were
|
// should be run when there were any items that were cleaned up (there were
|
||||||
// updates between this run of the GC and the previous run of the GC). This
|
// updates between this run of the GC and the previous run of the GC). This
|
||||||
// eliminates high CPU usage when the GC doesn't have to clean up anything.
|
// eliminates high CPU usage when the GC doesn't have to clean up anything.
|
||||||
bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty();
|
// bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty();
|
||||||
|
bool run_index_cleanup =
|
||||||
|
!committed_transactions_->empty() || !garbage_undo_buffers_->empty() || is_analytical_storage_mode;
|
||||||
|
|
||||||
while (true) {
|
if (is_analytical_storage_mode) {
|
||||||
// We don't want to hold the lock on commited transactions for too long,
|
oldest_active_start_timestamp = std::numeric_limits<decltype(oldest_active_start_timestamp)>::max();
|
||||||
// because that prevents other transactions from committing.
|
|
||||||
Transaction *transaction;
|
if constexpr (force) {
|
||||||
{
|
auto vertices = vertices_.access();
|
||||||
auto committed_transactions_ptr = committed_transactions_.Lock();
|
for (auto &vertex : vertices) {
|
||||||
if (committed_transactions_ptr->empty()) {
|
if (vertex.deleted) {
|
||||||
|
current_deleted_vertices.push_back(vertex.gid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO add edges as well
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// CollectGarbage<true>();
|
||||||
|
// return;
|
||||||
|
auto vertices = vertices_.access();
|
||||||
|
for (auto &vertex : vertices) {
|
||||||
|
if (vertex.deleted) {
|
||||||
|
current_deleted_vertices.push_back(vertex.gid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
while (true) {
|
||||||
|
// We don't want to hold the lock on commited transactions for too long,
|
||||||
|
// because that prevents other transactions from committing.
|
||||||
|
Transaction *transaction;
|
||||||
|
{
|
||||||
|
auto committed_transactions_ptr = committed_transactions_.Lock();
|
||||||
|
if (committed_transactions_ptr->empty() || is_analytical_storage_mode) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
transaction = &committed_transactions_ptr->front();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire);
|
||||||
|
if (commit_timestamp >= oldest_active_start_timestamp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
transaction = &committed_transactions_ptr->front();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire);
|
// When unlinking a delta which is the first delta in its version chain,
|
||||||
if (commit_timestamp >= oldest_active_start_timestamp) {
|
// special care has to be taken to avoid the following race condition:
|
||||||
break;
|
//
|
||||||
}
|
// [Vertex] --> [Delta A]
|
||||||
|
//
|
||||||
|
// GC thread: Delta A is the first in its chain, it must be unlinked from
|
||||||
|
// vertex and marked for deletion
|
||||||
|
// TX thread: Update vertex and add Delta B with Delta A as next
|
||||||
|
//
|
||||||
|
// [Vertex] --> [Delta B] <--> [Delta A]
|
||||||
|
//
|
||||||
|
// GC thread: Unlink delta from Vertex
|
||||||
|
//
|
||||||
|
// [Vertex] --> (nullptr)
|
||||||
|
//
|
||||||
|
// When processing a delta that is the first one in its chain, we
|
||||||
|
// obtain the corresponding vertex or edge lock, and then verify that this
|
||||||
|
// delta still is the first in its chain.
|
||||||
|
// When processing a delta that is in the middle of the chain we only
|
||||||
|
// process the final delta of the given transaction in that chain. We
|
||||||
|
// determine the owner of the chain (either a vertex or an edge), obtain the
|
||||||
|
// corresponding lock, and then verify that this delta is still in the same
|
||||||
|
// position as it was before taking the lock.
|
||||||
|
//
|
||||||
|
// Even though the delta chain is lock-free (both `next` and `prev`) the
|
||||||
|
// chain should not be modified without taking the lock from the object that
|
||||||
|
// owns the chain (either a vertex or an edge). Modifying the chain without
|
||||||
|
// taking the lock will cause subtle race conditions that will leave the
|
||||||
|
// chain in a broken state.
|
||||||
|
// The chain can be only read without taking any locks.
|
||||||
|
|
||||||
// When unlinking a delta which is the first delta in its version chain,
|
for (Delta &delta : transaction->deltas) {
|
||||||
// special care has to be taken to avoid the following race condition:
|
while (true) {
|
||||||
//
|
auto prev = delta.prev.Get();
|
||||||
// [Vertex] --> [Delta A]
|
switch (prev.type) {
|
||||||
//
|
case PreviousPtr::Type::VERTEX: {
|
||||||
// GC thread: Delta A is the first in its chain, it must be unlinked from
|
Vertex *vertex = prev.vertex;
|
||||||
// vertex and marked for deletion
|
std::lock_guard<utils::SpinLock> vertex_guard(vertex->lock);
|
||||||
// TX thread: Update vertex and add Delta B with Delta A as next
|
if (vertex->delta != &delta) {
|
||||||
//
|
// Something changed, we're not the first delta in the chain
|
||||||
// [Vertex] --> [Delta B] <--> [Delta A]
|
// anymore.
|
||||||
//
|
continue;
|
||||||
// GC thread: Unlink delta from Vertex
|
}
|
||||||
//
|
vertex->delta = nullptr;
|
||||||
// [Vertex] --> (nullptr)
|
if (vertex->deleted) {
|
||||||
//
|
current_deleted_vertices.push_back(vertex->gid);
|
||||||
// When processing a delta that is the first one in its chain, we
|
}
|
||||||
// obtain the corresponding vertex or edge lock, and then verify that this
|
|
||||||
// delta still is the first in its chain.
|
|
||||||
// When processing a delta that is in the middle of the chain we only
|
|
||||||
// process the final delta of the given transaction in that chain. We
|
|
||||||
// determine the owner of the chain (either a vertex or an edge), obtain the
|
|
||||||
// corresponding lock, and then verify that this delta is still in the same
|
|
||||||
// position as it was before taking the lock.
|
|
||||||
//
|
|
||||||
// Even though the delta chain is lock-free (both `next` and `prev`) the
|
|
||||||
// chain should not be modified without taking the lock from the object that
|
|
||||||
// owns the chain (either a vertex or an edge). Modifying the chain without
|
|
||||||
// taking the lock will cause subtle race conditions that will leave the
|
|
||||||
// chain in a broken state.
|
|
||||||
// The chain can be only read without taking any locks.
|
|
||||||
|
|
||||||
for (Delta &delta : transaction->deltas) {
|
|
||||||
while (true) {
|
|
||||||
auto prev = delta.prev.Get();
|
|
||||||
switch (prev.type) {
|
|
||||||
case PreviousPtr::Type::VERTEX: {
|
|
||||||
Vertex *vertex = prev.vertex;
|
|
||||||
std::lock_guard<utils::SpinLock> vertex_guard(vertex->lock);
|
|
||||||
if (vertex->delta != &delta) {
|
|
||||||
// Something changed, we're not the first delta in the chain
|
|
||||||
// anymore.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
vertex->delta = nullptr;
|
|
||||||
if (vertex->deleted) {
|
|
||||||
current_deleted_vertices.push_back(vertex->gid);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PreviousPtr::Type::EDGE: {
|
|
||||||
Edge *edge = prev.edge;
|
|
||||||
std::lock_guard<utils::SpinLock> edge_guard(edge->lock);
|
|
||||||
if (edge->delta != &delta) {
|
|
||||||
// Something changed, we're not the first delta in the chain
|
|
||||||
// anymore.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
edge->delta = nullptr;
|
|
||||||
if (edge->deleted) {
|
|
||||||
current_deleted_edges.push_back(edge->gid);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PreviousPtr::Type::DELTA: {
|
|
||||||
if (prev.delta->timestamp->load(std::memory_order_acquire) == commit_timestamp) {
|
|
||||||
// The delta that is newer than this one is also a delta from this
|
|
||||||
// transaction. We skip the current delta and will remove it as a
|
|
||||||
// part of the suffix later.
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::unique_lock<utils::SpinLock> guard;
|
case PreviousPtr::Type::EDGE: {
|
||||||
{
|
Edge *edge = prev.edge;
|
||||||
// We need to find the parent object in order to be able to use
|
std::lock_guard<utils::SpinLock> edge_guard(edge->lock);
|
||||||
// its lock.
|
if (edge->delta != &delta) {
|
||||||
auto parent = prev;
|
// Something changed, we're not the first delta in the chain
|
||||||
while (parent.type == PreviousPtr::Type::DELTA) {
|
// anymore.
|
||||||
parent = parent.delta->prev.Get();
|
continue;
|
||||||
}
|
}
|
||||||
switch (parent.type) {
|
edge->delta = nullptr;
|
||||||
case PreviousPtr::Type::VERTEX:
|
if (edge->deleted) {
|
||||||
guard = std::unique_lock<utils::SpinLock>(parent.vertex->lock);
|
current_deleted_edges.push_back(edge->gid);
|
||||||
break;
|
|
||||||
case PreviousPtr::Type::EDGE:
|
|
||||||
guard = std::unique_lock<utils::SpinLock>(parent.edge->lock);
|
|
||||||
break;
|
|
||||||
case PreviousPtr::Type::DELTA:
|
|
||||||
case PreviousPtr::Type::NULLPTR:
|
|
||||||
LOG_FATAL("Invalid database state!");
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
if (delta.prev.Get() != prev) {
|
case PreviousPtr::Type::DELTA: {
|
||||||
// Something changed, we could now be the first delta in the
|
if (prev.delta->timestamp->load(std::memory_order_acquire) == commit_timestamp) {
|
||||||
// chain.
|
// The delta that is newer than this one is also a delta from this
|
||||||
continue;
|
// transaction. We skip the current delta and will remove it as a
|
||||||
|
// part of the suffix later.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::unique_lock<utils::SpinLock> guard;
|
||||||
|
{
|
||||||
|
// We need to find the parent object in order to be able to use
|
||||||
|
// its lock.
|
||||||
|
auto parent = prev;
|
||||||
|
while (parent.type == PreviousPtr::Type::DELTA) {
|
||||||
|
parent = parent.delta->prev.Get();
|
||||||
|
}
|
||||||
|
switch (parent.type) {
|
||||||
|
case PreviousPtr::Type::VERTEX:
|
||||||
|
guard = std::unique_lock<utils::SpinLock>(parent.vertex->lock);
|
||||||
|
break;
|
||||||
|
case PreviousPtr::Type::EDGE:
|
||||||
|
guard = std::unique_lock<utils::SpinLock>(parent.edge->lock);
|
||||||
|
break;
|
||||||
|
case PreviousPtr::Type::DELTA:
|
||||||
|
case PreviousPtr::Type::NULLPTR:
|
||||||
|
LOG_FATAL("Invalid database state!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (delta.prev.Get() != prev) {
|
||||||
|
// Something changed, we could now be the first delta in the
|
||||||
|
// chain.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Delta *prev_delta = prev.delta;
|
||||||
|
prev_delta->next.store(nullptr, std::memory_order_release);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::NULLPTR: {
|
||||||
|
LOG_FATAL("Invalid pointer!");
|
||||||
}
|
}
|
||||||
Delta *prev_delta = prev.delta;
|
|
||||||
prev_delta->next.store(nullptr, std::memory_order_release);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PreviousPtr::Type::NULLPTR: {
|
|
||||||
LOG_FATAL("Invalid pointer!");
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
committed_transactions_.WithLock([&](auto &committed_transactions) {
|
||||||
|
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
|
||||||
|
committed_transactions.pop_front();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
committed_transactions_.WithLock([&](auto &committed_transactions) {
|
|
||||||
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
|
|
||||||
committed_transactions.pop_front();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// After unlinking deltas from vertices, we refresh the indices. That way
|
// After unlinking deltas from vertices, we refresh the indices. That way
|
||||||
// we're sure that none of the vertices from `current_deleted_vertices`
|
// we're sure that none of the vertices from `current_deleted_vertices`
|
||||||
// appears in an index, and we can safely remove the from the main storage
|
// appears in an index, and we can safely remove the from the main storage
|
||||||
@ -1632,22 +1666,26 @@ void Storage::CollectGarbage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_lock<utils::SpinLock> guard(engine_lock_);
|
uint64_t mark_timestamp = 0;
|
||||||
uint64_t mark_timestamp = timestamp_;
|
if (!is_analytical_storage_mode) {
|
||||||
// Take garbage_undo_buffers lock while holding the engine lock to make
|
std::unique_lock<utils::SpinLock> guard(engine_lock_);
|
||||||
// sure that entries are sorted by mark timestamp in the list.
|
// uint64_t mark_timestamp = timestamp_;
|
||||||
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
|
mark_timestamp = timestamp_;
|
||||||
// Release engine lock because we don't have to hold it anymore and
|
// Take garbage_undo_buffers lock while holding the engine lock to make
|
||||||
// this could take a long time.
|
// sure that entries are sorted by mark timestamp in the list.
|
||||||
guard.unlock();
|
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
|
||||||
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
|
// Release engine lock because we don't have to hold it anymore and
|
||||||
// transactions from aborting until we're done marking, maybe we should
|
// this could take a long time.
|
||||||
// add them one-by-one or something
|
guard.unlock();
|
||||||
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) {
|
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
|
||||||
timestamp = mark_timestamp;
|
// transactions from aborting until we're done marking, maybe we should
|
||||||
}
|
// add them one-by-one or something
|
||||||
garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers);
|
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) {
|
||||||
});
|
timestamp = mark_timestamp;
|
||||||
|
}
|
||||||
|
garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers);
|
||||||
|
});
|
||||||
|
}
|
||||||
for (auto vertex : current_deleted_vertices) {
|
for (auto vertex : current_deleted_vertices) {
|
||||||
garbage_vertices_.emplace_back(mark_timestamp, vertex);
|
garbage_vertices_.emplace_back(mark_timestamp, vertex);
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,7 @@ class DeltaGenerator final {
|
|||||||
auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++);
|
auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++);
|
||||||
auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_);
|
auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_);
|
||||||
auto &it = gen_->vertices_.emplace_back(gid, delta);
|
auto &it = gen_->vertices_.emplace_back(gid, delta);
|
||||||
|
// TODO(gvolfing) This is likely problematic when deleting vertices in ANALYTICAL MODE...
|
||||||
if (delta != nullptr) {
|
if (delta != nullptr) {
|
||||||
delta->prev.Set(&it);
|
delta->prev.Set(&it);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user