Current work
This commit is contained in:
parent
dcdbd0a19a
commit
4a98b8f046
@ -484,7 +484,7 @@ class DbAccessor final {
|
||||
}
|
||||
|
||||
storage::Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
|
||||
std::vector<VertexAccessor> nodes, std::vector<EdgeAccessor> edges, bool detach) {
|
||||
std::vector<VertexAccessor> nodes, std::vector<EdgeAccessor> edges, bool detach, bool fast) {
|
||||
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
|
||||
|
||||
std::vector<storage::VertexAccessor *> nodes_impl;
|
||||
@ -501,7 +501,7 @@ class DbAccessor final {
|
||||
edges_impl.push_back(&edge_accessor.impl_);
|
||||
}
|
||||
|
||||
auto res = accessor_->DetachDelete(std::move(nodes_impl), std::move(edges_impl), detach);
|
||||
auto res = accessor_->DetachDelete(std::move(nodes_impl), std::move(edges_impl), detach, fast);
|
||||
if (res.HasError()) {
|
||||
return res.GetError();
|
||||
}
|
||||
|
@ -2790,7 +2790,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
}
|
||||
|
||||
auto &dba = *context.db_accessor;
|
||||
auto res = dba.DetachDelete(std::move(buffer_.nodes), std::move(buffer_.edges), self_.detach_);
|
||||
auto res = dba.DetachDelete(std::move(buffer_.nodes), std::move(buffer_.edges), self_.detach_, true);
|
||||
if (res.HasError()) {
|
||||
switch (res.GetError()) {
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
|
@ -905,11 +905,11 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
|
||||
DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
|
||||
bool detach) {
|
||||
bool detach, bool fast) {
|
||||
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
|
||||
|
||||
/// TODO: (andi) Refactor
|
||||
auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach);
|
||||
auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach, fast);
|
||||
if (maybe_result.HasError()) {
|
||||
return maybe_result.GetError();
|
||||
}
|
||||
|
@ -117,7 +117,8 @@ class DiskStorage final : public Storage {
|
||||
}
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
|
||||
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach) override;
|
||||
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach,
|
||||
bool fast = false) override;
|
||||
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
|
||||
|
||||
|
@ -242,10 +242,10 @@ std::optional<VertexAccessor> InMemoryStorage::InMemoryAccessor::FindVertex(Gid
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
|
||||
InMemoryStorage::InMemoryAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
|
||||
bool detach) {
|
||||
bool detach, bool fast) {
|
||||
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
|
||||
|
||||
auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach);
|
||||
auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach, fast);
|
||||
|
||||
if (maybe_result.HasError()) {
|
||||
return maybe_result.GetError();
|
||||
@ -276,13 +276,16 @@ InMemoryStorage::InMemoryAccessor::DetachDelete(std::vector<VertexAccessor *> no
|
||||
}
|
||||
}};
|
||||
|
||||
for (auto const &vertex : deleted_vertices) {
|
||||
transaction_.manyDeltasCache.Invalidate(vertex.vertex_);
|
||||
}
|
||||
|
||||
for (const auto &edge : deleted_edges) {
|
||||
transaction_.manyDeltasCache.Invalidate(edge.from_vertex_, edge.edge_type_, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(edge.to_vertex_, edge.edge_type_, EdgeDirection::IN);
|
||||
if (!fast) {
|
||||
for (auto const &vertex : deleted_vertices) {
|
||||
transaction_.manyDeltasCache.Invalidate(vertex.vertex_);
|
||||
}
|
||||
for (const auto &edge : deleted_edges) {
|
||||
transaction_.manyDeltasCache.Invalidate(edge.from_vertex_, edge.edge_type_, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(edge.to_vertex_, edge.edge_type_, EdgeDirection::IN);
|
||||
}
|
||||
} else {
|
||||
transaction_.manyDeltasCache.Clear();
|
||||
}
|
||||
|
||||
return maybe_result;
|
||||
@ -957,17 +960,21 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
|
||||
|
||||
// 3.b) remove from veretex skip_list
|
||||
auto vertex_acc = mem_storage->vertices_.access();
|
||||
for (auto gid : current_deleted_vertices) {
|
||||
vertex_acc.remove(gid);
|
||||
if (!current_deleted_vertices.empty()) {
|
||||
mem_storage->vertices_.clear();
|
||||
}
|
||||
// for (auto gid : current_deleted_vertices) {
|
||||
// vertex_acc.remove(gid);
|
||||
// }
|
||||
}
|
||||
|
||||
if (!current_deleted_edges.empty()) {
|
||||
mem_storage->edges_.clear();
|
||||
// 3.c) remove from edge skip_list
|
||||
auto edge_acc = mem_storage->edges_.access();
|
||||
for (auto gid : current_deleted_edges) {
|
||||
edge_acc.remove(gid);
|
||||
}
|
||||
// auto edge_acc = mem_storage->edges_.access();
|
||||
// for (auto gid : current_deleted_edges) {
|
||||
// edge_acc.remove(gid);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@ -1849,7 +1856,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
||||
|
||||
//////// AF only this calls initialize transaction
|
||||
repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber(), this, db_acc);
|
||||
|
||||
uint64_t i = 0;
|
||||
auto append_deltas = [&](auto callback) {
|
||||
// Helper lambda that traverses the delta chain on order to find the first
|
||||
// delta that should be processed and then appends all discovered deltas.
|
||||
@ -1858,6 +1865,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
||||
auto *older = delta->next.load(std::memory_order_acquire);
|
||||
if (older == nullptr || older->timestamp->load(std::memory_order_acquire) != current_commit_timestamp) break;
|
||||
delta = older;
|
||||
i += 1;
|
||||
}
|
||||
while (true) {
|
||||
if (filter(delta->action)) {
|
||||
@ -1867,6 +1875,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::DELTA) break;
|
||||
delta = prev.delta;
|
||||
i += 1;
|
||||
}
|
||||
};
|
||||
|
||||
@ -1885,115 +1894,125 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
||||
|
||||
// 1. Process all Vertex deltas and store all operations that create vertices
|
||||
// and modify vertex data.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
return true;
|
||||
if (transaction.has_vertex_modifying_deltas) {
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
return true;
|
||||
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// 2. Process all Vertex deltas and store all operations that create edges.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
if (transaction.has_edge_creating_deltas) {
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// 3. Process all Edge deltas and store all operations that modify edge data.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::EDGE) continue;
|
||||
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
if (transaction.has_edge_modifying_deltas) {
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::EDGE) continue;
|
||||
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// 4. Process all Vertex deltas and store all operations that delete edges.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
if (transaction.has_edge_deleting_deltas) {
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// 5. Process all Vertex deltas and store all operations that delete vertices.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
if (transaction.has_vertex_deleting_deltas) {
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
return true;
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -2005,6 +2024,8 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
||||
});
|
||||
}
|
||||
|
||||
spdlog::debug("Number of iterations done: {}", i);
|
||||
|
||||
// Handle metadata deltas
|
||||
for (const auto &md_delta : transaction.md_deltas) {
|
||||
switch (md_delta.action) {
|
||||
|
@ -182,7 +182,8 @@ class InMemoryStorage final : public Storage {
|
||||
bool DeleteLabelIndexStats(const storage::LabelId &label) override;
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
|
||||
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach) override;
|
||||
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach,
|
||||
bool fast = false) override;
|
||||
|
||||
/// @throw std::bad_alloc
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
|
||||
|
@ -189,6 +189,36 @@ inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&..
|
||||
// modification is being done, everybody else will wait until we are fully
|
||||
// done with our modification before they read the object's delta value.
|
||||
object->delta = delta;
|
||||
|
||||
auto pointer_type = object->delta->prev.Get().type;
|
||||
if (pointer_type == PreviousPtr::Type::VERTEX) {
|
||||
switch (delta->action) {
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
transaction->has_vertex_modifying_deltas = true;
|
||||
break;
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
transaction->has_edge_deleting_deltas = true;
|
||||
break;
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
transaction->has_edge_creating_deltas = true;
|
||||
break;
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
transaction->has_vertex_deleting_deltas = true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} else if (pointer_type == PreviousPtr::Type::EDGE) {
|
||||
switch (delta->action) {
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
transaction->has_edge_modifying_deltas = true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage
|
||||
|
@ -222,7 +222,8 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
|
||||
}
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
|
||||
Storage::Accessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach) {
|
||||
Storage::Accessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach,
|
||||
bool fast) {
|
||||
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
|
||||
if (storage_->storage_mode_ == StorageMode::ON_DISK_TRANSACTIONAL) {
|
||||
for (const auto *vertex : nodes) {
|
||||
@ -238,6 +239,15 @@ Storage::Accessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector
|
||||
}
|
||||
}
|
||||
|
||||
if (detach && fast) {
|
||||
auto maybe_deleted_entities = DeleteVerticesFast(nodes);
|
||||
if (maybe_deleted_entities.HasError()) {
|
||||
return maybe_deleted_entities.GetError();
|
||||
}
|
||||
|
||||
return maybe_deleted_entities.GetValue();
|
||||
}
|
||||
|
||||
// 1. Gather nodes which are not deleted yet in the system
|
||||
auto maybe_nodes_to_delete = PrepareDeletableNodes(nodes);
|
||||
if (maybe_nodes_to_delete.HasError()) {
|
||||
@ -365,6 +375,84 @@ EdgeInfoForDeletion Storage::Accessor::PrepareDeletableEdges(const std::unordere
|
||||
.partial_dest_vertices = std::move(partial_dest_vertices)};
|
||||
}
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
|
||||
Storage::Accessor::DeleteVerticesFast(const std::vector<VertexAccessor *> &vertices) {
|
||||
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
|
||||
|
||||
std::vector<VertexAccessor> deleted_vertices;
|
||||
deleted_vertices.reserve(vertices.size());
|
||||
std::vector<EdgeAccessor> deleted_edges;
|
||||
for (auto *vertex_accessor : vertices) {
|
||||
auto vertex_ptr = vertex_accessor->vertex_;
|
||||
auto vertex_lock = std::unique_lock{vertex_ptr->lock};
|
||||
|
||||
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");
|
||||
|
||||
while (!vertex_ptr->in_edges.empty()) {
|
||||
auto const &[edge_type, opposing_vertex, edge_ref] = *vertex_ptr->in_edges.rbegin();
|
||||
std::unique_lock<utils::RWSpinLock> guard;
|
||||
if (storage_->config_.salient.items.properties_on_edges) {
|
||||
auto *edge_ptr = edge_ref.ptr;
|
||||
guard = std::unique_lock{edge_ptr->lock};
|
||||
|
||||
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
|
||||
}
|
||||
|
||||
// MarkEdgeAsDeleted allocates additional memory
|
||||
// and CreateAndLinkDelta needs memory
|
||||
utils::AtomicMemoryBlock atomic_memory_block{[in_edges = &vertex_ptr->in_edges, &vertex_ptr,
|
||||
edge_type = edge_type, opposing_vertex = opposing_vertex,
|
||||
edge_ref = edge_ref, this]() {
|
||||
in_edges->pop_back();
|
||||
if (this->storage_->config_.salient.items.properties_on_edges) {
|
||||
auto *edge_ptr = edge_ref.ptr;
|
||||
MarkEdgeAsDeleted(edge_ptr);
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::AddInEdgeTag(), edge_type, opposing_vertex, edge_ref);
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
}
|
||||
while (!vertex_ptr->out_edges.empty()) {
|
||||
auto const &[edge_type, opposing_vertex, edge_ref] = *vertex_ptr->out_edges.rbegin();
|
||||
std::unique_lock<utils::RWSpinLock> guard;
|
||||
if (storage_->config_.salient.items.properties_on_edges) {
|
||||
auto *edge_ptr = edge_ref.ptr;
|
||||
guard = std::unique_lock{edge_ptr->lock};
|
||||
|
||||
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
|
||||
}
|
||||
|
||||
// MarkEdgeAsDeleted allocates additional memory
|
||||
// and CreateAndLinkDelta needs memory
|
||||
utils::AtomicMemoryBlock atomic_memory_block{[out_edges = &vertex_ptr->out_edges, &vertex_ptr, &deleted_edges,
|
||||
edge_type = edge_type, opposing_vertex = opposing_vertex,
|
||||
edge_ref = edge_ref, this]() {
|
||||
out_edges->pop_back();
|
||||
if (this->storage_->config_.salient.items.properties_on_edges) {
|
||||
auto *edge_ptr = edge_ref.ptr;
|
||||
MarkEdgeAsDeleted(edge_ptr);
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::AddOutEdgeTag(), edge_type, opposing_vertex, edge_ref);
|
||||
deleted_edges.emplace_back(edge_ref, edge_type, vertex_ptr, opposing_vertex, storage_, &transaction_, true);
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
}
|
||||
|
||||
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) {
|
||||
return Error::VERTEX_HAS_EDGES;
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
|
||||
vertex_ptr->deleted = true;
|
||||
|
||||
deleted_vertices.emplace_back(vertex_ptr, storage_, &transaction_, true);
|
||||
}
|
||||
return std::make_optional<ReturnType>(deleted_vertices, deleted_edges);
|
||||
}
|
||||
|
||||
Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::ClearEdgesOnVertices(
|
||||
const std::unordered_set<Vertex *> &vertices, std::unordered_set<Gid> &deleted_edge_ids) {
|
||||
// We want to gather all edges that we delete in this step so that we can proceed with
|
||||
|
@ -178,7 +178,7 @@ class Storage {
|
||||
VertexAccessor *vertex);
|
||||
|
||||
virtual Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
|
||||
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach);
|
||||
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach, bool fast = false);
|
||||
|
||||
virtual uint64_t ApproximateVertexCount() const = 0;
|
||||
|
||||
@ -289,6 +289,8 @@ class Storage {
|
||||
bool is_transaction_active_;
|
||||
|
||||
// Detach delete private methods
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DeleteVerticesFast(
|
||||
const std::vector<VertexAccessor *> &vertices);
|
||||
Result<std::optional<std::unordered_set<Vertex *>>> PrepareDeletableNodes(
|
||||
const std::vector<VertexAccessor *> &vertices);
|
||||
EdgeInfoForDeletion PrepareDeletableEdges(const std::unordered_set<Vertex *> &vertices,
|
||||
|
@ -90,6 +90,11 @@ struct Transaction {
|
||||
|
||||
std::deque<Delta> deltas;
|
||||
utils::pmr::list<MetadataDelta> md_deltas;
|
||||
bool has_vertex_modifying_deltas{false};
|
||||
bool has_edge_creating_deltas{false};
|
||||
bool has_edge_modifying_deltas{false};
|
||||
bool has_edge_deleting_deltas{false};
|
||||
bool has_vertex_deleting_deltas{false};
|
||||
bool must_abort{};
|
||||
IsolationLevel isolation_level{};
|
||||
StorageMode storage_mode{};
|
||||
|
Loading…
Reference in New Issue
Block a user