Improve performance of delta creation (#1129)

This commit is contained in:
Antonio Filipovic 2023-09-06 11:30:21 +02:00 committed by GitHub
parent 93992a275b
commit b6b32bec03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 140 additions and 53 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -54,6 +54,7 @@
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
#include "utils/file.hpp" #include "utils/file.hpp"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
#include "utils/message.hpp" #include "utils/message.hpp"
#include "utils/on_scope_exit.hpp" #include "utils/on_scope_exit.hpp"
@ -63,6 +64,7 @@
#include "utils/skip_list.hpp" #include "utils/skip_list.hpp"
#include "utils/stat.hpp" #include "utils/stat.hpp"
#include "utils/string.hpp" #include "utils/string.hpp"
#include "utils/typeinfo.hpp"
namespace memgraph::storage { namespace memgraph::storage {
@ -335,6 +337,8 @@ DiskStorage::DiskAccessor::~DiskAccessor() {
} }
FinalizeTransaction(); FinalizeTransaction();
transaction_.deltas.~Bond<PmrListDelta>();
} }
/// NOTE: This will create Delta object which will cause deletion of old key entry on the disk /// NOTE: This will create Delta object which will cause deletion of old key entry on the disk
@ -1638,9 +1642,9 @@ utils::BasicResult<StorageDataManipulationError, void> DiskStorage::DiskAccessor
auto *disk_storage = static_cast<DiskStorage *>(storage_); auto *disk_storage = static_cast<DiskStorage *>(storage_);
bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE; bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE;
if (transaction_.deltas.empty() || if (transaction_.deltas.use().empty() ||
(!edge_import_mode_active && (!edge_import_mode_active &&
std::all_of(transaction_.deltas.begin(), transaction_.deltas.end(), std::all_of(transaction_.deltas.use().begin(), transaction_.deltas.use().end(),
[](const Delta &delta) { return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT; }))) { [](const Delta &delta) { return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT; }))) {
} else { } else {
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_); std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
@ -1758,9 +1762,9 @@ std::vector<std::pair<std::string, std::string>> DiskStorage::SerializeVerticesF
void DiskStorage::DiskAccessor::UpdateObjectsCountOnAbort() { void DiskStorage::DiskAccessor::UpdateObjectsCountOnAbort() {
auto *disk_storage = static_cast<DiskStorage *>(storage_); auto *disk_storage = static_cast<DiskStorage *>(storage_);
uint64_t transaction_id = transaction_.transaction_id.load(std::memory_order_acquire); uint64_t transaction_id = transaction_.transaction_id;
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
switch (prev.type) { switch (prev.type) {
case PreviousPtr::Type::VERTEX: { case PreviousPtr::Type::VERTEX: {

View File

@ -390,6 +390,7 @@ class DiskStorage final : public Storage {
private: private:
std::unique_ptr<RocksDBStorage> kvstore_; std::unique_ptr<RocksDBStorage> kvstore_;
std::unique_ptr<kvstore::KVStore> durability_kvstore_; std::unique_ptr<kvstore::KVStore> durability_kvstore_;
std::atomic<uint64_t> vertex_count_{0}; std::atomic<uint64_t> vertex_count_{0};
}; };

View File

@ -178,6 +178,9 @@ InMemoryStorage::~InMemoryStorage() {
} }
} }
} }
if (!committed_transactions_->empty()) {
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
}
} }
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryStorage *storage, IsolationLevel isolation_level, InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryStorage *storage, IsolationLevel isolation_level,
@ -597,14 +600,14 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory
auto *mem_storage = static_cast<InMemoryStorage *>(storage_); auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
if (transaction_.deltas.empty()) { if (transaction_.deltas.use().empty()) {
// We don't have to update the commit timestamp here because no one reads // We don't have to update the commit timestamp here because no one reads
// it. // it.
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp); mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
} else { } else {
// Validate that existence constraints are satisfied for all modified // Validate that existence constraints are satisfied for all modified
// vertices. // vertices.
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) { if (prev.type != PreviousPtr::Type::VERTEX) {
@ -636,7 +639,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory
// Before committing and validating vertices against unique constraints, // Before committing and validating vertices against unique constraints,
// we have to update unique constraints with the vertices that are going // we have to update unique constraints with the vertices that are going
// to be validated/committed. // to be validated/committed.
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) { if (prev.type != PreviousPtr::Type::VERTEX) {
@ -647,7 +650,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory
// Validate that unique constraints are satisfied for all modified // Validate that unique constraints are satisfied for all modified
// vertices. // vertices.
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) { if (prev.type != PreviousPtr::Type::VERTEX) {
@ -726,15 +729,15 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
std::list<Gid> my_deleted_vertices; std::list<Gid> my_deleted_vertices;
std::list<Gid> my_deleted_edges; std::list<Gid> my_deleted_edges;
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
switch (prev.type) { switch (prev.type) {
case PreviousPtr::Type::VERTEX: { case PreviousPtr::Type::VERTEX: {
auto *vertex = prev.vertex; auto *vertex = prev.vertex;
auto guard = std::unique_lock{vertex->lock}; auto guard = std::unique_lock{vertex->lock};
Delta *current = vertex->delta; Delta *current = vertex->delta;
while (current != nullptr && current->timestamp->load(std::memory_order_acquire) == while (current != nullptr &&
transaction_.transaction_id.load(std::memory_order_acquire)) { current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
switch (current->action) { switch (current->action) {
case Delta::Action::REMOVE_LABEL: { case Delta::Action::REMOVE_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label); auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label);
@ -821,8 +824,8 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
auto *edge = prev.edge; auto *edge = prev.edge;
auto guard = std::lock_guard{edge->lock}; auto guard = std::lock_guard{edge->lock};
Delta *current = edge->delta; Delta *current = edge->delta;
while (current != nullptr && current->timestamp->load(std::memory_order_acquire) == while (current != nullptr &&
transaction_.transaction_id.load(std::memory_order_acquire)) { current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
switch (current->action) { switch (current->action) {
case Delta::Action::SET_PROPERTY: { case Delta::Action::SET_PROPERTY: {
edge->properties.SetProperty(current->property.key, current->property.value); edge->properties.SetProperty(current->property.key, current->property.value);
@ -874,6 +877,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
// Release engine lock because we don't have to hold it anymore and // Release engine lock because we don't have to hold it anymore and
// emplace back could take a long time. // emplace back could take a long time.
engine_guard.unlock(); engine_guard.unlock();
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas)); garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas));
}); });
mem_storage->deleted_vertices_.WithLock( mem_storage->deleted_vertices_.WithLock(
@ -1191,7 +1195,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers // We don't move undo buffers of unlinked transactions to garbage_undo_buffers
// list immediately, because we would have to repeatedly take // list immediately, because we would have to repeatedly take
// garbage_undo_buffers lock. // garbage_undo_buffers lock.
std::list<std::pair<uint64_t, std::list<Delta>>> unlinked_undo_buffers; std::list<std::pair<uint64_t, BondPmrLd>> unlinked_undo_buffers;
// We will only free vertices deleted up until now in this GC cycle, and we // We will only free vertices deleted up until now in this GC cycle, and we
// will do it after cleaning-up the indices. That way we are sure that all // will do it after cleaning-up the indices. That way we are sure that all
@ -1259,7 +1263,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
// chain in a broken state. // chain in a broken state.
// The chain can be only read without taking any locks. // The chain can be only read without taking any locks.
for (Delta &delta : transaction->deltas) { for (Delta &delta : transaction->deltas.use()) {
while (true) { while (true) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
switch (prev.type) { switch (prev.type) {
@ -1365,10 +1369,10 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents // TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
// transactions from aborting until we're done marking, maybe we should // transactions from aborting until we're done marking, maybe we should
// add them one-by-one or something // add them one-by-one or something
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) { for (auto &[timestamp, transaction_deltas] : unlinked_undo_buffers) {
timestamp = mark_timestamp; timestamp = mark_timestamp;
} }
garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers); garbage_undo_buffers.splice(garbage_undo_buffers.end(), std::move(unlinked_undo_buffers));
}); });
for (auto vertex : current_deleted_vertices) { for (auto vertex : current_deleted_vertices) {
garbage_vertices_.emplace_back(mark_timestamp, vertex); garbage_vertices_.emplace_back(mark_timestamp, vertex);
@ -1379,9 +1383,17 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
// if force is set to true we can simply delete all the leftover undos because // if force is set to true we can simply delete all the leftover undos because
// no transaction is active // no transaction is active
if constexpr (force) { if constexpr (force) {
for (auto &[timestamp, transaction_deltas] : undo_buffers) {
transaction_deltas.~Bond<PmrListDelta>();
}
undo_buffers.clear(); undo_buffers.clear();
} else { } else {
while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) { while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) {
auto &[timestamp, transaction_deltas] = undo_buffers.front();
transaction_deltas.~Bond<PmrListDelta>();
// this will trigger destory of object
// but since we release pointer, it will just destory other stuff
undo_buffers.pop_front(); undo_buffers.pop_front();
} }
} }
@ -1527,7 +1539,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
// 1. Process all Vertex deltas and store all operations that create vertices // 1. Process all Vertex deltas and store all operations that create vertices
// and modify vertex data. // and modify vertex data.
for (const auto &delta : transaction.deltas) { for (const auto &delta : transaction.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue; if (prev.type != PreviousPtr::Type::VERTEX) continue;
@ -1550,7 +1562,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
}); });
} }
// 2. Process all Vertex deltas and store all operations that create edges. // 2. Process all Vertex deltas and store all operations that create edges.
for (const auto &delta : transaction.deltas) { for (const auto &delta : transaction.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue; if (prev.type != PreviousPtr::Type::VERTEX) continue;
@ -1572,7 +1584,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
}); });
} }
// 3. Process all Edge deltas and store all operations that modify edge data. // 3. Process all Edge deltas and store all operations that modify edge data.
for (const auto &delta : transaction.deltas) { for (const auto &delta : transaction.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::EDGE) continue; if (prev.type != PreviousPtr::Type::EDGE) continue;
@ -1594,7 +1606,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
}); });
} }
// 4. Process all Vertex deltas and store all operations that delete edges. // 4. Process all Vertex deltas and store all operations that delete edges.
for (const auto &delta : transaction.deltas) { for (const auto &delta : transaction.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue; if (prev.type != PreviousPtr::Type::VERTEX) continue;
@ -1616,7 +1628,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
}); });
} }
// 5. Process all Vertex deltas and store all operations that delete vertices. // 5. Process all Vertex deltas and store all operations that delete vertices.
for (const auto &delta : transaction.deltas) { for (const auto &delta : transaction.deltas.use()) {
auto prev = delta.prev.Get(); auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue; if (prev.type != PreviousPtr::Type::VERTEX) continue;

View File

@ -12,6 +12,9 @@
#pragma once #pragma once
#include <cstddef> #include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include "storage/v2/inmemory/label_index.hpp" #include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp" #include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
@ -23,6 +26,9 @@
#include "storage/v2/replication/replication_persistence_helper.hpp" #include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp" #include "storage/v2/replication/serialization.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/memory.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::storage { namespace memgraph::storage {
@ -438,12 +444,14 @@ class InMemoryStorage final : public Storage {
// `timestamp_` in a sensible unit, something like TransactionClock or // `timestamp_` in a sensible unit, something like TransactionClock or
// whatever. // whatever.
std::optional<CommitLog> commit_log_; std::optional<CommitLog> commit_log_;
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_; utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
utils::Scheduler gc_runner_; utils::Scheduler gc_runner_;
std::mutex gc_lock_; std::mutex gc_lock_;
// Undo buffers that were unlinked and now are waiting to be freed. using BondPmrLd = Bond<utils::pmr::list<Delta>>;
utils::Synchronized<std::list<std::pair<uint64_t, std::list<Delta>>>, utils::SpinLock> garbage_undo_buffers_; // Ownership of unlinked deltas is transfered to garabage_undo_buffers once transaction is commited
utils::Synchronized<std::list<std::pair<uint64_t, BondPmrLd>>, utils::SpinLock> garbage_undo_buffers_;
// Vertices that are logically deleted but still have to be removed from // Vertices that are logically deleted but still have to be removed from
// indices before removing them from the main storage. // indices before removing them from the main storage.

View File

@ -65,7 +65,7 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c
while (delta != nullptr) { while (delta != nullptr) {
auto ts = delta->timestamp->load(std::memory_order_acquire); auto ts = delta->timestamp->load(std::memory_order_acquire);
if (ts < commit_timestamp || ts == transaction.transaction_id.load(std::memory_order_acquire)) { if (ts < commit_timestamp || ts == transaction.transaction_id) {
break; break;
} }

View File

@ -41,7 +41,8 @@ inline std::size_t ApplyDeltasForRead(Transaction const *transaction, const Delt
// This allows the transaction to see its changes even though it's committed. // This allows the transaction to see its changes even though it's committed.
const auto commit_timestamp = transaction->commit_timestamp const auto commit_timestamp = transaction->commit_timestamp
? transaction->commit_timestamp->load(std::memory_order_acquire) ? transaction->commit_timestamp->load(std::memory_order_acquire)
: transaction->transaction_id.load(std::memory_order_acquire); : transaction->transaction_id;
std::size_t n_processed = 0; std::size_t n_processed = 0;
while (delta != nullptr) { while (delta != nullptr) {
// For SNAPSHOT ISOLATION -> we can only see the changes which were committed before the start of the current // For SNAPSHOT ISOLATION -> we can only see the changes which were committed before the start of the current
@ -94,7 +95,7 @@ template <typename TObj>
inline bool PrepareForWrite(Transaction *transaction, TObj *object) { inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
if (object->delta == nullptr) return true; if (object->delta == nullptr) return true;
auto ts = object->delta->timestamp->load(std::memory_order_acquire); auto ts = object->delta->timestamp->load(std::memory_order_acquire);
if (ts == transaction->transaction_id.load(std::memory_order_acquire) || ts < transaction->start_timestamp) { if (ts == transaction->transaction_id || ts < transaction->start_timestamp) {
return true; return true;
} }
@ -112,7 +113,7 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
return nullptr; return nullptr;
} }
transaction->EnsureCommitTimestampExists(); transaction->EnsureCommitTimestampExists();
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), return &transaction->deltas.use().emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(),
transaction->command_id); transaction->command_id);
} }
@ -125,11 +126,12 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction, std::list<Delta>
} }
/// TODO: what if in-memory analytical /// TODO: what if in-memory analytical
inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key, inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key,
std::string &&ts) { std::string &&ts) {
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
transaction->EnsureCommitTimestampExists(); transaction->EnsureCommitTimestampExists();
return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key); // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
return &transaction->deltas.use().emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key);
} }
inline Delta *CreateDeleteDeserializedObjectDelta(std::list<Delta> *deltas, std::optional<std::string> old_disk_key, inline Delta *CreateDeleteDeserializedObjectDelta(std::list<Delta> *deltas, std::optional<std::string> old_disk_key,
@ -159,7 +161,7 @@ inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&..
return; return;
} }
transaction->EnsureCommitTimestampExists(); transaction->EnsureCommitTimestampExists();
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(), auto delta = &transaction->deltas.use().emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(),
transaction->command_id); transaction->command_id);
// The operations are written in such order so that both `next` and `prev` // The operations are written in such order so that both `next` and `prev`

