Bugfix: correct replication handler (#1540)

Fixes root cause of a cascade of failures in replication code:
- Replica handling of deleting an edge is now corrected. Now tolerant of multiple edges of the same relationship type.
- Improved robustness: correct exception handling around failed stream of current WAL file. This now means a REPLICA failure will no longer prevent transactions on MAIN from performing WAL writes.
- Slightly better diagnostic messages, not user friendly but helps get developer to correct root cause quicker.
- Proactively remove vertex+edges during Abort rather than defer to GC to do that work, this included fixing constraints and indexes to be safe.


Co-authored-by: Andreja Tonev <andreja.tonev@memgraph.io>
This commit is contained in:
Gareth Andrew Lloyd 2023-12-01 12:38:48 +00:00 committed by GitHub
parent 7fc9b89634
commit 14f92b4a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 558 additions and 70 deletions

View File

@ -370,8 +370,9 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
constexpr bool kSharedAccess = false;
std::optional<std::pair<uint64_t, storage::InMemoryStorage::ReplicationAccessor>> commit_timestamp_and_accessor;
auto get_transaction = [storage, &commit_timestamp_and_accessor](uint64_t commit_timestamp,
bool unique = kSharedAccess) {
auto const get_transaction = [storage, &commit_timestamp_and_accessor](
uint64_t commit_timestamp,
bool unique = kSharedAccess) -> storage::InMemoryStorage::ReplicationAccessor * {
if (!commit_timestamp_and_accessor) {
std::unique_ptr<storage::Storage::Accessor> acc = nullptr;
if (unique) {
@ -415,9 +416,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
spdlog::trace(" Delete vertex {}", delta.vertex_create_delete.gid.AsUint());
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid, View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
if (!vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto ret = transaction->DeleteVertex(&*vertex);
if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError() || !ret.GetValue())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::VERTEX_ADD_LABEL: {
@ -425,9 +428,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
delta.vertex_add_remove_label.label);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
if (!vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto ret = vertex->AddLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label));
if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError() || !ret.GetValue())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
@ -435,9 +440,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
delta.vertex_add_remove_label.label);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
if (!vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto ret = vertex->RemoveLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label));
if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError() || !ret.GetValue())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::VERTEX_SET_PROPERTY: {
@ -445,10 +452,12 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_edge_set_property.gid, View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
if (!vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto ret = vertex->SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
delta.vertex_edge_set_property.value);
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::EDGE_CREATE: {
@ -457,13 +466,16 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint());
auto *transaction = get_transaction(timestamp);
auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW);
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
if (!from_vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW);
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
if (!to_vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto edge = transaction->CreateEdgeEx(&*from_vertex, &*to_vertex,
transaction->NameToEdgeType(delta.edge_create_delete.edge_type),
delta.edge_create_delete.gid);
if (edge.HasError()) throw utils::BasicException("Invalid transaction!");
if (edge.HasError())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::EDGE_DELETE: {
@ -472,16 +484,17 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint());
auto *transaction = get_transaction(timestamp);
auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW);
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
if (!from_vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW);
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
auto edges = from_vertex->OutEdges(View::NEW, {transaction->NameToEdgeType(delta.edge_create_delete.edge_type)},
&*to_vertex);
if (edges.HasError()) throw utils::BasicException("Invalid transaction!");
if (edges->edges.size() != 1) throw utils::BasicException("Invalid transaction!");
auto &edge = (*edges).edges[0];
auto ret = transaction->DeleteEdge(&edge);
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
if (!to_vertex)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
auto edgeType = transaction->NameToEdgeType(delta.edge_create_delete.edge_type);
auto edge =
transaction->FindEdge(delta.edge_create_delete.gid, View::NEW, edgeType, &*from_vertex, &*to_vertex);
if (!edge) throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
if (auto ret = transaction->DeleteEdge(&*edge); ret.HasError())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::EDGE_SET_PROPERTY: {
@ -498,7 +511,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
// yields an accessor that is only valid for managing the edge's
// properties.
auto edge = edge_acc.find(delta.vertex_edge_set_property.gid);
if (edge == edge_acc.end()) throw utils::BasicException("Invalid transaction!");
if (edge == edge_acc.end())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
// The edge visibility check must be done here manually because we
// don't allow direct access to the edges through the public API.
{
@ -530,7 +544,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
}
}
});
if (!is_visible) throw utils::BasicException("Invalid transaction!");
if (!is_visible)
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
}
EdgeRef edge_ref(&*edge);
// Here we create an edge accessor that we will use to get the
@ -543,7 +558,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
delta.vertex_edge_set_property.value);
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
@ -553,7 +569,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
throw utils::BasicException("Invalid commit data!");
auto ret =
commit_timestamp_and_accessor->second.Commit(commit_timestamp_and_accessor->first, false /* not main */);
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
commit_timestamp_and_accessor = std::nullopt;
break;
}
@ -563,14 +580,14 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
// Need to send the timestamp
auto *transaction = get_transaction(timestamp, kUniqueAccess);
if (transaction->CreateIndex(storage->NameToLabel(delta.operation_label.label)).HasError())
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::LABEL_INDEX_DROP: {
spdlog::trace(" Drop label index on :{}", delta.operation_label.label);
auto *transaction = get_transaction(timestamp, kUniqueAccess);
if (transaction->DropIndex(storage->NameToLabel(delta.operation_label.label)).HasError())
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::LABEL_INDEX_STATS_SET: {
@ -601,7 +618,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
->CreateIndex(storage->NameToLabel(delta.operation_label_property.label),
storage->NameToProperty(delta.operation_label_property.property))
.HasError())
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: {
@ -612,7 +629,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
->DropIndex(storage->NameToLabel(delta.operation_label_property.label),
storage->NameToProperty(delta.operation_label_property.property))
.HasError())
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_STATS_SET: {
@ -644,7 +661,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
auto ret =
transaction->CreateExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label),
storage->NameToProperty(delta.operation_label_property.property));
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
if (ret.HasError())
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
@ -655,7 +673,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
->DropExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label),
storage->NameToProperty(delta.operation_label_property.property))
.HasError())
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: {
@ -670,7 +688,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
auto ret = transaction->CreateUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label),
properties);
if (!ret.HasValue() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
break;
}
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: {
@ -685,7 +703,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
auto ret =
transaction->DropUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label), properties);
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) {
throw utils::BasicException("Invalid transaction!");
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
}
break;
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,6 +10,7 @@
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <optional>
#include <span>

