Compare commits

...

1 Commits

Author SHA1 Message Date
gvolfing
23474fd6cc initial changes 2023-05-04 08:31:08 +02:00
2 changed files with 171 additions and 132 deletions

View File

@ -12,6 +12,7 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <limits>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <variant> #include <variant>
@ -1439,11 +1440,17 @@ void Storage::CollectGarbage() {
if constexpr (force) { if constexpr (force) {
// We take the unique lock on the main storage lock so we can forcefully clean // We take the unique lock on the main storage lock so we can forcefully clean
// everything we can // everything we can
// TODO(gvolfing) Verify this! This should not just try, it should lock() always.
if (!main_lock_.try_lock()) { if (!main_lock_.try_lock()) {
CollectGarbage<false>(); CollectGarbage<false>();
return; return;
} }
} else { } else {
// DEBUG
// return;
// Because the garbage collector iterates through the indices and constraints // Because the garbage collector iterates through the indices and constraints
// to clean them up, it must take the main lock for reading to make sure that // to clean them up, it must take the main lock for reading to make sure that
// the indices and constraints aren't concurrently being modified. // the indices and constraints aren't concurrently being modified.
@ -1484,142 +1491,169 @@ void Storage::CollectGarbage() {
deleted_vertices_->swap(current_deleted_vertices); deleted_vertices_->swap(current_deleted_vertices);
deleted_edges_->swap(current_deleted_edges); deleted_edges_->swap(current_deleted_edges);
const bool is_analytical_storage_mode = storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL;
// Flag that will be used to determine whether the Index GC should be run. It // Flag that will be used to determine whether the Index GC should be run. It
// should be run when there were any items that were cleaned up (there were // should be run when there were any items that were cleaned up (there were
// updates between this run of the GC and the previous run of the GC). This // updates between this run of the GC and the previous run of the GC). This
// eliminates high CPU usage when the GC doesn't have to clean up anything. // eliminates high CPU usage when the GC doesn't have to clean up anything.
bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty(); // bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty();
bool run_index_cleanup =
!committed_transactions_->empty() || !garbage_undo_buffers_->empty() || is_analytical_storage_mode;
while (true) { if (is_analytical_storage_mode) {
// We don't want to hold the lock on committed transactions for too long, oldest_active_start_timestamp = std::numeric_limits<decltype(oldest_active_start_timestamp)>::max();
// because that prevents other transactions from committing.
Transaction *transaction; if constexpr (force) {
{ auto vertices = vertices_.access();
auto committed_transactions_ptr = committed_transactions_.Lock(); for (auto &vertex : vertices) {
if (committed_transactions_ptr->empty()) { if (vertex.deleted) {
current_deleted_vertices.push_back(vertex.gid);
}
}
// TODO add edges as well
} else {
// CollectGarbage<true>();
// return;
auto vertices = vertices_.access();
for (auto &vertex : vertices) {
if (vertex.deleted) {
current_deleted_vertices.push_back(vertex.gid);
}
}
}
} else {
while (true) {
// We don't want to hold the lock on committed transactions for too long,
// because that prevents other transactions from committing.
Transaction *transaction;
{
auto committed_transactions_ptr = committed_transactions_.Lock();
if (committed_transactions_ptr->empty() || is_analytical_storage_mode) {
break;
}
transaction = &committed_transactions_ptr->front();
}
auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire);
if (commit_timestamp >= oldest_active_start_timestamp) {
break; break;
} }
transaction = &committed_transactions_ptr->front();
}
auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire); // When unlinking a delta which is the first delta in its version chain,
if (commit_timestamp >= oldest_active_start_timestamp) { // special care has to be taken to avoid the following race condition:
break; //
} // [Vertex] --> [Delta A]
//
// GC thread: Delta A is the first in its chain, it must be unlinked from
// vertex and marked for deletion
// TX thread: Update vertex and add Delta B with Delta A as next
//
// [Vertex] --> [Delta B] <--> [Delta A]
//
// GC thread: Unlink delta from Vertex
//
// [Vertex] --> (nullptr)
//
// When processing a delta that is the first one in its chain, we
// obtain the corresponding vertex or edge lock, and then verify that this
// delta still is the first in its chain.
// When processing a delta that is in the middle of the chain we only
// process the final delta of the given transaction in that chain. We
// determine the owner of the chain (either a vertex or an edge), obtain the
// corresponding lock, and then verify that this delta is still in the same
// position as it was before taking the lock.
//
// Even though the delta chain is lock-free (both `next` and `prev`) the
// chain should not be modified without taking the lock from the object that
// owns the chain (either a vertex or an edge). Modifying the chain without
// taking the lock will cause subtle race conditions that will leave the
// chain in a broken state.
// The chain can be only read without taking any locks.
// When unlinking a delta which is the first delta in its version chain, for (Delta &delta : transaction->deltas) {
// special care has to be taken to avoid the following race condition: while (true) {
// auto prev = delta.prev.Get();
// [Vertex] --> [Delta A] switch (prev.type) {
// case PreviousPtr::Type::VERTEX: {
// GC thread: Delta A is the first in its chain, it must be unlinked from Vertex *vertex = prev.vertex;
// vertex and marked for deletion std::lock_guard<utils::SpinLock> vertex_guard(vertex->lock);
// TX thread: Update vertex and add Delta B with Delta A as next if (vertex->delta != &delta) {
// // Something changed, we're not the first delta in the chain
// [Vertex] --> [Delta B] <--> [Delta A] // anymore.
// continue;
// GC thread: Unlink delta from Vertex }
// vertex->delta = nullptr;
// [Vertex] --> (nullptr) if (vertex->deleted) {
// current_deleted_vertices.push_back(vertex->gid);
// When processing a delta that is the first one in its chain, we }
// obtain the corresponding vertex or edge lock, and then verify that this
// delta still is the first in its chain.
// When processing a delta that is in the middle of the chain we only
// process the final delta of the given transaction in that chain. We
// determine the owner of the chain (either a vertex or an edge), obtain the
// corresponding lock, and then verify that this delta is still in the same
// position as it was before taking the lock.
//
// Even though the delta chain is lock-free (both `next` and `prev`) the
// chain should not be modified without taking the lock from the object that
// owns the chain (either a vertex or an edge). Modifying the chain without
// taking the lock will cause subtle race conditions that will leave the
// chain in a broken state.
// The chain can be only read without taking any locks.
for (Delta &delta : transaction->deltas) {
while (true) {
auto prev = delta.prev.Get();
switch (prev.type) {
case PreviousPtr::Type::VERTEX: {
Vertex *vertex = prev.vertex;
std::lock_guard<utils::SpinLock> vertex_guard(vertex->lock);
if (vertex->delta != &delta) {
// Something changed, we're not the first delta in the chain
// anymore.
continue;
}
vertex->delta = nullptr;
if (vertex->deleted) {
current_deleted_vertices.push_back(vertex->gid);
}
break;
}
case PreviousPtr::Type::EDGE: {
Edge *edge = prev.edge;
std::lock_guard<utils::SpinLock> edge_guard(edge->lock);
if (edge->delta != &delta) {
// Something changed, we're not the first delta in the chain
// anymore.
continue;
}
edge->delta = nullptr;
if (edge->deleted) {
current_deleted_edges.push_back(edge->gid);
}
break;
}
case PreviousPtr::Type::DELTA: {
if (prev.delta->timestamp->load(std::memory_order_acquire) == commit_timestamp) {
// The delta that is newer than this one is also a delta from this
// transaction. We skip the current delta and will remove it as a
// part of the suffix later.
break; break;
} }
std::unique_lock<utils::SpinLock> guard; case PreviousPtr::Type::EDGE: {
{ Edge *edge = prev.edge;
// We need to find the parent object in order to be able to use std::lock_guard<utils::SpinLock> edge_guard(edge->lock);
// its lock. if (edge->delta != &delta) {
auto parent = prev; // Something changed, we're not the first delta in the chain
while (parent.type == PreviousPtr::Type::DELTA) { // anymore.
parent = parent.delta->prev.Get(); continue;
} }
switch (parent.type) { edge->delta = nullptr;
case PreviousPtr::Type::VERTEX: if (edge->deleted) {
guard = std::unique_lock<utils::SpinLock>(parent.vertex->lock); current_deleted_edges.push_back(edge->gid);
break;
case PreviousPtr::Type::EDGE:
guard = std::unique_lock<utils::SpinLock>(parent.edge->lock);
break;
case PreviousPtr::Type::DELTA:
case PreviousPtr::Type::NULLPTR:
LOG_FATAL("Invalid database state!");
} }
break;
} }
if (delta.prev.Get() != prev) { case PreviousPtr::Type::DELTA: {
// Something changed, we could now be the first delta in the if (prev.delta->timestamp->load(std::memory_order_acquire) == commit_timestamp) {
// chain. // The delta that is newer than this one is also a delta from this
continue; // transaction. We skip the current delta and will remove it as a
// part of the suffix later.
break;
}
std::unique_lock<utils::SpinLock> guard;
{
// We need to find the parent object in order to be able to use
// its lock.
auto parent = prev;
while (parent.type == PreviousPtr::Type::DELTA) {
parent = parent.delta->prev.Get();
}
switch (parent.type) {
case PreviousPtr::Type::VERTEX:
guard = std::unique_lock<utils::SpinLock>(parent.vertex->lock);
break;
case PreviousPtr::Type::EDGE:
guard = std::unique_lock<utils::SpinLock>(parent.edge->lock);
break;
case PreviousPtr::Type::DELTA:
case PreviousPtr::Type::NULLPTR:
LOG_FATAL("Invalid database state!");
}
}
if (delta.prev.Get() != prev) {
// Something changed, we could now be the first delta in the
// chain.
continue;
}
Delta *prev_delta = prev.delta;
prev_delta->next.store(nullptr, std::memory_order_release);
break;
}
case PreviousPtr::Type::NULLPTR: {
LOG_FATAL("Invalid pointer!");
} }
Delta *prev_delta = prev.delta;
prev_delta->next.store(nullptr, std::memory_order_release);
break;
}
case PreviousPtr::Type::NULLPTR: {
LOG_FATAL("Invalid pointer!");
} }
break;
} }
break;
} }
committed_transactions_.WithLock([&](auto &committed_transactions) {
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
committed_transactions.pop_front();
});
} }
committed_transactions_.WithLock([&](auto &committed_transactions) {
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
committed_transactions.pop_front();
});
} }
// After unlinking deltas from vertices, we refresh the indices. That way // After unlinking deltas from vertices, we refresh the indices. That way
// we're sure that none of the vertices from `current_deleted_vertices` // we're sure that none of the vertices from `current_deleted_vertices`
// appears in an index, and we can safely remove them from the main storage // appears in an index, and we can safely remove them from the main storage
@ -1632,22 +1666,26 @@ void Storage::CollectGarbage() {
} }
{ {
std::unique_lock<utils::SpinLock> guard(engine_lock_); uint64_t mark_timestamp = 0;
uint64_t mark_timestamp = timestamp_; if (!is_analytical_storage_mode) {
// Take garbage_undo_buffers lock while holding the engine lock to make std::unique_lock<utils::SpinLock> guard(engine_lock_);
// sure that entries are sorted by mark timestamp in the list. // uint64_t mark_timestamp = timestamp_;
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { mark_timestamp = timestamp_;
// Release engine lock because we don't have to hold it anymore and // Take garbage_undo_buffers lock while holding the engine lock to make
// this could take a long time. // sure that entries are sorted by mark timestamp in the list.
guard.unlock(); garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents // Release engine lock because we don't have to hold it anymore and
// transactions from aborting until we're done marking, maybe we should // this could take a long time.
// add them one-by-one or something guard.unlock();
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) { // TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
timestamp = mark_timestamp; // transactions from aborting until we're done marking, maybe we should
} // add them one-by-one or something
garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers); for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) {
}); timestamp = mark_timestamp;
}
garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers);
});
}
for (auto vertex : current_deleted_vertices) { for (auto vertex : current_deleted_vertices) {
garbage_vertices_.emplace_back(mark_timestamp, vertex); garbage_vertices_.emplace_back(mark_timestamp, vertex);
} }

View File

@ -67,6 +67,7 @@ class DeltaGenerator final {
auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++); auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++);
auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_); auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_);
auto &it = gen_->vertices_.emplace_back(gid, delta); auto &it = gen_->vertices_.emplace_back(gid, delta);
// TODO(gvolfing) This is likely problematic when deleting vertices in ANALYTICAL MODE...
if (delta != nullptr) { if (delta != nullptr) {
delta->prev.Set(&it); delta->prev.Set(&it);
} }