View File

@ -117,7 +117,7 @@ StorageMode Storage::Accessor::GetCreationStorageMode() const { return creation_
std::optional<uint64_t> Storage::Accessor::GetTransactionId() const { std::optional<uint64_t> Storage::Accessor::GetTransactionId() const {
if (is_transaction_active_) { if (is_transaction_active_) {
return transaction_.transaction_id.load(std::memory_order_acquire); return transaction_.transaction_id;
} }
return {}; return {};
} }

View File

@ -16,6 +16,7 @@
#include <list> #include <list>
#include <memory> #include <memory>
#include "utils/memory.hpp"
#include "utils/skip_list.hpp" #include "utils/skip_list.hpp"
#include "storage/v2/delta.hpp" #include "storage/v2/delta.hpp"
@ -27,11 +28,14 @@
#include "storage/v2/vertex.hpp" #include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_info_cache.hpp" #include "storage/v2/vertex_info_cache.hpp"
#include "storage/v2/view.hpp" #include "storage/v2/view.hpp"
#include "utils/bond.hpp"
#include "utils/pmr/list.hpp"
namespace memgraph::storage { namespace memgraph::storage {
const uint64_t kTimestampInitialId = 0; const uint64_t kTimestampInitialId = 0;
const uint64_t kTransactionInitialId = 1ULL << 63U; const uint64_t kTransactionInitialId = 1ULL << 63U;
using PmrListDelta = utils::pmr::list<Delta>;
struct Transaction { struct Transaction {
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level, Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level,
@ -39,13 +43,14 @@ struct Transaction {
: transaction_id(transaction_id), : transaction_id(transaction_id),
start_timestamp(start_timestamp), start_timestamp(start_timestamp),
command_id(0), command_id(0),
deltas(1024UL),
must_abort(false), must_abort(false),
isolation_level(isolation_level), isolation_level(isolation_level),
storage_mode(storage_mode), storage_mode(storage_mode),
edge_import_mode_active(edge_import_mode_active) {} edge_import_mode_active(edge_import_mode_active) {}
Transaction(Transaction &&other) noexcept Transaction(Transaction &&other) noexcept
: transaction_id(other.transaction_id.load(std::memory_order_acquire)), : transaction_id(other.transaction_id),
start_timestamp(other.start_timestamp), start_timestamp(other.start_timestamp),
commit_timestamp(std::move(other.commit_timestamp)), commit_timestamp(std::move(other.commit_timestamp)),
command_id(other.command_id), command_id(other.command_id),
@ -67,7 +72,7 @@ struct Transaction {
/// @throw std::bad_alloc if failed to create the `commit_timestamp` /// @throw std::bad_alloc if failed to create the `commit_timestamp`
void EnsureCommitTimestampExists() { void EnsureCommitTimestampExists() {
if (commit_timestamp != nullptr) return; if (commit_timestamp != nullptr) return;
commit_timestamp = std::make_unique<std::atomic<uint64_t>>(transaction_id.load(std::memory_order_relaxed)); commit_timestamp = std::make_unique<std::atomic<uint64_t>>(transaction_id);
} }
void AddModifiedEdge(Gid gid, ModifiedEdgeInfo modified_edge) { void AddModifiedEdge(Gid gid, ModifiedEdgeInfo modified_edge) {
@ -78,7 +83,7 @@ struct Transaction {
void RemoveModifiedEdge(const Gid &gid) { modified_edges_.erase(gid); } void RemoveModifiedEdge(const Gid &gid) { modified_edges_.erase(gid); }
std::atomic<uint64_t> transaction_id; uint64_t transaction_id;
uint64_t start_timestamp; uint64_t start_timestamp;
// The `Transaction` object is stack allocated, but the `commit_timestamp` // The `Transaction` object is stack allocated, but the `commit_timestamp`
// must be heap allocated because `Delta`s have a pointer to it, and that // must be heap allocated because `Delta`s have a pointer to it, and that
@ -86,7 +91,8 @@ struct Transaction {
// `commited_transactions_` list for GC. // `commited_transactions_` list for GC.
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp; std::unique_ptr<std::atomic<uint64_t>> commit_timestamp;
uint64_t command_id; uint64_t command_id;
std::list<Delta> deltas;
Bond<PmrListDelta> deltas;
bool must_abort; bool must_abort;
IsolationLevel isolation_level; IsolationLevel isolation_level;
StorageMode storage_mode; StorageMode storage_mode;
@ -102,16 +108,12 @@ struct Transaction {
}; };
inline bool operator==(const Transaction &first, const Transaction &second) { inline bool operator==(const Transaction &first, const Transaction &second) {
return first.transaction_id.load(std::memory_order_acquire) == second.transaction_id.load(std::memory_order_acquire); return first.transaction_id == second.transaction_id;
} }
inline bool operator<(const Transaction &first, const Transaction &second) { inline bool operator<(const Transaction &first, const Transaction &second) {
return first.transaction_id.load(std::memory_order_acquire) < second.transaction_id.load(std::memory_order_acquire); return first.transaction_id < second.transaction_id;
}
inline bool operator==(const Transaction &first, const uint64_t &second) {
return first.transaction_id.load(std::memory_order_acquire) == second;
}
inline bool operator<(const Transaction &first, const uint64_t &second) {
return first.transaction_id.load(std::memory_order_acquire) < second;
} }
inline bool operator==(const Transaction &first, const uint64_t &second) { return first.transaction_id == second; }
inline bool operator<(const Transaction &first, const uint64_t &second) { return first.transaction_id < second; }
} // namespace memgraph::storage } // namespace memgraph::storage

57
src/utils/bond.hpp Normal file
View File

@ -0,0 +1,57 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <cstddef>
#include <memory>
#include "storage/v2/delta.hpp"
#include "utils/memory.hpp"
#include "utils/pmr/list.hpp"
/// struct Bond presents the association of a collection and its resource
/// and makes them tightly bound for easier handling of construction, moving and destruction
/// of container
template <typename Container>
struct Bond {
using resource = memgraph::utils::MonotonicBufferResource;
explicit Bond(std::size_t initial_size)
: res_(std::make_unique<resource>(initial_size)),
container_(memgraph::utils::Allocator<Container>(res_.get()).template new_object<Container>()){};
Bond(Bond &&other) noexcept : res_(std::exchange(other.res_, nullptr)), container_(other.container_) {
other.container_ = nullptr;
}
Bond(const Bond &other) = delete;
Bond &operator=(const Bond &other) = delete;
Bond &operator=(Bond &&other) = delete;
auto use() -> Container & { return *container_; }
auto use() const -> const Container & { return *container_; }
auto res() -> resource * { return res_.get(); }
~Bond() {
if (res_) {
memgraph::utils::Allocator<Container>(res_.get()).delete_object(container_);
container_ = nullptr;
res_->Release();
res_ = nullptr;
}
}
private:
std::unique_ptr<resource> res_{nullptr};
Container *container_{nullptr};
};

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -25,6 +25,7 @@
#include "storage_test_utils.hpp" #include "storage_test_utils.hpp"
#include "utils/file.hpp" #include "utils/file.hpp"
#include "utils/file_locker.hpp" #include "utils/file_locker.hpp"
#include "utils/memory.hpp"
#include "utils/uuid.hpp" #include "utils/uuid.hpp"
// Helper function used to convert between enum types. // Helper function used to convert between enum types.
@ -144,8 +145,8 @@ class DeltaGenerator final {
void Finalize(bool append_transaction_end = true) { void Finalize(bool append_transaction_end = true) {
auto commit_timestamp = gen_->timestamp_++; auto commit_timestamp = gen_->timestamp_++;
if (transaction_.deltas.empty()) return; if (transaction_.deltas.use().empty()) return;
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas.use()) {
auto owner = delta.prev.Get(); auto owner = delta.prev.Get();
while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) { while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) {
owner = owner.delta->prev.Get(); owner = owner.delta->prev.Get();
@ -161,7 +162,7 @@ class DeltaGenerator final {
if (append_transaction_end) { if (append_transaction_end) {
gen_->wal_file_.AppendTransactionEnd(commit_timestamp); gen_->wal_file_.AppendTransactionEnd(commit_timestamp);
if (gen_->valid_) { if (gen_->valid_) {
gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1); gen_->UpdateStats(commit_timestamp, transaction_.deltas.use().size() + 1);
for (auto &data : data_) { for (auto &data : data_) {
if (data.type == memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY) { if (data.type == memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY) {
// We need to put the final property value into the SET_PROPERTY // We need to put the final property value into the SET_PROPERTY