View File

@ -29,4 +29,8 @@ Constraints::Constraints(const Config &config, StorageMode storage_mode) {
};
});
}
void Constraints::AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) const {
static_cast<InMemoryUniqueConstraints *>(unique_constraints_.get())->AbortEntries(vertices, exact_start_timestamp);
}
} // namespace memgraph::storage

View File

@ -11,6 +11,8 @@
#pragma once
#include <span>
#include "storage/v2/config.hpp"
#include "storage/v2/constraints/existence_constraints.hpp"
#include "storage/v2/constraints/unique_constraints.hpp"
@ -27,6 +29,8 @@ struct Constraints {
Constraints &operator=(Constraints &&) = delete;
~Constraints() = default;
void AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) const;
std::unique_ptr<ExistenceConstraints> existence_constraints_;
std::unique_ptr<UniqueConstraints> unique_constraints_;
};

View File

@ -10,7 +10,9 @@
// licenses/APL.txt.
#include "storage/v2/disk//edge_import_mode_cache.hpp"
#include <algorithm>
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/indices/indices.hpp"
#include "storage/v2/inmemory/label_index.hpp"
@ -28,7 +30,7 @@ EdgeImportModeCache::EdgeImportModeCache(const Config &config)
InMemoryLabelIndex::Iterable EdgeImportModeCache::Vertices(LabelId label, View view, Storage *storage,
Transaction *transaction) const {
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(in_memory_indices_.label_index_.get());
return mem_label_index->Vertices(label, view, storage, transaction);
return mem_label_index->Vertices(label, vertices_.access(), view, storage, transaction);
}
InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices(
@ -37,7 +39,8 @@ InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices(
Transaction *transaction) const {
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(in_memory_indices_.label_property_index_.get());
return mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, storage, transaction);
return mem_label_property_index->Vertices(label, property, vertices_.access(), lower_bound, upper_bound, view,
storage, transaction);
}
bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property,

