From 9c51dbbb019828fd01d8b2336b5c6d2f7d27f1bd Mon Sep 17 00:00:00 2001 From: imilinovic <44698587+imilinovic@users.noreply.github.com> Date: Fri, 8 Sep 2023 12:52:40 +0200 Subject: [PATCH] Implement changing from and to vertices in relationships (#1221) --- include/_mgp.hpp | 10 + include/mg_procedure.h | 16 ++ include/mgp.hpp | 16 ++ src/query/db_accessor.cpp | 18 ++ src/query/db_accessor.hpp | 16 ++ src/query/procedure/mg_procedure_impl.cpp | 77 +++++++ src/storage/v2/disk/storage.cpp | 11 + src/storage/v2/disk/storage.hpp | 4 + src/storage/v2/inmemory/storage.cpp | 208 +++++++++++++++++- src/storage/v2/inmemory/storage.hpp | 4 + src/storage/v2/storage.hpp | 4 + tests/e2e/CMakeLists.txt | 1 + tests/e2e/transaction_rollback/CMakeLists.txt | 9 + tests/e2e/transaction_rollback/common.py | 25 +++ tests/e2e/transaction_rollback/conftest.py | 21 ++ .../procedures/CMakeLists.txt | 1 + .../procedures/procedures.cpp | 76 +++++++ tests/e2e/transaction_rollback/transaction.py | 58 +++++ tests/e2e/transaction_rollback/workloads.yaml | 19 ++ tests/unit/cpp_api.cpp | 44 ++++ 20 files changed, 636 insertions(+), 2 deletions(-) create mode 100644 tests/e2e/transaction_rollback/CMakeLists.txt create mode 100644 tests/e2e/transaction_rollback/common.py create mode 100644 tests/e2e/transaction_rollback/conftest.py create mode 100644 tests/e2e/transaction_rollback/procedures/CMakeLists.txt create mode 100644 tests/e2e/transaction_rollback/procedures/procedures.cpp create mode 100644 tests/e2e/transaction_rollback/transaction.py create mode 100644 tests/e2e/transaction_rollback/workloads.yaml diff --git a/include/_mgp.hpp b/include/_mgp.hpp index cc1daebf7..2c3405ad2 100644 --- a/include/_mgp.hpp +++ b/include/_mgp.hpp @@ -255,6 +255,16 @@ inline mgp_edge *graph_create_edge(mgp_graph *graph, mgp_vertex *from, mgp_verte return MgInvoke(mgp_graph_create_edge, graph, from, to, type, memory); } +inline mgp_edge *graph_edge_set_from(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_from, + mgp_memory *memory) { + return MgInvoke(mgp_graph_edge_set_from, graph, e, new_from, memory); +} + +inline mgp_edge *graph_edge_set_to(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_to, + mgp_memory *memory) { + return MgInvoke(mgp_graph_edge_set_to, graph, e, new_to, memory); +} + inline void graph_delete_edge(mgp_graph *graph, mgp_edge *edge) { MgInvokeVoid(mgp_graph_delete_edge, graph, edge); } inline mgp_vertex *graph_get_vertex_by_id(mgp_graph *g, mgp_vertex_id id, mgp_memory *memory) { diff --git a/include/mg_procedure.h b/include/mg_procedure.h index 4c771b4d2..dd6107583 100644 --- a/include/mg_procedure.h +++ b/include/mg_procedure.h @@ -888,6 +888,22 @@ enum mgp_error mgp_graph_detach_delete_vertex(struct mgp_graph *graph, struct mg enum mgp_error mgp_graph_create_edge(struct mgp_graph *graph, struct mgp_vertex *from, struct mgp_vertex *to, struct mgp_edge_type type, struct mgp_memory *memory, struct mgp_edge **result); +/// Change edge from vertex +/// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable. +/// Return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE if unable to allocate a mgp_edge. +/// Return mgp_error::MGP_ERROR_DELETED_OBJECT if `from` or `to` has been deleted. +/// Return mgp_error::MGP_ERROR_SERIALIZATION_ERROR if `from` or `to` has been modified by another transaction. +enum mgp_error mgp_graph_edge_set_from(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_from, + struct mgp_memory *memory, struct mgp_edge **result); + +/// Change edge to vertex +/// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable. +/// Return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE if unable to allocate a mgp_edge. +/// Return mgp_error::MGP_ERROR_DELETED_OBJECT if `from` or `to` has been deleted. +/// Return mgp_error::MGP_ERROR_SERIALIZATION_ERROR if `from` or `to` has been modified by another transaction. +enum mgp_error mgp_graph_edge_set_to(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_to, + struct mgp_memory *memory, struct mgp_edge **result); + /// Delete an edge from the graph. /// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable. /// Return mgp_error::MGP_ERROR_SERIALIZATION_ERROR if `edge`, its source or destination vertex has been modified by diff --git a/include/mgp.hpp b/include/mgp.hpp index 4c1a67a0a..68bd0c75c 100644 --- a/include/mgp.hpp +++ b/include/mgp.hpp @@ -256,6 +256,10 @@ class Graph { void DetachDeleteNode(const Node &node); /// @brief Creates a relationship of type `type` between nodes `from` and `to` and adds it to the graph. Relationship CreateRelationship(const Node &from, const Node &to, const std::string_view type); + /// @brief Changes a relationship from node. + void SetFrom(Relationship &relationship, const Node &new_from); + /// @brief Changes a relationship to node. + void SetTo(Relationship &relationship, const Node &new_to); /// @brief Deletes a relationship from the graph. void DeleteRelationship(const Relationship &relationship); @@ -2009,6 +2013,18 @@ inline Relationship Graph::CreateRelationship(const Node &from, const Node &to, return relationship; } +inline void Graph::SetFrom(Relationship &relationship, const Node &new_from) { + mgp_edge *edge = mgp::MemHandlerCallback(mgp::graph_edge_set_from, graph_, relationship.ptr_, new_from.ptr_); + relationship = Relationship(edge); + mgp::edge_destroy(edge); +} + +inline void Graph::SetTo(Relationship &relationship, const Node &new_to) { + mgp_edge *edge = mgp::MemHandlerCallback(mgp::graph_edge_set_to, graph_, relationship.ptr_, new_to.ptr_); + relationship = Relationship(edge); + mgp::edge_destroy(edge); +} + inline void Graph::DeleteRelationship(const Relationship &relationship) { mgp::graph_delete_edge(graph_, relationship.ptr_); } diff --git a/src/query/db_accessor.cpp b/src/query/db_accessor.cpp index 80f0b3e88..7412067ff 100644 --- a/src/query/db_accessor.cpp +++ b/src/query/db_accessor.cpp @@ -76,6 +76,24 @@ SubgraphDbAccessor::DetachRemoveVertex( // NOLINT(readability-convert-member-fu "Vertex holds only partial information about edges. Cannot detach delete safely while using projected graph."}; } +storage::Result SubgraphDbAccessor::EdgeSetFrom(EdgeAccessor *edge, SubgraphVertexAccessor *new_from) { + VertexAccessor *new_from_impl = &new_from->impl_; + if (!this->graph_->ContainsVertex(*new_from_impl)) { + throw std::logic_error{"Projected graph must contain the new `from` vertex!"}; + } + auto result = db_accessor_.EdgeSetFrom(edge, new_from_impl); + return result; +} + +storage::Result SubgraphDbAccessor::EdgeSetTo(EdgeAccessor *edge, SubgraphVertexAccessor *new_to) { + VertexAccessor *new_to_impl = &new_to->impl_; + if (!this->graph_->ContainsVertex(*new_to_impl)) { + throw std::logic_error{"Projected graph must contain the new `to` vertex!"}; + } + auto result = db_accessor_.EdgeSetTo(edge, new_to_impl); + return result; +} + storage::Result> SubgraphDbAccessor::RemoveVertex( SubgraphVertexAccessor *subgraphvertex_accessor) { VertexAccessor *vertex_accessor = &subgraphvertex_accessor->impl_; diff --git a/src/query/db_accessor.hpp b/src/query/db_accessor.hpp index aa40c6a2d..1794b8308 100644 --- a/src/query/db_accessor.hpp +++ b/src/query/db_accessor.hpp @@ -380,6 +380,18 @@ class DbAccessor final { return EdgeAccessor(*maybe_edge); } + storage::Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) { + auto changed_edge = accessor_->EdgeSetFrom(&edge->impl_, &new_from->impl_); + if (changed_edge.HasError()) return storage::Result(changed_edge.GetError()); + return EdgeAccessor(*changed_edge); + } + + storage::Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) { + auto changed_edge = accessor_->EdgeSetTo(&edge->impl_, &new_to->impl_); + if (changed_edge.HasError()) return storage::Result(changed_edge.GetError()); + return EdgeAccessor(*changed_edge); + } + storage::Result> RemoveEdge(EdgeAccessor *edge) { auto res = accessor_->DeleteEdge(&edge->impl_); if (res.HasError()) { @@ -549,6 +561,10 @@ class SubgraphDbAccessor final { storage::Result InsertEdge(SubgraphVertexAccessor *from, SubgraphVertexAccessor *to, const storage::EdgeTypeId &edge_type); + storage::Result EdgeSetFrom(EdgeAccessor *edge, SubgraphVertexAccessor *new_from); + + storage::Result EdgeSetTo(EdgeAccessor *edge, SubgraphVertexAccessor *new_to); + storage::Result>>> DetachRemoveVertex( SubgraphVertexAccessor *vertex_accessor); diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 5e3a147e3..0f3ab6bbd 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -2721,6 +2721,83 @@ mgp_error mgp_graph_create_edge(mgp_graph *graph, mgp_vertex *from, mgp_vertex * result); } +namespace { + +mgp_edge *EdgeSet(auto *e, auto *new_vertex, auto *memory, auto *graph, bool set_from) { + auto *ctx = graph->ctx; +#ifdef MG_ENTERPRISE + if (memgraph::license::global_license_checker.IsEnterpriseValidFast() && ctx && ctx->auth_checker && + !ctx->auth_checker->Has(e->impl, memgraph::query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE)) { + throw AuthorizationException{"Insufficient permissions for changing an edge!"}; + } +#endif + if (!MgpGraphIsMutable(*graph)) { + throw ImmutableObjectException{"Cannot change the edge because the graph is immutable!"}; + } + + auto edge = std::visit( + memgraph::utils::Overloaded{ + [&e, &new_vertex, &set_from]( + memgraph::query::DbAccessor *accessor) -> memgraph::storage::Result { + if (set_from) { + return accessor->EdgeSetFrom(&e->impl, &std::get(new_vertex->impl)); + } + return accessor->EdgeSetTo(&e->impl, &std::get(new_vertex->impl)); + }, + [&e, &new_vertex, &set_from](memgraph::query::SubgraphDbAccessor *accessor) + -> memgraph::storage::Result { + if (set_from) { + return accessor->EdgeSetFrom(&e->impl, + &std::get(new_vertex->impl)); + } + return accessor->EdgeSetTo(&e->impl, &std::get(new_vertex->impl)); + }}, + graph->impl); + + if (edge.HasError()) { + switch (edge.GetError()) { + case memgraph::storage::Error::NONEXISTENT_OBJECT: + LOG_FATAL("Query modules shouldn't have access to nonexistent objects when changing an edge!"); + case memgraph::storage::Error::DELETED_OBJECT: + LOG_FATAL("The edge was already deleted."); + case memgraph::storage::Error::PROPERTIES_DISABLED: + case memgraph::storage::Error::VERTEX_HAS_EDGES: + LOG_FATAL("Unexpected error when removing an edge."); + case memgraph::storage::Error::SERIALIZATION_ERROR: + throw SerializationException{"Cannot serialize changing an edge."}; + } + } + + if (ctx->trigger_context_collector) { + ctx->trigger_context_collector->RegisterDeletedObject(e->impl); + ctx->trigger_context_collector->RegisterCreatedObject(*edge); + } + + return std::visit( + memgraph::utils::Overloaded{ + [&memory, &edge, &new_vertex](memgraph::query::DbAccessor *) -> mgp_edge * { + return NewRawMgpObject(memory->impl, edge.GetValue(), new_vertex->graph); + }, + [&memory, &edge, &new_vertex](memgraph::query::SubgraphDbAccessor *db_impl) -> mgp_edge * { + const auto v_from = memgraph::query::SubgraphVertexAccessor(edge.GetValue().From(), db_impl->getGraph()); + const auto v_to = memgraph::query::SubgraphVertexAccessor(edge.GetValue().To(), db_impl->getGraph()); + return NewRawMgpObject(memory->impl, edge.GetValue(), v_from, v_to, new_vertex->graph); + }}, + new_vertex->graph->impl); +} + +} // namespace + +mgp_error mgp_graph_edge_set_from(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_from, + mgp_memory *memory, mgp_edge **result) { + return WrapExceptions([&]() -> mgp_edge * { return EdgeSet(e, new_from, memory, graph, true); }, result); +} + +mgp_error mgp_graph_edge_set_to(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_to, + mgp_memory *memory, mgp_edge **result) { + return WrapExceptions([&]() -> mgp_edge * { return EdgeSet(e, new_to, memory, graph, false); }, result); +} + mgp_error mgp_graph_delete_edge(struct mgp_graph *graph, mgp_edge *edge) { return WrapExceptions([=] { auto *ctx = graph->ctx; diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index a94c01552..c5aa69e16 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -36,6 +36,7 @@ #include "storage/v2/disk/rocksdb_storage.hpp" #include "storage/v2/disk/storage.hpp" #include "storage/v2/disk/unique_constraints.hpp" +#include "storage/v2/edge_accessor.hpp" #include "storage/v2/edge_import_mode.hpp" #include "storage/v2/edge_ref.hpp" #include "storage/v2/id_types.hpp" @@ -1366,6 +1367,16 @@ Result> DiskStorage::DiskAccessor::DeleteEdge(EdgeAc &storage_->indices_, &storage_->constraints_, config_, true); } +Result DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) { + MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage"); + return Error::NONEXISTENT_OBJECT; +} + +Result DiskStorage::DiskAccessor::EdgeSetTo(EdgeAccessor * /*edge*/, VertexAccessor * /*new_to*/) { + MG_ASSERT(false, "EdgeSetTo is currently only implemented for InMemory storage"); + return Error::NONEXISTENT_OBJECT; +} + /// TODO: at which storage naming /// TODO: this method should also delete the old key bool DiskStorage::DiskAccessor::WriteVertexToDisk(const Vertex &vertex) { diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index 597a49b55..c8124e7a1 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -184,6 +184,10 @@ class DiskStorage final : public Storage { Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override; + Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override; + + Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; + Result> DeleteEdge(EdgeAccessor *edge) override; bool LabelIndexExists(LabelId label) const override { diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 048096493..d26b75b9b 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -496,12 +496,216 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces &storage_->constraints_, config_); } +Result InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) { + MG_ASSERT(edge->transaction_ == new_from->transaction_, + "EdgeAccessor must be from the same transaction as the new from vertex " + "accessor when deleting an edge!"); + MG_ASSERT(edge->transaction_ == &transaction_, + "EdgeAccessor must be from the same transaction as the storage " + "accessor when changing an edge!"); + + auto *old_from_vertex = edge->from_vertex_; + auto *new_from_vertex = new_from->vertex_; + auto *to_vertex = edge->to_vertex_; + + if (old_from_vertex->gid == new_from_vertex->gid) return *edge; + + auto edge_ref = edge->edge_; + auto edge_type = edge->edge_type_; + + std::unique_lock guard; + if (config_.properties_on_edges) { + auto *edge_ptr = edge_ref.ptr; + guard = std::unique_lock{edge_ptr->lock}; + + if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR; + + if (edge_ptr->deleted) return Error::DELETED_OBJECT; + } + + std::unique_lock guard_old_from(old_from_vertex->lock, std::defer_lock); + std::unique_lock guard_new_from(new_from_vertex->lock, std::defer_lock); + std::unique_lock guard_to(to_vertex->lock, std::defer_lock); + + // lock in increasing gid order, if two vertices have the same gid need to only lock once + std::vector vertices{old_from_vertex, new_from_vertex, to_vertex}; + std::sort(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid < y->gid; }); + vertices.erase(std::unique(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid == y->gid; }), + vertices.end()); + + for (auto *vertex : vertices) { + if (vertex == old_from_vertex) { + guard_old_from.lock(); + } else if (vertex == new_from_vertex) { + guard_new_from.lock(); + } else if (vertex == to_vertex) { + guard_to.lock(); + } else { + return Error::NONEXISTENT_OBJECT; + } + } + + if (!PrepareForWrite(&transaction_, old_from_vertex)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!old_from_vertex->deleted, "Invalid database state!"); + + if (!PrepareForWrite(&transaction_, new_from_vertex)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!new_from_vertex->deleted, "Invalid database state!"); + + if (to_vertex != old_from_vertex && to_vertex != new_from_vertex) { + if (!PrepareForWrite(&transaction_, to_vertex)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!to_vertex->deleted, "Invalid database state!"); + } + + auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) { + std::tuple link(edge_type, vertex, edge_ref); + auto it = std::find(edges->begin(), edges->end(), link); + if (config_.properties_on_edges) { + MG_ASSERT(it != edges->end(), "Invalid database state!"); + } else if (it == edges->end()) { + return false; + } + std::swap(*it, *edges->rbegin()); + edges->pop_back(); + return true; + }; + + auto op1 = delete_edge_from_storage(to_vertex, &old_from_vertex->out_edges); + auto op2 = delete_edge_from_storage(old_from_vertex, &to_vertex->in_edges); + + if (config_.properties_on_edges) { + MG_ASSERT((op1 && op2), "Invalid database state!"); + } else { + MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!"); + if (!op1 && !op2) { + // The edge is already deleted. + return Error::DELETED_OBJECT; + } + } + + CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); + + CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); + new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); + to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); + + transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + + return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, &transaction_, &storage_->indices_, + &storage_->constraints_, config_); +} + +Result InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) { + MG_ASSERT(edge->transaction_ == new_to->transaction_, + "EdgeAccessor must be from the same transaction as the new to vertex " + "accessor when deleting an edge!"); + MG_ASSERT(edge->transaction_ == &transaction_, + "EdgeAccessor must be from the same transaction as the storage " + "accessor when deleting an edge!"); + + auto *from_vertex = edge->from_vertex_; + auto *old_to_vertex = edge->to_vertex_; + auto *new_to_vertex = new_to->vertex_; + + if (old_to_vertex->gid == new_to_vertex->gid) return *edge; + + auto &edge_ref = edge->edge_; + auto &edge_type = edge->edge_type_; + + std::unique_lock guard; + if (config_.properties_on_edges) { + auto *edge_ptr = edge_ref.ptr; + guard = std::unique_lock{edge_ptr->lock}; + + if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR; + + if (edge_ptr->deleted) return Error::DELETED_OBJECT; + } + + std::unique_lock guard_from(from_vertex->lock, std::defer_lock); + std::unique_lock guard_old_to(old_to_vertex->lock, std::defer_lock); + std::unique_lock guard_new_to(new_to_vertex->lock, std::defer_lock); + + // lock in increasing gid order, if two vertices have the same gid need to only lock once + std::vector vertices{from_vertex, old_to_vertex, new_to_vertex}; + std::sort(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid < y->gid; }); + vertices.erase(std::unique(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid == y->gid; }), + vertices.end()); + + for (auto *vertex : vertices) { + if (vertex == from_vertex) { + guard_from.lock(); + } else if (vertex == old_to_vertex) { + guard_old_to.lock(); + } else if (vertex == new_to_vertex) { + guard_new_to.lock(); + } else { + return Error::NONEXISTENT_OBJECT; + } + } + + if (!PrepareForWrite(&transaction_, old_to_vertex)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!old_to_vertex->deleted, "Invalid database state!"); + + if (!PrepareForWrite(&transaction_, new_to_vertex)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!new_to_vertex->deleted, "Invalid database state!"); + + if (from_vertex != old_to_vertex && from_vertex != new_to_vertex) { + if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!from_vertex->deleted, "Invalid database state!"); + } + + auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) { + std::tuple link(edge_type, vertex, edge_ref); + auto it = std::find(edges->begin(), edges->end(), link); + if (config_.properties_on_edges) { + MG_ASSERT(it != edges->end(), "Invalid database state!"); + } else if (it == edges->end()) { + return false; + } + std::swap(*it, *edges->rbegin()); + edges->pop_back(); + return true; + }; + + auto op1 = delete_edge_from_storage(old_to_vertex, &from_vertex->out_edges); + auto op2 = delete_edge_from_storage(from_vertex, &old_to_vertex->in_edges); + + if (config_.properties_on_edges) { + MG_ASSERT((op1 && op2), "Invalid database state!"); + } else { + MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!"); + if (!op1 && !op2) { + // The edge is already deleted. + return Error::DELETED_OBJECT; + } + } + + CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); + + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); + from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); + new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); + + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); + transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); + + return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, &transaction_, &storage_->indices_, + &storage_->constraints_, config_); +} + Result> InMemoryStorage::InMemoryAccessor::DeleteEdge(EdgeAccessor *edge) { MG_ASSERT(edge->transaction_ == &transaction_, "EdgeAccessor must be from the same transaction as the storage " "accessor when deleting an edge!"); - auto edge_ref = edge->edge_; - auto edge_type = edge->edge_type_; + auto &edge_ref = edge->edge_; + auto &edge_type = edge->edge_type_; std::unique_lock guard; if (config_.properties_on_edges) { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index f7b645824..32faff226 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -221,6 +221,10 @@ class InMemoryStorage final : public Storage { /// @throw std::bad_alloc Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override; + Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override; + + Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; + /// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise /// @throw std::bad_alloc Result> DeleteEdge(EdgeAccessor *edge) override; diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 54c6fc31b..399325966 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -153,6 +153,10 @@ class Storage { virtual Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0; + virtual Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) = 0; + + virtual Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0; + virtual Result> DeleteEdge(EdgeAccessor *edge) = 0; virtual bool LabelIndexExists(LabelId label) const = 0; diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt index c0b26bc3e..7384fa7d7 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -64,6 +64,7 @@ add_subdirectory(batched_procedures) add_subdirectory(import_mode) add_subdirectory(concurrent_query_modules) add_subdirectory(set_properties) +add_subdirectory(transaction_rollback) copy_e2e_python_files(pytest_runner pytest_runner.sh "") file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.crt DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/tests/e2e/transaction_rollback/CMakeLists.txt b/tests/e2e/transaction_rollback/CMakeLists.txt new file mode 100644 index 000000000..588e1bedc --- /dev/null +++ b/tests/e2e/transaction_rollback/CMakeLists.txt @@ -0,0 +1,9 @@ +function(transaction_rollback_e2e_python_files FILE_NAME) + copy_e2e_python_files(transaction_abort ${FILE_NAME}) +endfunction() + +transaction_rollback_e2e_python_files(common.py) +transaction_rollback_e2e_python_files(conftest.py) +transaction_rollback_e2e_python_files(transaction.py) + +add_subdirectory(procedures) diff --git a/tests/e2e/transaction_rollback/common.py b/tests/e2e/transaction_rollback/common.py new file mode 100644 index 000000000..9f937a06d --- /dev/null +++ b/tests/e2e/transaction_rollback/common.py @@ -0,0 +1,25 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing + +import mgclient + + +def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]: + cursor.execute(query, params) + return cursor.fetchall() + + +def connect(**kwargs) -> mgclient.Connection: + connection = mgclient.connect(host="localhost", port=7687, **kwargs) + connection.autocommit = False + return connection diff --git a/tests/e2e/transaction_rollback/conftest.py b/tests/e2e/transaction_rollback/conftest.py new file mode 100644 index 000000000..a4ec62c9f --- /dev/null +++ b/tests/e2e/transaction_rollback/conftest.py @@ -0,0 +1,21 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import pytest +from common import connect, execute_and_fetch_all + + +@pytest.fixture(autouse=True) +def connection(): + connection = connect() + yield connection + cursor = connection.cursor() + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") diff --git a/tests/e2e/transaction_rollback/procedures/CMakeLists.txt b/tests/e2e/transaction_rollback/procedures/CMakeLists.txt new file mode 100644 index 000000000..8d5db0de8 --- /dev/null +++ b/tests/e2e/transaction_rollback/procedures/CMakeLists.txt @@ -0,0 +1 @@ +add_query_module(transaction_rollback procedures.cpp) diff --git a/tests/e2e/transaction_rollback/procedures/procedures.cpp b/tests/e2e/transaction_rollback/procedures/procedures.cpp new file mode 100644 index 000000000..5cf9bedd5 --- /dev/null +++ b/tests/e2e/transaction_rollback/procedures/procedures.cpp @@ -0,0 +1,76 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "mg_procedure.h" + +#include "mgp.hpp" + +namespace { + +constexpr std::string_view ProcedureFrom = "set_from"; +constexpr std::string_view ProcedureTo = "set_to"; + +void SetFrom(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + mgp::MemoryDispatcherGuard guard(memory); + const auto arguments = mgp::List(args); + try { + auto rel = arguments[0].ValueRelationship(); + auto new_from = arguments[0].ValueNode(); + mgp::Graph graph{memgraph_graph}; + graph.SetFrom(rel, new_from); + } catch (const std::exception &e) { + return; + } +} + +void SetTo(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + mgp::MemoryDispatcherGuard guard(memory); + const auto arguments = mgp::List(args); + try { + auto rel = arguments[0].ValueRelationship(); + auto new_to = arguments[0].ValueNode(); + mgp::Graph graph{memgraph_graph}; + graph.SetTo(rel, new_to); + } catch (const std::exception &e) { + return; + } +} + +} // namespace + +// Each module needs to define mgp_init_module function. +// Here you can register multiple functions/procedures your module supports. +extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) { + { + try { + mgp::MemoryDispatcherGuard guard(memory); + + mgp::AddProcedure( + SetFrom, ProcedureFrom, mgp::ProcedureType::Write, + {mgp::Parameter("relationship", mgp::Type::Relationship), mgp::Parameter("node_from", mgp::Type::Node)}, {}, + module, memory); + mgp::AddProcedure( + SetTo, ProcedureTo, mgp::ProcedureType::Write, + {mgp::Parameter("relationship", mgp::Type::Relationship), mgp::Parameter("node_to", mgp::Type::Node)}, {}, + module, memory); + + } catch (const std::exception &e) { + return 1; + } + } + + return 0; +} + +// This is an optional function if you need to release any resources before the +// module is unloaded. You will probably need this if you acquired some +// resources in mgp_init_module. +extern "C" int mgp_shutdown_module() { return 0; } diff --git a/tests/e2e/transaction_rollback/transaction.py b/tests/e2e/transaction_rollback/transaction.py new file mode 100644 index 000000000..ee204de8d --- /dev/null +++ b/tests/e2e/transaction_rollback/transaction.py @@ -0,0 +1,58 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys + +import pytest +from common import execute_and_fetch_all + + +def test_change_from_rollback(connection): + cursor = connection.cursor() + + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;") + execute_and_fetch_all(cursor, "CREATE (n:Node1) CREATE (m:Node2) CREATE (k:Node3) CREATE (n)-[:Relationship]->(m);") + connection.commit() + + execute_and_fetch_all( + cursor, + "MATCH (n:Node1)-[r:Relationship]->(m:Node2) MATCH (k:Node3) CALL transaction_rollback.set_from(r, k);", + ) + connection.rollback() + + result = list(execute_and_fetch_all(cursor, f"MATCH (n)-[r]->(m) RETURN n, r, m")) + assert len(result) == 1 + node_from, rel, node_to = result[0] + assert list(node_from.labels)[0] == "Node1" + assert list(node_to.labels)[0] == "Node2" + + +def test_change_to_rollback(connection): + cursor = connection.cursor() + + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;") + execute_and_fetch_all(cursor, "CREATE (n:Node1) CREATE (m:Node2) CREATE (k:Node3) CREATE (n)-[:Relationship]->(m);") + connection.commit() + + execute_and_fetch_all( + cursor, "MATCH (n:Node1)-[r:Relationship]->(m:Node2) MATCH (k:Node3) CALL transaction_rollback.set_to(r, k);" + ) + connection.rollback() + + result = list(execute_and_fetch_all(cursor, f"MATCH (n)-[r]->(m) RETURN n, r, m")) + assert len(result) == 1 + node_from, rel, node_to = result[0] + assert list(node_from.labels)[0] == "Node1" + assert list(node_to.labels)[0] == "Node2" + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/transaction_rollback/workloads.yaml b/tests/e2e/transaction_rollback/workloads.yaml new file mode 100644 index 000000000..4338fbc58 --- /dev/null +++ b/tests/e2e/transaction_rollback/workloads.yaml @@ -0,0 +1,19 @@ +args: &args + - "--bolt-port" + - "7687" + - "--log-level=TRACE" + +in_memory_cluster: &in_memory_cluster + cluster: + main: + args: *args + log_file: "transaction-rollback-e2e.log" + setup_queries: [] + validation_queries: [] + +workloads: + - name: "Transaction rollback" + binary: "tests/e2e/pytest_runner.sh" + proc: "tests/e2e/transaction_rollback/procedures/" + args: ["transaction_rollback/transaction.py"] + <<: *in_memory_cluster diff --git a/tests/unit/cpp_api.cpp b/tests/unit/cpp_api.cpp index 0acf2d18c..be1270e9b 100644 --- a/tests/unit/cpp_api.cpp +++ b/tests/unit/cpp_api.cpp @@ -690,6 +690,50 @@ TYPED_TEST(CppApiTestFixture, TestValueToString) { "{key: node_property, key2: node_property2})-[type: Loves2, id: 1, properties: {}]->(id: 4, properties: {})"); } +TYPED_TEST(CppApiTestFixture, TestRelationshipChangeFrom) { + if (std::is_same::value) { + return; + } + + mgp_graph raw_graph = this->CreateGraph(); + auto graph = mgp::Graph(&raw_graph); + + auto node_1 = graph.CreateNode(); + auto node_2 = graph.CreateNode(); + auto node_3 = graph.CreateNode(); + + auto relationship = graph.CreateRelationship(node_1, node_2, "Edge"); + + ASSERT_EQ(relationship.From().Id(), node_1.Id()); + graph.SetFrom(relationship, node_3); + + ASSERT_EQ(std::string(relationship.Type()), "Edge"); + ASSERT_EQ(relationship.From().Id(), node_3.Id()); + ASSERT_EQ(relationship.To().Id(), node_2.Id()); +} + +TYPED_TEST(CppApiTestFixture, TestRelationshipChangeTo) { + if (std::is_same::value) { + return; + } + + mgp_graph raw_graph = this->CreateGraph(); + auto graph = mgp::Graph(&raw_graph); + + auto node_1 = graph.CreateNode(); + auto node_2 = graph.CreateNode(); + auto node_3 = graph.CreateNode(); + + auto relationship = graph.CreateRelationship(node_1, node_2, "Edge"); + + ASSERT_EQ(relationship.To().Id(), node_2.Id()); + graph.SetTo(relationship, node_3); + + ASSERT_EQ(std::string(relationship.Type()), "Edge"); + ASSERT_EQ(relationship.From().Id(), node_1.Id()); + ASSERT_EQ(relationship.To().Id(), node_3.Id()); +} + TYPED_TEST(CppApiTestFixture, TestInAndOutDegrees) { mgp_graph raw_graph = this->CreateGraph(memgraph::storage::View::NEW); auto graph = mgp::Graph(&raw_graph);