Implement changing from and to vertices in relationships (#1221)
This commit is contained in:
parent
c0d4f5e0bc
commit
9c51dbbb01
@ -255,6 +255,16 @@ inline mgp_edge *graph_create_edge(mgp_graph *graph, mgp_vertex *from, mgp_verte
|
||||
return MgInvoke<mgp_edge *>(mgp_graph_create_edge, graph, from, to, type, memory);
|
||||
}
|
||||
|
||||
inline mgp_edge *graph_edge_set_from(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_from,
|
||||
mgp_memory *memory) {
|
||||
return MgInvoke<mgp_edge *>(mgp_graph_edge_set_from, graph, e, new_from, memory);
|
||||
}
|
||||
|
||||
inline mgp_edge *graph_edge_set_to(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_to,
|
||||
mgp_memory *memory) {
|
||||
return MgInvoke<mgp_edge *>(mgp_graph_edge_set_to, graph, e, new_to, memory);
|
||||
}
|
||||
|
||||
inline void graph_delete_edge(mgp_graph *graph, mgp_edge *edge) { MgInvokeVoid(mgp_graph_delete_edge, graph, edge); }
|
||||
|
||||
inline mgp_vertex *graph_get_vertex_by_id(mgp_graph *g, mgp_vertex_id id, mgp_memory *memory) {
|
||||
|
@ -888,6 +888,22 @@ enum mgp_error mgp_graph_detach_delete_vertex(struct mgp_graph *graph, struct mg
|
||||
enum mgp_error mgp_graph_create_edge(struct mgp_graph *graph, struct mgp_vertex *from, struct mgp_vertex *to,
|
||||
struct mgp_edge_type type, struct mgp_memory *memory, struct mgp_edge **result);
|
||||
|
||||
/// Change edge from vertex
|
||||
/// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable.
|
||||
/// Return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE if unable to allocate a mgp_edge.
|
||||
/// Return mgp_error::MGP_ERROR_DELETED_OBJECT if `from` or `to` has been deleted.
|
||||
/// Return mgp_error::MGP_ERROR_SERIALIZATION_ERROR if `from` or `to` has been modified by another transaction.
|
||||
enum mgp_error mgp_graph_edge_set_from(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_from,
|
||||
struct mgp_memory *memory, struct mgp_edge **result);
|
||||
|
||||
/// Change edge to vertex
|
||||
/// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable.
|
||||
/// Return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE if unable to allocate a mgp_edge.
|
||||
/// Return mgp_error::MGP_ERROR_DELETED_OBJECT if `from` or `to` has been deleted.
|
||||
/// Return mgp_error::MGP_ERROR_SERIALIZATION_ERROR if `from` or `to` has been modified by another transaction.
|
||||
enum mgp_error mgp_graph_edge_set_to(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_to,
|
||||
struct mgp_memory *memory, struct mgp_edge **result);
|
||||
|
||||
/// Delete an edge from the graph.
|
||||
/// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable.
|
||||
/// Return mgp_error::MGP_ERROR_SERIALIZATION_ERROR if `edge`, its source or destination vertex has been modified by
|
||||
|
@ -256,6 +256,10 @@ class Graph {
|
||||
void DetachDeleteNode(const Node &node);
|
||||
/// @brief Creates a relationship of type `type` between nodes `from` and `to` and adds it to the graph.
|
||||
Relationship CreateRelationship(const Node &from, const Node &to, const std::string_view type);
|
||||
/// @brief Changes a relationship from node.
|
||||
void SetFrom(Relationship &relationship, const Node &new_from);
|
||||
/// @brief Changes a relationship to node.
|
||||
void SetTo(Relationship &relationship, const Node &new_to);
|
||||
/// @brief Deletes a relationship from the graph.
|
||||
void DeleteRelationship(const Relationship &relationship);
|
||||
|
||||
@ -2009,6 +2013,18 @@ inline Relationship Graph::CreateRelationship(const Node &from, const Node &to,
|
||||
return relationship;
|
||||
}
|
||||
|
||||
inline void Graph::SetFrom(Relationship &relationship, const Node &new_from) {
|
||||
mgp_edge *edge = mgp::MemHandlerCallback(mgp::graph_edge_set_from, graph_, relationship.ptr_, new_from.ptr_);
|
||||
relationship = Relationship(edge);
|
||||
mgp::edge_destroy(edge);
|
||||
}
|
||||
|
||||
inline void Graph::SetTo(Relationship &relationship, const Node &new_to) {
|
||||
mgp_edge *edge = mgp::MemHandlerCallback(mgp::graph_edge_set_to, graph_, relationship.ptr_, new_to.ptr_);
|
||||
relationship = Relationship(edge);
|
||||
mgp::edge_destroy(edge);
|
||||
}
|
||||
|
||||
inline void Graph::DeleteRelationship(const Relationship &relationship) {
|
||||
mgp::graph_delete_edge(graph_, relationship.ptr_);
|
||||
}
|
||||
|
@ -76,6 +76,24 @@ SubgraphDbAccessor::DetachRemoveVertex( // NOLINT(readability-convert-member-fu
|
||||
"Vertex holds only partial information about edges. Cannot detach delete safely while using projected graph."};
|
||||
}
|
||||
|
||||
storage::Result<EdgeAccessor> SubgraphDbAccessor::EdgeSetFrom(EdgeAccessor *edge, SubgraphVertexAccessor *new_from) {
|
||||
VertexAccessor *new_from_impl = &new_from->impl_;
|
||||
if (!this->graph_->ContainsVertex(*new_from_impl)) {
|
||||
throw std::logic_error{"Projected graph must contain the new `from` vertex!"};
|
||||
}
|
||||
auto result = db_accessor_.EdgeSetFrom(edge, new_from_impl);
|
||||
return result;
|
||||
}
|
||||
|
||||
storage::Result<EdgeAccessor> SubgraphDbAccessor::EdgeSetTo(EdgeAccessor *edge, SubgraphVertexAccessor *new_to) {
|
||||
VertexAccessor *new_to_impl = &new_to->impl_;
|
||||
if (!this->graph_->ContainsVertex(*new_to_impl)) {
|
||||
throw std::logic_error{"Projected graph must contain the new `to` vertex!"};
|
||||
}
|
||||
auto result = db_accessor_.EdgeSetTo(edge, new_to_impl);
|
||||
return result;
|
||||
}
|
||||
|
||||
storage::Result<std::optional<VertexAccessor>> SubgraphDbAccessor::RemoveVertex(
|
||||
SubgraphVertexAccessor *subgraphvertex_accessor) {
|
||||
VertexAccessor *vertex_accessor = &subgraphvertex_accessor->impl_;
|
||||
|
@ -380,6 +380,18 @@ class DbAccessor final {
|
||||
return EdgeAccessor(*maybe_edge);
|
||||
}
|
||||
|
||||
storage::Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) {
|
||||
auto changed_edge = accessor_->EdgeSetFrom(&edge->impl_, &new_from->impl_);
|
||||
if (changed_edge.HasError()) return storage::Result<EdgeAccessor>(changed_edge.GetError());
|
||||
return EdgeAccessor(*changed_edge);
|
||||
}
|
||||
|
||||
storage::Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) {
|
||||
auto changed_edge = accessor_->EdgeSetTo(&edge->impl_, &new_to->impl_);
|
||||
if (changed_edge.HasError()) return storage::Result<EdgeAccessor>(changed_edge.GetError());
|
||||
return EdgeAccessor(*changed_edge);
|
||||
}
|
||||
|
||||
storage::Result<std::optional<EdgeAccessor>> RemoveEdge(EdgeAccessor *edge) {
|
||||
auto res = accessor_->DeleteEdge(&edge->impl_);
|
||||
if (res.HasError()) {
|
||||
@ -549,6 +561,10 @@ class SubgraphDbAccessor final {
|
||||
storage::Result<EdgeAccessor> InsertEdge(SubgraphVertexAccessor *from, SubgraphVertexAccessor *to,
|
||||
const storage::EdgeTypeId &edge_type);
|
||||
|
||||
storage::Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, SubgraphVertexAccessor *new_from);
|
||||
|
||||
storage::Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, SubgraphVertexAccessor *new_to);
|
||||
|
||||
storage::Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachRemoveVertex(
|
||||
SubgraphVertexAccessor *vertex_accessor);
|
||||
|
||||
|
@ -2721,6 +2721,83 @@ mgp_error mgp_graph_create_edge(mgp_graph *graph, mgp_vertex *from, mgp_vertex *
|
||||
result);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
mgp_edge *EdgeSet(auto *e, auto *new_vertex, auto *memory, auto *graph, bool set_from) {
|
||||
auto *ctx = graph->ctx;
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (memgraph::license::global_license_checker.IsEnterpriseValidFast() && ctx && ctx->auth_checker &&
|
||||
!ctx->auth_checker->Has(e->impl, memgraph::query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE)) {
|
||||
throw AuthorizationException{"Insufficient permissions for changing an edge!"};
|
||||
}
|
||||
#endif
|
||||
if (!MgpGraphIsMutable(*graph)) {
|
||||
throw ImmutableObjectException{"Cannot change the edge because the graph is immutable!"};
|
||||
}
|
||||
|
||||
auto edge = std::visit(
|
||||
memgraph::utils::Overloaded{
|
||||
[&e, &new_vertex, &set_from](
|
||||
memgraph::query::DbAccessor *accessor) -> memgraph::storage::Result<memgraph::query::EdgeAccessor> {
|
||||
if (set_from) {
|
||||
return accessor->EdgeSetFrom(&e->impl, &std::get<memgraph::query::VertexAccessor>(new_vertex->impl));
|
||||
}
|
||||
return accessor->EdgeSetTo(&e->impl, &std::get<memgraph::query::VertexAccessor>(new_vertex->impl));
|
||||
},
|
||||
[&e, &new_vertex, &set_from](memgraph::query::SubgraphDbAccessor *accessor)
|
||||
-> memgraph::storage::Result<memgraph::query::EdgeAccessor> {
|
||||
if (set_from) {
|
||||
return accessor->EdgeSetFrom(&e->impl,
|
||||
&std::get<memgraph::query::SubgraphVertexAccessor>(new_vertex->impl));
|
||||
}
|
||||
return accessor->EdgeSetTo(&e->impl, &std::get<memgraph::query::SubgraphVertexAccessor>(new_vertex->impl));
|
||||
}},
|
||||
graph->impl);
|
||||
|
||||
if (edge.HasError()) {
|
||||
switch (edge.GetError()) {
|
||||
case memgraph::storage::Error::NONEXISTENT_OBJECT:
|
||||
LOG_FATAL("Query modules shouldn't have access to nonexistent objects when changing an edge!");
|
||||
case memgraph::storage::Error::DELETED_OBJECT:
|
||||
LOG_FATAL("The edge was already deleted.");
|
||||
case memgraph::storage::Error::PROPERTIES_DISABLED:
|
||||
case memgraph::storage::Error::VERTEX_HAS_EDGES:
|
||||
LOG_FATAL("Unexpected error when removing an edge.");
|
||||
case memgraph::storage::Error::SERIALIZATION_ERROR:
|
||||
throw SerializationException{"Cannot serialize changing an edge."};
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx->trigger_context_collector) {
|
||||
ctx->trigger_context_collector->RegisterDeletedObject(e->impl);
|
||||
ctx->trigger_context_collector->RegisterCreatedObject(*edge);
|
||||
}
|
||||
|
||||
return std::visit(
|
||||
memgraph::utils::Overloaded{
|
||||
[&memory, &edge, &new_vertex](memgraph::query::DbAccessor *) -> mgp_edge * {
|
||||
return NewRawMgpObject<mgp_edge>(memory->impl, edge.GetValue(), new_vertex->graph);
|
||||
},
|
||||
[&memory, &edge, &new_vertex](memgraph::query::SubgraphDbAccessor *db_impl) -> mgp_edge * {
|
||||
const auto v_from = memgraph::query::SubgraphVertexAccessor(edge.GetValue().From(), db_impl->getGraph());
|
||||
const auto v_to = memgraph::query::SubgraphVertexAccessor(edge.GetValue().To(), db_impl->getGraph());
|
||||
return NewRawMgpObject<mgp_edge>(memory->impl, edge.GetValue(), v_from, v_to, new_vertex->graph);
|
||||
}},
|
||||
new_vertex->graph->impl);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
mgp_error mgp_graph_edge_set_from(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_from,
|
||||
mgp_memory *memory, mgp_edge **result) {
|
||||
return WrapExceptions([&]() -> mgp_edge * { return EdgeSet(e, new_from, memory, graph, true); }, result);
|
||||
}
|
||||
|
||||
mgp_error mgp_graph_edge_set_to(struct mgp_graph *graph, struct mgp_edge *e, struct mgp_vertex *new_to,
|
||||
mgp_memory *memory, mgp_edge **result) {
|
||||
return WrapExceptions([&]() -> mgp_edge * { return EdgeSet(e, new_to, memory, graph, false); }, result);
|
||||
}
|
||||
|
||||
mgp_error mgp_graph_delete_edge(struct mgp_graph *graph, mgp_edge *edge) {
|
||||
return WrapExceptions([=] {
|
||||
auto *ctx = graph->ctx;
|
||||
|
@ -36,6 +36,7 @@
|
||||
#include "storage/v2/disk/rocksdb_storage.hpp"
|
||||
#include "storage/v2/disk/storage.hpp"
|
||||
#include "storage/v2/disk/unique_constraints.hpp"
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/edge_import_mode.hpp"
|
||||
#include "storage/v2/edge_ref.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
@ -1366,6 +1367,16 @@ Result<std::optional<EdgeAccessor>> DiskStorage::DiskAccessor::DeleteEdge(EdgeAc
|
||||
&storage_->indices_, &storage_->constraints_, config_, true);
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) {
|
||||
MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage");
|
||||
return Error::NONEXISTENT_OBJECT;
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> DiskStorage::DiskAccessor::EdgeSetTo(EdgeAccessor * /*edge*/, VertexAccessor * /*new_to*/) {
|
||||
MG_ASSERT(false, "EdgeSetTo is currently only implemented for InMemory storage");
|
||||
return Error::NONEXISTENT_OBJECT;
|
||||
}
|
||||
|
||||
/// TODO: at which storage naming
|
||||
/// TODO: this method should also delete the old key
|
||||
bool DiskStorage::DiskAccessor::WriteVertexToDisk(const Vertex &vertex) {
|
||||
|
@ -184,6 +184,10 @@ class DiskStorage final : public Storage {
|
||||
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;
|
||||
|
||||
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) override;
|
||||
|
||||
bool LabelIndexExists(LabelId label) const override {
|
||||
|
@ -496,12 +496,216 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
|
||||
&storage_->constraints_, config_);
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) {
|
||||
MG_ASSERT(edge->transaction_ == new_from->transaction_,
|
||||
"EdgeAccessor must be from the same transaction as the new from vertex "
|
||||
"accessor when deleting an edge!");
|
||||
MG_ASSERT(edge->transaction_ == &transaction_,
|
||||
"EdgeAccessor must be from the same transaction as the storage "
|
||||
"accessor when changing an edge!");
|
||||
|
||||
auto *old_from_vertex = edge->from_vertex_;
|
||||
auto *new_from_vertex = new_from->vertex_;
|
||||
auto *to_vertex = edge->to_vertex_;
|
||||
|
||||
if (old_from_vertex->gid == new_from_vertex->gid) return *edge;
|
||||
|
||||
auto edge_ref = edge->edge_;
|
||||
auto edge_type = edge->edge_type_;
|
||||
|
||||
std::unique_lock<utils::RWSpinLock> guard;
|
||||
if (config_.properties_on_edges) {
|
||||
auto *edge_ptr = edge_ref.ptr;
|
||||
guard = std::unique_lock{edge_ptr->lock};
|
||||
|
||||
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
|
||||
|
||||
if (edge_ptr->deleted) return Error::DELETED_OBJECT;
|
||||
}
|
||||
|
||||
std::unique_lock<utils::RWSpinLock> guard_old_from(old_from_vertex->lock, std::defer_lock);
|
||||
std::unique_lock<utils::RWSpinLock> guard_new_from(new_from_vertex->lock, std::defer_lock);
|
||||
std::unique_lock<utils::RWSpinLock> guard_to(to_vertex->lock, std::defer_lock);
|
||||
|
||||
// lock in increasing gid order, if two vertices have the same gid need to only lock once
|
||||
std::vector<memgraph::storage::Vertex *> vertices{old_from_vertex, new_from_vertex, to_vertex};
|
||||
std::sort(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid < y->gid; });
|
||||
vertices.erase(std::unique(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid == y->gid; }),
|
||||
vertices.end());
|
||||
|
||||
for (auto *vertex : vertices) {
|
||||
if (vertex == old_from_vertex) {
|
||||
guard_old_from.lock();
|
||||
} else if (vertex == new_from_vertex) {
|
||||
guard_new_from.lock();
|
||||
} else if (vertex == to_vertex) {
|
||||
guard_to.lock();
|
||||
} else {
|
||||
return Error::NONEXISTENT_OBJECT;
|
||||
}
|
||||
}
|
||||
|
||||
if (!PrepareForWrite(&transaction_, old_from_vertex)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!old_from_vertex->deleted, "Invalid database state!");
|
||||
|
||||
if (!PrepareForWrite(&transaction_, new_from_vertex)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!new_from_vertex->deleted, "Invalid database state!");
|
||||
|
||||
if (to_vertex != old_from_vertex && to_vertex != new_from_vertex) {
|
||||
if (!PrepareForWrite(&transaction_, to_vertex)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!to_vertex->deleted, "Invalid database state!");
|
||||
}
|
||||
|
||||
auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) {
|
||||
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link(edge_type, vertex, edge_ref);
|
||||
auto it = std::find(edges->begin(), edges->end(), link);
|
||||
if (config_.properties_on_edges) {
|
||||
MG_ASSERT(it != edges->end(), "Invalid database state!");
|
||||
} else if (it == edges->end()) {
|
||||
return false;
|
||||
}
|
||||
std::swap(*it, *edges->rbegin());
|
||||
edges->pop_back();
|
||||
return true;
|
||||
};
|
||||
|
||||
auto op1 = delete_edge_from_storage(to_vertex, &old_from_vertex->out_edges);
|
||||
auto op2 = delete_edge_from_storage(old_from_vertex, &to_vertex->in_edges);
|
||||
|
||||
if (config_.properties_on_edges) {
|
||||
MG_ASSERT((op1 && op2), "Invalid database state!");
|
||||
} else {
|
||||
MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!");
|
||||
if (!op1 && !op2) {
|
||||
// The edge is already deleted.
|
||||
return Error::DELETED_OBJECT;
|
||||
}
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
|
||||
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref);
|
||||
|
||||
CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref);
|
||||
new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref);
|
||||
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref);
|
||||
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref);
|
||||
|
||||
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
|
||||
|
||||
return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, &transaction_, &storage_->indices_,
|
||||
&storage_->constraints_, config_);
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) {
|
||||
MG_ASSERT(edge->transaction_ == new_to->transaction_,
|
||||
"EdgeAccessor must be from the same transaction as the new to vertex "
|
||||
"accessor when deleting an edge!");
|
||||
MG_ASSERT(edge->transaction_ == &transaction_,
|
||||
"EdgeAccessor must be from the same transaction as the storage "
|
||||
"accessor when deleting an edge!");
|
||||
|
||||
auto *from_vertex = edge->from_vertex_;
|
||||
auto *old_to_vertex = edge->to_vertex_;
|
||||
auto *new_to_vertex = new_to->vertex_;
|
||||
|
||||
if (old_to_vertex->gid == new_to_vertex->gid) return *edge;
|
||||
|
||||
auto &edge_ref = edge->edge_;
|
||||
auto &edge_type = edge->edge_type_;
|
||||
|
||||
std::unique_lock<utils::RWSpinLock> guard;
|
||||
if (config_.properties_on_edges) {
|
||||
auto *edge_ptr = edge_ref.ptr;
|
||||
guard = std::unique_lock{edge_ptr->lock};
|
||||
|
||||
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
|
||||
|
||||
if (edge_ptr->deleted) return Error::DELETED_OBJECT;
|
||||
}
|
||||
|
||||
std::unique_lock<utils::RWSpinLock> guard_from(from_vertex->lock, std::defer_lock);
|
||||
std::unique_lock<utils::RWSpinLock> guard_old_to(old_to_vertex->lock, std::defer_lock);
|
||||
std::unique_lock<utils::RWSpinLock> guard_new_to(new_to_vertex->lock, std::defer_lock);
|
||||
|
||||
// lock in increasing gid order, if two vertices have the same gid need to only lock once
|
||||
std::vector<memgraph::storage::Vertex *> vertices{from_vertex, old_to_vertex, new_to_vertex};
|
||||
std::sort(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid < y->gid; });
|
||||
vertices.erase(std::unique(vertices.begin(), vertices.end(), [](auto x, auto y) { return x->gid == y->gid; }),
|
||||
vertices.end());
|
||||
|
||||
for (auto *vertex : vertices) {
|
||||
if (vertex == from_vertex) {
|
||||
guard_from.lock();
|
||||
} else if (vertex == old_to_vertex) {
|
||||
guard_old_to.lock();
|
||||
} else if (vertex == new_to_vertex) {
|
||||
guard_new_to.lock();
|
||||
} else {
|
||||
return Error::NONEXISTENT_OBJECT;
|
||||
}
|
||||
}
|
||||
|
||||
if (!PrepareForWrite(&transaction_, old_to_vertex)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!old_to_vertex->deleted, "Invalid database state!");
|
||||
|
||||
if (!PrepareForWrite(&transaction_, new_to_vertex)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!new_to_vertex->deleted, "Invalid database state!");
|
||||
|
||||
if (from_vertex != old_to_vertex && from_vertex != new_to_vertex) {
|
||||
if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR;
|
||||
MG_ASSERT(!from_vertex->deleted, "Invalid database state!");
|
||||
}
|
||||
|
||||
auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) {
|
||||
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link(edge_type, vertex, edge_ref);
|
||||
auto it = std::find(edges->begin(), edges->end(), link);
|
||||
if (config_.properties_on_edges) {
|
||||
MG_ASSERT(it != edges->end(), "Invalid database state!");
|
||||
} else if (it == edges->end()) {
|
||||
return false;
|
||||
}
|
||||
std::swap(*it, *edges->rbegin());
|
||||
edges->pop_back();
|
||||
return true;
|
||||
};
|
||||
|
||||
auto op1 = delete_edge_from_storage(old_to_vertex, &from_vertex->out_edges);
|
||||
auto op2 = delete_edge_from_storage(from_vertex, &old_to_vertex->in_edges);
|
||||
|
||||
if (config_.properties_on_edges) {
|
||||
MG_ASSERT((op1 && op2), "Invalid database state!");
|
||||
} else {
|
||||
MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!");
|
||||
if (!op1 && !op2) {
|
||||
// The edge is already deleted.
|
||||
return Error::DELETED_OBJECT;
|
||||
}
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref);
|
||||
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
|
||||
|
||||
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref);
|
||||
from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref);
|
||||
CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref);
|
||||
new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref);
|
||||
|
||||
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN);
|
||||
transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN);
|
||||
|
||||
return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, &transaction_, &storage_->indices_,
|
||||
&storage_->constraints_, config_);
|
||||
}
|
||||
|
||||
Result<std::optional<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::DeleteEdge(EdgeAccessor *edge) {
|
||||
MG_ASSERT(edge->transaction_ == &transaction_,
|
||||
"EdgeAccessor must be from the same transaction as the storage "
|
||||
"accessor when deleting an edge!");
|
||||
auto edge_ref = edge->edge_;
|
||||
auto edge_type = edge->edge_type_;
|
||||
auto &edge_ref = edge->edge_;
|
||||
auto &edge_type = edge->edge_type_;
|
||||
|
||||
std::unique_lock<utils::RWSpinLock> guard;
|
||||
if (config_.properties_on_edges) {
|
||||
|
@ -221,6 +221,10 @@ class InMemoryStorage final : public Storage {
|
||||
/// @throw std::bad_alloc
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;
|
||||
|
||||
/// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise
|
||||
/// @throw std::bad_alloc
|
||||
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) override;
|
||||
|
@ -153,6 +153,10 @@ class Storage {
|
||||
|
||||
virtual Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0;
|
||||
|
||||
virtual Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) = 0;
|
||||
|
||||
virtual Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0;
|
||||
|
||||
virtual Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) = 0;
|
||||
|
||||
virtual bool LabelIndexExists(LabelId label) const = 0;
|
||||
|
@ -64,6 +64,7 @@ add_subdirectory(batched_procedures)
|
||||
add_subdirectory(import_mode)
|
||||
add_subdirectory(concurrent_query_modules)
|
||||
add_subdirectory(set_properties)
|
||||
add_subdirectory(transaction_rollback)
|
||||
|
||||
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
|
||||
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.crt DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
|
||||
|
9
tests/e2e/transaction_rollback/CMakeLists.txt
Normal file
9
tests/e2e/transaction_rollback/CMakeLists.txt
Normal file
@ -0,0 +1,9 @@
|
||||
function(transaction_rollback_e2e_python_files FILE_NAME)
|
||||
copy_e2e_python_files(transaction_abort ${FILE_NAME})
|
||||
endfunction()
|
||||
|
||||
transaction_rollback_e2e_python_files(common.py)
|
||||
transaction_rollback_e2e_python_files(conftest.py)
|
||||
transaction_rollback_e2e_python_files(transaction.py)
|
||||
|
||||
add_subdirectory(procedures)
|
25
tests/e2e/transaction_rollback/common.py
Normal file
25
tests/e2e/transaction_rollback/common.py
Normal file
@ -0,0 +1,25 @@
|
||||
# Copyright 2023 Memgraph Ltd.
|
||||
#
|
||||
# Use of this software is governed by the Business Source License
|
||||
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
# License, and you may not use this file except in compliance with the Business Source License.
|
||||
#
|
||||
# As of the Change Date specified in that file, in accordance with
|
||||
# the Business Source License, use of this software will be governed
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import typing
|
||||
|
||||
import mgclient
|
||||
|
||||
|
||||
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
|
||||
cursor.execute(query, params)
|
||||
return cursor.fetchall()
|
||||
|
||||
|
||||
def connect(**kwargs) -> mgclient.Connection:
|
||||
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
|
||||
connection.autocommit = False
|
||||
return connection
|
21
tests/e2e/transaction_rollback/conftest.py
Normal file
21
tests/e2e/transaction_rollback/conftest.py
Normal file
@ -0,0 +1,21 @@
|
||||
# Copyright 2023 Memgraph Ltd.
|
||||
#
|
||||
# Use of this software is governed by the Business Source License
|
||||
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
# License, and you may not use this file except in compliance with the Business Source License.
|
||||
#
|
||||
# As of the Change Date specified in that file, in accordance with
|
||||
# the Business Source License, use of this software will be governed
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import pytest
|
||||
from common import connect, execute_and_fetch_all
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def connection():
|
||||
connection = connect()
|
||||
yield connection
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
|
1
tests/e2e/transaction_rollback/procedures/CMakeLists.txt
Normal file
1
tests/e2e/transaction_rollback/procedures/CMakeLists.txt
Normal file
@ -0,0 +1 @@
|
||||
add_query_module(transaction_rollback procedures.cpp)
|
76
tests/e2e/transaction_rollback/procedures/procedures.cpp
Normal file
76
tests/e2e/transaction_rollback/procedures/procedures.cpp
Normal file
@ -0,0 +1,76 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "mg_procedure.h"
|
||||
|
||||
#include "mgp.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr std::string_view ProcedureFrom = "set_from";
|
||||
constexpr std::string_view ProcedureTo = "set_to";
|
||||
|
||||
void SetFrom(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
|
||||
mgp::MemoryDispatcherGuard guard(memory);
|
||||
const auto arguments = mgp::List(args);
|
||||
try {
|
||||
auto rel = arguments[0].ValueRelationship();
|
||||
auto new_from = arguments[0].ValueNode();
|
||||
mgp::Graph graph{memgraph_graph};
|
||||
graph.SetFrom(rel, new_from);
|
||||
} catch (const std::exception &e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void SetTo(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
|
||||
mgp::MemoryDispatcherGuard guard(memory);
|
||||
const auto arguments = mgp::List(args);
|
||||
try {
|
||||
auto rel = arguments[0].ValueRelationship();
|
||||
auto new_to = arguments[0].ValueNode();
|
||||
mgp::Graph graph{memgraph_graph};
|
||||
graph.SetTo(rel, new_to);
|
||||
} catch (const std::exception &e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
// Each module needs to define mgp_init_module function.
|
||||
// Here you can register multiple functions/procedures your module supports.
|
||||
extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
|
||||
{
|
||||
try {
|
||||
mgp::MemoryDispatcherGuard guard(memory);
|
||||
|
||||
mgp::AddProcedure(
|
||||
SetFrom, ProcedureFrom, mgp::ProcedureType::Write,
|
||||
{mgp::Parameter("relationship", mgp::Type::Relationship), mgp::Parameter("node_from", mgp::Type::Node)}, {},
|
||||
module, memory);
|
||||
mgp::AddProcedure(
|
||||
SetTo, ProcedureTo, mgp::ProcedureType::Write,
|
||||
{mgp::Parameter("relationship", mgp::Type::Relationship), mgp::Parameter("node_to", mgp::Type::Node)}, {},
|
||||
module, memory);
|
||||
|
||||
} catch (const std::exception &e) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// This is an optional function if you need to release any resources before the
|
||||
// module is unloaded. You will probably need this if you acquired some
|
||||
// resources in mgp_init_module.
|
||||
extern "C" int mgp_shutdown_module() { return 0; }
|
58
tests/e2e/transaction_rollback/transaction.py
Normal file
58
tests/e2e/transaction_rollback/transaction.py
Normal file
@ -0,0 +1,58 @@
|
||||
# Copyright 2023 Memgraph Ltd.
|
||||
#
|
||||
# Use of this software is governed by the Business Source License
|
||||
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
# License, and you may not use this file except in compliance with the Business Source License.
|
||||
#
|
||||
# As of the Change Date specified in that file, in accordance with
|
||||
# the Business Source License, use of this software will be governed
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from common import execute_and_fetch_all
|
||||
|
||||
|
||||
def test_change_from_rollback(connection):
|
||||
cursor = connection.cursor()
|
||||
|
||||
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")
|
||||
execute_and_fetch_all(cursor, "CREATE (n:Node1) CREATE (m:Node2) CREATE (k:Node3) CREATE (n)-[:Relationship]->(m);")
|
||||
connection.commit()
|
||||
|
||||
execute_and_fetch_all(
|
||||
cursor,
|
||||
"MATCH (n:Node1)-[r:Relationship]->(m:Node2) MATCH (k:Node3) CALL transaction_rollback.set_from(r, k);",
|
||||
)
|
||||
connection.rollback()
|
||||
|
||||
result = list(execute_and_fetch_all(cursor, f"MATCH (n)-[r]->(m) RETURN n, r, m"))
|
||||
assert len(result) == 1
|
||||
node_from, rel, node_to = result[0]
|
||||
assert list(node_from.labels)[0] == "Node1"
|
||||
assert list(node_to.labels)[0] == "Node2"
|
||||
|
||||
|
||||
def test_change_to_rollback(connection):
|
||||
cursor = connection.cursor()
|
||||
|
||||
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")
|
||||
execute_and_fetch_all(cursor, "CREATE (n:Node1) CREATE (m:Node2) CREATE (k:Node3) CREATE (n)-[:Relationship]->(m);")
|
||||
connection.commit()
|
||||
|
||||
execute_and_fetch_all(
|
||||
cursor, "MATCH (n:Node1)-[r:Relationship]->(m:Node2) MATCH (k:Node3) CALL transaction_rollback.set_to(r, k);"
|
||||
)
|
||||
connection.rollback()
|
||||
|
||||
result = list(execute_and_fetch_all(cursor, f"MATCH (n)-[r]->(m) RETURN n, r, m"))
|
||||
assert len(result) == 1
|
||||
node_from, rel, node_to = result[0]
|
||||
assert list(node_from.labels)[0] == "Node1"
|
||||
assert list(node_to.labels)[0] == "Node2"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
19
tests/e2e/transaction_rollback/workloads.yaml
Normal file
19
tests/e2e/transaction_rollback/workloads.yaml
Normal file
@ -0,0 +1,19 @@
|
||||
args: &args
|
||||
- "--bolt-port"
|
||||
- "7687"
|
||||
- "--log-level=TRACE"
|
||||
|
||||
in_memory_cluster: &in_memory_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: *args
|
||||
log_file: "transaction-rollback-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
workloads:
|
||||
- name: "Transaction rollback"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
proc: "tests/e2e/transaction_rollback/procedures/"
|
||||
args: ["transaction_rollback/transaction.py"]
|
||||
<<: *in_memory_cluster
|
@ -690,6 +690,50 @@ TYPED_TEST(CppApiTestFixture, TestValueToString) {
|
||||
"{key: node_property, key2: node_property2})-[type: Loves2, id: 1, properties: {}]->(id: 4, properties: {})");
|
||||
}
|
||||
|
||||
TYPED_TEST(CppApiTestFixture, TestRelationshipChangeFrom) {
|
||||
if (std::is_same<TypeParam, memgraph::storage::DiskStorage>::value) {
|
||||
return;
|
||||
}
|
||||
|
||||
mgp_graph raw_graph = this->CreateGraph();
|
||||
auto graph = mgp::Graph(&raw_graph);
|
||||
|
||||
auto node_1 = graph.CreateNode();
|
||||
auto node_2 = graph.CreateNode();
|
||||
auto node_3 = graph.CreateNode();
|
||||
|
||||
auto relationship = graph.CreateRelationship(node_1, node_2, "Edge");
|
||||
|
||||
ASSERT_EQ(relationship.From().Id(), node_1.Id());
|
||||
graph.SetFrom(relationship, node_3);
|
||||
|
||||
ASSERT_EQ(std::string(relationship.Type()), "Edge");
|
||||
ASSERT_EQ(relationship.From().Id(), node_3.Id());
|
||||
ASSERT_EQ(relationship.To().Id(), node_2.Id());
|
||||
}
|
||||
|
||||
TYPED_TEST(CppApiTestFixture, TestRelationshipChangeTo) {
|
||||
if (std::is_same<TypeParam, memgraph::storage::DiskStorage>::value) {
|
||||
return;
|
||||
}
|
||||
|
||||
mgp_graph raw_graph = this->CreateGraph();
|
||||
auto graph = mgp::Graph(&raw_graph);
|
||||
|
||||
auto node_1 = graph.CreateNode();
|
||||
auto node_2 = graph.CreateNode();
|
||||
auto node_3 = graph.CreateNode();
|
||||
|
||||
auto relationship = graph.CreateRelationship(node_1, node_2, "Edge");
|
||||
|
||||
ASSERT_EQ(relationship.To().Id(), node_2.Id());
|
||||
graph.SetTo(relationship, node_3);
|
||||
|
||||
ASSERT_EQ(std::string(relationship.Type()), "Edge");
|
||||
ASSERT_EQ(relationship.From().Id(), node_1.Id());
|
||||
ASSERT_EQ(relationship.To().Id(), node_3.Id());
|
||||
}
|
||||
|
||||
TYPED_TEST(CppApiTestFixture, TestInAndOutDegrees) {
|
||||
mgp_graph raw_graph = this->CreateGraph(memgraph::storage::View::NEW);
|
||||
auto graph = mgp::Graph(&raw_graph);
|
||||
|
Loading…
Reference in New Issue
Block a user