View File

@ -71,6 +71,37 @@
namespace memgraph::storage {
namespace {
auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex)
-> Result<EdgesVertexAccessorResult> {
auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) {
// Obtain the locks by `gid` order to avoid lock cycles.
auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
if (from_vertex->gid < to_vertex->gid) {
guard_from.lock();
guard_to.lock();
} else if (from_vertex->gid > to_vertex->gid) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
// With the potentially cheaper side FindEdges
const auto out_n = from_vertex->out_edges.size();
const auto in_n = to_vertex->in_edges.size();
return out_n <= in_n;
};
return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex)
: to_vertex->InEdges(view, {edge_type}, from_vertex);
}
} // namespace
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
namespace {
@ -949,6 +980,20 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
}
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::FindEdge(Gid gid, View view, EdgeTypeId edge_type,
VertexAccessor *from_vertex,
VertexAccessor *to_vertex) {
auto res = FindEdges(view, edge_type, from_vertex, to_vertex);
if (res.HasError()) return std::nullopt; // TODO: use a Result type
auto const it = std::ranges::find_if(
res->edges, [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; });
if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type
return *it;
}
Result<EdgeAccessor> DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) {
MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage");
return Error::NONEXISTENT_OBJECT;

View File

@ -121,6 +121,9 @@ class DiskStorage final : public Storage {
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
std::optional<EdgeAccessor> FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex,
VertexAccessor *to_vertex) override;
Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override;
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;

View File

@ -17,6 +17,21 @@
namespace memgraph::storage {
void Indices::AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp) const {
static_cast<InMemoryLabelIndex *>(label_index_.get())->AbortEntries(labelId, vertices, exact_start_timestamp);
}
void Indices::AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp) const {
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
->AbortEntries(property, vertices, exact_start_timestamp);
}
void Indices::AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp) const {
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
->AbortEntries(label, vertices, exact_start_timestamp);
}
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const {
static_cast<InMemoryLabelIndex *>(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp);
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
@ -50,4 +65,8 @@ Indices::Indices(const Config &config, StorageMode storage_mode) {
});
}
Indices::IndexStats Indices::Analysis() const {
return {static_cast<InMemoryLabelIndex *>(label_index_.get())->Analysis(),
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())->Analysis()};
}
} // namespace memgraph::storage

View File

@ -12,6 +12,9 @@
#pragma once
#include <memory>
#include <span>
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices/label_index.hpp"
#include "storage/v2/indices/label_property_index.hpp"
#include "storage/v2/storage_mode.hpp"
@ -32,6 +35,20 @@ struct Indices {
/// TODO: unused in disk indices
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const;
/// Surgical removal of entries that was inserted this transaction
/// TODO: unused in disk indices
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp) const;
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp) const;
void AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp) const;
struct IndexStats {
std::vector<LabelId> label;
LabelPropertyIndex::IndexStats property_label;
};
IndexStats Analysis() const;
// Indices are updated whenever an update occurs, instead of only on commit or
// advance command. This is necessary because we want indices to support `NEW`
// view for use in Merge.

View File

@ -19,6 +19,11 @@ namespace memgraph::storage {
class LabelPropertyIndex {
public:
struct IndexStats {
std::map<LabelId, std::vector<PropertyId>> l2p;
std::map<PropertyId, std::vector<LabelId>> p2l;
};
LabelPropertyIndex() = default;
LabelPropertyIndex(const LabelPropertyIndex &) = delete;
LabelPropertyIndex(LabelPropertyIndex &&) = delete;

View File

@ -10,8 +10,12 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/label_index.hpp"
#include <span>
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/indices_utils.hpp"
#include "storage/v2/inmemory/storage.hpp"
namespace memgraph::storage {
@ -96,9 +100,23 @@ void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_time
}
}
InMemoryLabelIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, View view,
Storage *storage, Transaction *transaction)
: index_accessor_(std::move(index_accessor)),
void InMemoryLabelIndex::AbortEntries(LabelId labelId, std::span<Vertex *const> vertices,
uint64_t exact_start_timestamp) {
auto const it = index_.find(labelId);
if (it == index_.end()) return;
auto &label_storage = it->second;
auto vertices_acc = label_storage.access();
for (auto *vertex : vertices) {
vertices_acc.remove(Entry{vertex, exact_start_timestamp});
}
}
InMemoryLabelIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor,
utils::SkipList<Vertex>::ConstAccessor vertices_accessor, LabelId label,
View view, Storage *storage, Transaction *transaction)
: pin_accessor_(std::move(vertices_accessor)),
index_accessor_(std::move(index_accessor)),
label_(label),
view_(view),
storage_(storage),
@ -147,9 +165,21 @@ void InMemoryLabelIndex::RunGC() {
InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Storage *storage,
Transaction *transaction) {
DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL ||
storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL,
"LabelIndex trying to access InMemory vertices from OnDisk!");
auto vertices_acc = static_cast<InMemoryStorage const *>(storage)->vertices_.access();
const auto it = index_.find(label);
MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint());
return {it->second.access(), label, view, storage, transaction};
return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction};
}
InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(
LabelId label, memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc, View view,
Storage *storage, Transaction *transaction) {
const auto it = index_.find(label);
MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint());
return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction};
}
void InMemoryLabelIndex::SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats) {
@ -187,4 +217,12 @@ bool InMemoryLabelIndex::DeleteIndexStats(const storage::LabelId &label) {
return false;
}
std::vector<LabelId> InMemoryLabelIndex::Analysis() const {
std::vector<LabelId> res;
res.reserve(index_.size());
for (const auto &[label, _] : index_) {
res.emplace_back(label);
}
return res;
}
} // namespace memgraph::storage

View File

@ -11,6 +11,8 @@
#pragma once
#include <span>
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/label_index.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
@ -56,10 +58,15 @@ class InMemoryLabelIndex : public storage::LabelIndex {
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
/// Surgical removal of entries that was inserted this transaction
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp);
std::vector<LabelId> Analysis() const;
class Iterable {
public:
Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, View view, Storage *storage,
Transaction *transaction);
Iterable(utils::SkipList<Entry>::Accessor index_accessor, utils::SkipList<Vertex>::ConstAccessor vertices_accessor,
LabelId label, View view, Storage *storage, Transaction *transaction);
class Iterator {
public:
@ -85,6 +92,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
Iterator end() { return {this, index_accessor_.end()}; }
private:
utils::SkipList<Vertex>::ConstAccessor pin_accessor_;
utils::SkipList<Entry>::Accessor index_accessor_;
LabelId label_;
View view_;
@ -98,6 +106,9 @@ class InMemoryLabelIndex : public storage::LabelIndex {
Iterable Vertices(LabelId label, View view, Storage *storage, Transaction *transaction);
Iterable Vertices(LabelId label, memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc,
View view, Storage *storage, Transaction *transaction);
void SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats);
std::optional<storage::LabelIndexStats> GetIndexStats(const storage::LabelId &label) const;

View File

@ -12,6 +12,8 @@
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/indices_utils.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "utils/logging.hpp"
namespace memgraph::storage {
@ -101,11 +103,12 @@ void InMemoryLabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const
return;
}
if (!indices_by_property_.contains(property)) {
auto index = indices_by_property_.find(property);
if (index == indices_by_property_.end()) {
return;
}
for (const auto &[_, storage] : indices_by_property_.at(property)) {
for (const auto &[_, storage] : index->second) {
auto acc = storage->access();
acc.insert(Entry{value, vertex, tx.start_timestamp});
}
@ -220,12 +223,14 @@ const PropertyValue kSmallestMap = PropertyValue(std::map<std::string, PropertyV
const PropertyValue kSmallestTemporalData =
PropertyValue(TemporalData{static_cast<TemporalType>(0), std::numeric_limits<int64_t>::min()});
InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label,
InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor,
utils::SkipList<Vertex>::ConstAccessor vertices_accessor, LabelId label,
PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view,
Storage *storage, Transaction *transaction)
: index_accessor_(std::move(index_accessor)),
: pin_accessor_(std::move(vertices_accessor)),
index_accessor_(std::move(index_accessor)),
label_(label),
property_(property),
lower_bound_(lower_bound),
@ -428,9 +433,57 @@ InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices(
LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
Transaction *transaction) {
DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL ||
storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL,
"PropertyLabel index trying to access InMemory vertices from OnDisk!");
auto vertices_acc = static_cast<InMemoryStorage const *>(storage)->vertices_.access();
auto it = index_.find({label, property});
MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint());
return {it->second.access(), label, property, lower_bound, upper_bound, view, storage, transaction};
return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage,
transaction};
}
InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices(
LabelId label, PropertyId property,
memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
Transaction *transaction) {
auto it = index_.find({label, property});
MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint());
return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage,
transaction};
}
void InMemoryLabelPropertyIndex::AbortEntries(PropertyId property,
std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp) {
auto const it = indices_by_property_.find(property);
if (it == indices_by_property_.end()) return;
auto &indices = it->second;
for (const auto &[_, index] : indices) {
auto index_acc = index->access();
for (auto const &[value, vertex] : vertices) {
index_acc.remove(Entry{value, vertex, exact_start_timestamp});
}
}
}
void InMemoryLabelPropertyIndex::AbortEntries(LabelId label,
std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp) {
for (auto &[label_prop, storage] : index_) {
if (label_prop.first != label) {
continue;
}
auto index_acc = storage.access();
for (const auto &[property, vertex] : vertices) {
if (!property.IsNull()) {
index_acc.remove(Entry{property, vertex, exact_start_timestamp});
}
}
}
}
} // namespace memgraph::storage

View File

@ -11,9 +11,13 @@
#pragma once
#include <span>
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices/label_property_index.hpp"
#include "storage/v2/indices/label_property_index_stats.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/rw_lock.hpp"
#include "utils/synchronized.hpp"
@ -61,10 +65,25 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp);
void AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp);
IndexStats Analysis() const {
IndexStats res{};
for (const auto &[lp, _] : index_) {
const auto &[label, property] = lp;
res.l2p[label].emplace_back(property);
res.p2l[property].emplace_back(label);
}
return res;
}
class Iterable {
public:
Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
Iterable(utils::SkipList<Entry>::Accessor index_accessor, utils::SkipList<Vertex>::ConstAccessor vertices_accessor,
LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
Transaction *transaction);
@ -92,6 +111,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
Iterator end();
private:
utils::SkipList<Vertex>::ConstAccessor pin_accessor_;
utils::SkipList<Entry>::Accessor index_accessor_;
LabelId label_;
PropertyId property_;
@ -131,6 +151,12 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
Transaction *transaction);
Iterable Vertices(LabelId label, PropertyId property,
memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
Transaction *transaction);
private:
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>> index_;
std::unordered_map<PropertyId, std::unordered_map<LabelId, utils::SkipList<Entry> *>> indices_by_property_;

View File

@ -10,21 +10,57 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/storage.hpp"
#include <algorithm>
#include <functional>
#include "dbms/constants.hpp"
#include "memory/global_memory_control.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/metadata_delta.hpp"
/// REPLICATION ///
#include "dbms/inmemory/replication_handlers.hpp"
#include "storage/v2/inmemory/replication/recovery.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/resource_lock.hpp"
#include "utils/stat.hpp"
namespace memgraph::storage {
namespace {
auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex)
-> Result<EdgesVertexAccessorResult> {
auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) {
// Obtain the locks by `gid` order to avoid lock cycles.
auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
if (from_vertex->gid < to_vertex->gid) {
guard_from.lock();
guard_to.lock();
} else if (from_vertex->gid > to_vertex->gid) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
// With the potentially cheaper side FindEdges
const auto out_n = from_vertex->out_edges.size();
const auto in_n = to_vertex->in_edges.size();
return out_n <= in_n;
};
return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex)
: to_vertex->InEdges(view, {edge_type}, from_vertex);
}
}; // namespace
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
@ -315,6 +351,24 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
}
std::optional<EdgeAccessor> InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, const View view, EdgeTypeId edge_type,
VertexAccessor *from_vertex,
VertexAccessor *to_vertex) {
auto res = FindEdges(view, edge_type, from_vertex, to_vertex);
if (res.HasError()) return std::nullopt; // TODO: use a Result type
auto const it = std::invoke([this, gid, &res]() {
auto const byGid = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.gid == gid; };
auto const byEdgePtr = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; };
if (config_.properties_on_edges) return std::ranges::find_if(res->edges, byEdgePtr);
return std::ranges::find_if(res->edges, byGid);
});
if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type
return *it;
}
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type, storage::Gid gid) {
MG_ASSERT(from->transaction_ == to->transaction_,
@ -697,7 +751,8 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
could_replicate_all_sync_replicas =
mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // protected by engine_guard
// TODO: release lock, and update all deltas to have a local copy of the commit timestamp
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // protected by engine_guard
transaction_.commit_timestamp->store(*commit_timestamp_,
std::memory_order_release); // protected by engine_guard
// Replica can only update the last commit timestamp with
// the commits received from main.
if (is_main || desired_commit_timestamp.has_value()) {
@ -823,6 +878,21 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
std::list<Gid> my_deleted_vertices;
std::list<Gid> my_deleted_edges;
std::map<LabelId, std::vector<Vertex *>> label_cleanup;
std::map<LabelId, std::vector<std::pair<PropertyValue, Vertex *>>> label_property_cleanup;
std::map<PropertyId, std::vector<std::pair<PropertyValue, Vertex *>>> property_cleanup;
// CONSTRAINTS
if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) {
// Need to remove elements from constraints before handling of the deltas, so the elements match the correct
// values
auto vertices_to_check = transaction_.constraint_verification_info.GetVerticesForUniqueConstraintChecking();
auto vertices_to_check_v = std::vector<Vertex const *>{vertices_to_check.begin(), vertices_to_check.end()};
storage_->constraints_.AbortEntries(vertices_to_check_v, transaction_.start_timestamp);
}
const auto index_stats = storage_->indices_.Analysis();
for (const auto &delta : transaction_.deltas.use()) {
auto prev = delta.prev.Get();
switch (prev.type) {
@ -838,6 +908,24 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
MG_ASSERT(it != vertex->labels.end(), "Invalid database state!");
std::swap(*it, *vertex->labels.rbegin());
vertex->labels.pop_back();
// For label index
// check if there is a label index for the label and add entry if so
// For property label index
// check if we care about the label; this will return all the propertyIds we care about and then get
// the current property value
if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label)) {
label_cleanup[current->label].emplace_back(vertex);
}
const auto &properties = index_stats.property_label.l2p.find(current->label);
if (properties != index_stats.property_label.l2p.end()) {
for (const auto &property : properties->second) {
auto current_value = vertex->properties.GetProperty(property);
if (!current_value.IsNull()) {
label_property_cleanup[current->label].emplace_back(std::move(current_value), vertex);
}
}
}
break;
}
case Delta::Action::ADD_LABEL: {
@ -847,6 +935,18 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
break;
}
case Delta::Action::SET_PROPERTY: {
// For label index nothing
// For property label index
// check if we care about the property, this will return all the labels and then get current property
// value
const auto &labels = index_stats.property_label.p2l.find(current->property.key);
if (labels != index_stats.property_label.p2l.end()) {
auto current_value = vertex->properties.GetProperty(current->property.key);
if (!current_value.IsNull()) {
property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex);
}
}
// Setting the correct value
vertex->properties.SetProperty(current->property.key, current->property.value);
break;
}
@ -963,7 +1063,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
{
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
auto engine_guard = std::unique_lock(storage_->engine_lock_);
uint64_t mark_timestamp = storage_->timestamp_;
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
@ -975,10 +1075,37 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas),
std::move(transaction_.commit_timestamp));
});
mem_storage->deleted_vertices_.WithLock(
[&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); });
mem_storage->deleted_edges_.WithLock(
[&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); });
/// We MUST unlink (aka. remove) entries in indexes and constraints
/// before we unlink (aka. remove) vertices from storage
/// this is because they point into vertices skip_list
// INDICES
for (auto const &[label, vertices] : label_cleanup) {
storage_->indices_.AbortEntries(label, vertices, transaction_.start_timestamp);
}
for (auto const &[label, prop_vertices] : label_property_cleanup) {
storage_->indices_.AbortEntries(label, prop_vertices, transaction_.start_timestamp);
}
for (auto const &[property, prop_vertices] : property_cleanup) {
storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp);
}
// VERTICES
{
auto vertices_acc = mem_storage->vertices_.access();
for (auto gid : my_deleted_vertices) {
vertices_acc.remove(gid);
}
}
// EDGES
{
auto edges_acc = mem_storage->edges_.access();
for (auto gid : my_deleted_edges) {
edges_acc.remove(gid);
}
}
}
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
@ -1271,8 +1398,6 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
// vertices that appear in an index also exist in main storage.
std::list<Gid> current_deleted_edges;
std::list<Gid> current_deleted_vertices;
deleted_vertices_->swap(current_deleted_vertices);
deleted_edges_->swap(current_deleted_edges);
auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false);
auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false);
@ -1922,12 +2047,12 @@ utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::Create
void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) {
CollectGarbage<true>(std::move(main_guard));
static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();
// SkipList is already threadsafe
vertices_.run_gc();
edges_.run_gc();
static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();
}
uint64_t InMemoryStorage::CommitTimestamp(const std::optional<uint64_t> desired_commit_timestamp) {

View File

@ -51,6 +51,8 @@ class InMemoryStorage final : public Storage {
friend std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit,
utils::FileRetainer::FileLocker *file_locker,
const InMemoryStorage *storage);
friend class InMemoryLabelIndex;
friend class InMemoryLabelPropertyIndex;
public:
enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries };
@ -185,6 +187,9 @@ class InMemoryStorage final : public Storage {
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
std::optional<EdgeAccessor> FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex,
VertexAccessor *to_vertex) override;
Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override;
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;

View File

@ -256,11 +256,12 @@ bool InMemoryUniqueConstraints::Entry::operator==(const std::vector<PropertyValu
void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const Transaction &tx) {
for (const auto &label : vertex->labels) {
if (!constraints_by_label_.contains(label)) {
const auto &constraint = constraints_by_label_.find(label);
if (constraint == constraints_by_label_.end()) {
continue;
}
for (auto &[props, storage] : constraints_by_label_.at(label)) {
for (auto &[props, storage] : constraint->second) {
auto values = vertex->properties.ExtractPropertyValues(props);
if (!values) {
@ -273,6 +274,28 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T
}
}
void InMemoryUniqueConstraints::AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) {
for (const auto &vertex : vertices) {
for (const auto &label : vertex->labels) {
const auto &constraint = constraints_by_label_.find(label);
if (constraint == constraints_by_label_.end()) {
return;
}
for (auto &[props, storage] : constraint->second) {
auto values = vertex->properties.ExtractPropertyValues(props);
if (!values) {
continue;
}
auto acc = storage->access();
acc.remove(Entry{std::move(*values), vertex, exact_start_timestamp});
}
}
}
}
utils::BasicResult<ConstraintViolation, InMemoryUniqueConstraints::CreationStatus>
InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set<PropertyId> &properties,
utils::SkipList<Vertex>::Accessor vertices) {
@ -364,12 +387,14 @@ std::optional<ConstraintViolation> InMemoryUniqueConstraints::Validate(const Ver
if (vertex.deleted) {
return std::nullopt;
}
for (const auto &label : vertex.labels) {
if (!constraints_by_label_.contains(label)) {
const auto &constraint = constraints_by_label_.find(label);
if (constraint == constraints_by_label_.end()) {
continue;
}
for (const auto &[properties, storage] : constraints_by_label_.at(label)) {
for (const auto &[properties, storage] : constraint->second) {
auto value_array = vertex.properties.ExtractPropertyValues(properties);
if (!value_array) {

View File

@ -11,6 +11,8 @@
#pragma once
#include <span>
#include "storage/v2/constraints/unique_constraints.hpp"
namespace memgraph::storage {
@ -54,6 +56,8 @@ class InMemoryUniqueConstraints : public UniqueConstraints {
void UpdateBeforeCommit(const Vertex *vertex, std::unordered_set<LabelId> &added_labels,
std::unordered_set<PropertyId> &added_properties, const Transaction &tx);
void AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp);
/// Creates unique constraint on the given `label` and a list of `properties`.
/// Returns constraint violation if there are multiple vertices with the same
/// label and property values. Returns `CreationStatus::ALREADY_EXISTS` if

View File

@ -195,6 +195,9 @@ class Storage {
virtual Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0;
virtual std::optional<EdgeAccessor> FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex,
VertexAccessor *to_vertex) = 0;
virtual Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) = 0;
virtual Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0;

View File

@ -16,11 +16,11 @@
#include <list>
#include <thread>
constexpr char *kProcedureHackerNews = "hacker_news";
constexpr char *kArgumentHackerNewsVotes = "votes";
constexpr char *kArgumentHackerNewsItemHourAge = "item_hour_age";
constexpr char *kArgumentHackerNewsGravity = "gravity";
constexpr char *kReturnHackerNewsScore = "score";
constexpr char const *kProcedureHackerNews = "hacker_news";
constexpr char const *kArgumentHackerNewsVotes = "votes";
constexpr char const *kArgumentHackerNewsItemHourAge = "item_hour_age";
constexpr char const *kArgumentHackerNewsGravity = "gravity";
constexpr char const *kReturnHackerNewsScore = "score";
void HackerNews(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard(memory);

View File

@ -13,6 +13,7 @@ copy_e2e_python_files(replication_show common.py)
copy_e2e_python_files(replication_show conftest.py)
copy_e2e_python_files(replication_show show.py)
copy_e2e_python_files(replication_show show_while_creating_invalid_state.py)
copy_e2e_python_files(replication_show edge_delete.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." mg_utils.py)

View File

@ -0,0 +1,56 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import time
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
# BUGFIX: for issue https://github.com/memgraph/memgraph/issues/1515
def test_replication_handles_delete_when_multiple_edges_of_same_type(connection):
# Goal is to check the timestamp are correctly computed from the information we get from replicas.
# 0/ Check original state of replicas.
# 1/ Add nodes and edges to MAIN, then delete the edges.
# 2/ Check state of replicas.
# 0/
conn = connection(7687, "main")
conn.autocommit = True
cursor = conn.cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
# 1/
execute_and_fetch_all(cursor, "CREATE (a)-[r:X]->(b) CREATE (a)-[:X]->(b) DELETE r;")
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("replica_2", "127.0.0.1:10002", "async", 2, 0, "ready"),
}
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -8,6 +8,23 @@ template_validation_queries: &template_validation_queries
validation_queries:
- <<: *template_test_nodes_query
- <<: *template_test_edges_query
template_simple_cluster: &template_simple_cluster
cluster:
replica_1:
args: [ "--bolt-port", "7688", "--log-level=TRACE" ]
log_file: "replication-e2e-replica1.log"
setup_queries: [ "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;" ]
replica_2:
args: ["--bolt-port", "7689", "--log-level=TRACE"]
log_file: "replication-e2e-replica2.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
main:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:10002'",
]
template_cluster: &template_cluster
cluster:
replica_1:
@ -83,3 +100,8 @@ workloads:
- name: "Show while creating invalid state"
binary: "tests/e2e/pytest_runner.sh"
args: ["replication/show_while_creating_invalid_state.py"]
- name: "Delete edge replication"
binary: "tests/e2e/pytest_runner.sh"
args: ["replication/edge_delete.py"]
<<: *template_simple_cluster