Add handling of deleted return values for query procedures and functions ran in analytical mode (#1395)

Co-authored-by: Ante Pušić <ante.pusic@memgraph.io>
This commit is contained in:
Aidar Samerkhanov 2023-12-04 10:32:59 +03:00 committed by GitHub
parent 0fb3ae2d56
commit 953a8f5340
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 919 additions and 57 deletions

View File

@ -234,8 +234,6 @@ inline mgp_type *type_duration() { return MgInvoke<mgp_type *>(mgp_type_duration
inline mgp_type *type_nullable(mgp_type *type) { return MgInvoke<mgp_type *>(mgp_type_nullable, type); }
// mgp_graph
inline bool create_label_index(mgp_graph *graph, const char *label) {
return MgInvoke<int>(mgp_create_label_index, graph, label);
}
@ -284,6 +282,10 @@ inline mgp_list *list_all_unique_constraints(mgp_graph *graph, mgp_memory *memor
return MgInvoke<mgp_list *>(mgp_list_all_unique_constraints, graph, memory);
}
// mgp_graph
inline bool graph_is_transactional(mgp_graph *graph) { return MgInvoke<int>(mgp_graph_is_transactional, graph); }
inline bool graph_is_mutable(mgp_graph *graph) { return MgInvoke<int>(mgp_graph_is_mutable, graph); }
inline mgp_vertex *graph_create_vertex(mgp_graph *graph, mgp_memory *memory) {
@ -376,6 +378,8 @@ inline mgp_list *list_copy(mgp_list *list, mgp_memory *memory) {
inline void list_destroy(mgp_list *list) { mgp_list_destroy(list); }
inline bool list_contains_deleted(mgp_list *list) { return MgInvoke<int>(mgp_list_contains_deleted, list); }
inline void list_append(mgp_list *list, mgp_value *val) { MgInvokeVoid(mgp_list_append, list, val); }
inline void list_append_extend(mgp_list *list, mgp_value *val) { MgInvokeVoid(mgp_list_append_extend, list, val); }
@ -394,6 +398,8 @@ inline mgp_map *map_copy(mgp_map *map, mgp_memory *memory) { return MgInvoke<mgp
inline void map_destroy(mgp_map *map) { mgp_map_destroy(map); }
inline bool map_contains_deleted(mgp_map *map) { return MgInvoke<int>(mgp_map_contains_deleted, map); }
inline void map_insert(mgp_map *map, const char *key, mgp_value *value) {
MgInvokeVoid(mgp_map_insert, map, key, value);
}
@ -442,6 +448,8 @@ inline mgp_vertex *vertex_copy(mgp_vertex *v, mgp_memory *memory) {
inline void vertex_destroy(mgp_vertex *v) { mgp_vertex_destroy(v); }
inline bool vertex_is_deleted(mgp_vertex *v) { return MgInvoke<int>(mgp_vertex_is_deleted, v); }
inline bool vertex_equal(mgp_vertex *v1, mgp_vertex *v2) { return MgInvoke<int>(mgp_vertex_equal, v1, v2); }
inline size_t vertex_labels_count(mgp_vertex *v) { return MgInvoke<size_t>(mgp_vertex_labels_count, v); }
@ -494,6 +502,8 @@ inline mgp_edge *edge_copy(mgp_edge *e, mgp_memory *memory) { return MgInvoke<mg
inline void edge_destroy(mgp_edge *e) { mgp_edge_destroy(e); }
inline bool edge_is_deleted(mgp_edge *e) { return MgInvoke<int>(mgp_edge_is_deleted, e); }
inline bool edge_equal(mgp_edge *e1, mgp_edge *e2) { return MgInvoke<int>(mgp_edge_equal, e1, e2); }
inline mgp_edge_type edge_get_type(mgp_edge *e) { return MgInvoke<mgp_edge_type>(mgp_edge_get_type, e); }
@ -530,6 +540,8 @@ inline mgp_path *path_copy(mgp_path *path, mgp_memory *memory) {
inline void path_destroy(mgp_path *path) { mgp_path_destroy(path); }
inline bool path_contains_deleted(mgp_path *path) { return MgInvoke<int>(mgp_path_contains_deleted, path); }
inline void path_expand(mgp_path *path, mgp_edge *edge) { MgInvokeVoid(mgp_path_expand, path, edge); }
inline void path_pop(mgp_path *path) { MgInvokeVoid(mgp_path_pop, path); }

View File

@ -429,6 +429,9 @@ enum mgp_error mgp_list_copy(struct mgp_list *list, struct mgp_memory *memory, s
/// Free the memory used by the given mgp_list and contained elements.
void mgp_list_destroy(struct mgp_list *list);
/// Return whether the given mgp_list contains any deleted values.
enum mgp_error mgp_list_contains_deleted(struct mgp_list *list, int *result);
/// Append a copy of mgp_value to mgp_list if capacity allows.
/// The list copies the given value and therefore does not take ownership of the
/// original value. You still need to call mgp_value_destroy to free the
@ -469,6 +472,9 @@ enum mgp_error mgp_map_copy(struct mgp_map *map, struct mgp_memory *memory, stru
/// Free the memory used by the given mgp_map and contained items.
void mgp_map_destroy(struct mgp_map *map);
/// Return whether the given mgp_map contains any deleted values.
enum mgp_error mgp_map_contains_deleted(struct mgp_map *map, int *result);
/// Insert a new mapping from a NULL terminated character string to a value.
/// If a mapping with the same key already exists, it is *not* replaced.
/// In case of insertion, both the string and the value are copied into the map.
@ -552,6 +558,9 @@ enum mgp_error mgp_path_copy(struct mgp_path *path, struct mgp_memory *memory, s
/// Free the memory used by the given mgp_path and contained vertices and edges.
void mgp_path_destroy(struct mgp_path *path);
/// Return whether the given mgp_path contains any deleted values.
enum mgp_error mgp_path_contains_deleted(struct mgp_path *path, int *result);
/// Append an edge continuing from the last vertex on the path.
/// The edge is copied into the path. Therefore, the path does not take
/// ownership of the original edge, so you still need to free the edge memory
@ -725,6 +734,9 @@ enum mgp_error mgp_vertex_copy(struct mgp_vertex *v, struct mgp_memory *memory,
/// Free the memory used by a mgp_vertex.
void mgp_vertex_destroy(struct mgp_vertex *v);
/// Return whether the given mgp_vertex is deleted.
enum mgp_error mgp_vertex_is_deleted(struct mgp_vertex *v, int *result);
/// Result is non-zero if given vertices are equal, otherwise 0.
enum mgp_error mgp_vertex_equal(struct mgp_vertex *v1, struct mgp_vertex *v2, int *result);
@ -819,6 +831,9 @@ enum mgp_error mgp_edge_copy(struct mgp_edge *e, struct mgp_memory *memory, stru
/// Free the memory used by a mgp_edge.
void mgp_edge_destroy(struct mgp_edge *e);
/// Return whether the given mgp_edge is deleted.
enum mgp_error mgp_edge_is_deleted(struct mgp_edge *e, int *result);
/// Result is non-zero if given edges are equal, otherwise 0.
enum mgp_error mgp_edge_equal(struct mgp_edge *e1, struct mgp_edge *e2, int *result);
@ -941,6 +956,12 @@ enum mgp_error mgp_list_all_unique_constraints(struct mgp_graph *graph, struct m
/// Current implementation always returns without errors.
enum mgp_error mgp_graph_is_mutable(struct mgp_graph *graph, int *result);
/// Result is non-zero if the graph is in transactional storage mode.
/// If a graph is not in transactional mode (i.e. analytical mode), then vertices and edges can be missing
/// because changes from other transactions are visible.
/// Current implementation always returns without errors.
enum mgp_error mgp_graph_is_transactional(struct mgp_graph *graph, int *result);
/// Add a new vertex to the graph.
/// Resulting vertex must be freed using mgp_vertex_destroy.
/// Return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT if `graph` is immutable.

View File

@ -246,6 +246,8 @@ class Graph {
/// @brief Returns whether the graph is mutable.
bool IsMutable() const;
/// @brief Returns whether the graph is in a transactional storage mode.
bool IsTransactional() const;
/// @brief Creates a node and adds it to the graph.
Node CreateNode();
/// @brief Deletes a node from the graph.
@ -512,6 +514,9 @@ class List {
~List();
/// @brief Returns wheter the list contains any deleted values.
bool ContainsDeleted() const;
/// @brief Returns the size of the list.
size_t Size() const;
/// @brief Returns whether the list is empty.
@ -618,6 +623,9 @@ class Map {
~Map();
/// @brief Returns wheter the map contains any deleted values.
bool ContainsDeleted() const;
/// @brief Returns the size of the map.
size_t Size() const;
@ -730,6 +738,9 @@ class Node {
~Node();
/// @brief Returns wheter the node has been deleted.
bool IsDeleted() const;
/// @brief Returns the nodes ID.
mgp::Id Id() const;
@ -811,6 +822,9 @@ class Relationship {
~Relationship();
/// @brief Returns wheter the relationship has been deleted.
bool IsDeleted() const;
/// @brief Returns the relationships ID.
mgp::Id Id() const;
@ -876,6 +890,9 @@ class Path {
~Path();
/// @brief Returns wheter the path contains any deleted values.
bool ContainsDeleted() const;
/// Returns the path length (number of relationships).
size_t Length() const;
@ -1995,6 +2012,8 @@ inline bool Graph::ContainsRelationship(const Relationship &relationship) const
inline bool Graph::IsMutable() const { return mgp::graph_is_mutable(graph_); }
inline bool Graph::IsTransactional() const { return mgp::graph_is_transactional(graph_); }
inline Node Graph::CreateNode() {
auto *vertex = mgp::MemHandlerCallback(graph_create_vertex, graph_);
auto node = Node(vertex);
@ -2442,6 +2461,8 @@ inline List::~List() {
}
}
inline bool List::ContainsDeleted() const { return mgp::list_contains_deleted(ptr_); }
inline size_t List::Size() const { return mgp::list_size(ptr_); }
inline bool List::Empty() const { return Size() == 0; }
@ -2568,6 +2589,8 @@ inline Map::~Map() {
}
}
inline bool Map::ContainsDeleted() const { return mgp::map_contains_deleted(ptr_); }
inline size_t Map::Size() const { return mgp::map_size(ptr_); }
inline bool Map::Empty() const { return Size() == 0; }
@ -2733,6 +2756,8 @@ inline Node::~Node() {
}
}
inline bool Node::IsDeleted() const { return mgp::vertex_is_deleted(ptr_); }
inline mgp::Id Node::Id() const { return Id::FromInt(mgp::vertex_get_id(ptr_).as_int); }
inline mgp::Labels Node::Labels() const { return mgp::Labels(ptr_); }
@ -2884,6 +2909,8 @@ inline Relationship::~Relationship() {
}
}
inline bool Relationship::IsDeleted() const { return mgp::edge_is_deleted(ptr_); }
inline mgp::Id Relationship::Id() const { return Id::FromInt(mgp::edge_get_id(ptr_).as_int); }
inline std::string_view Relationship::Type() const { return mgp::edge_get_type(ptr_).name; }
@ -2989,6 +3016,8 @@ inline Path::~Path() {
}
}
inline bool Path::ContainsDeleted() const { return mgp::path_contains_deleted(ptr_); }
inline size_t Path::Length() const { return mgp::path_size(ptr_); }
inline Node Path::GetNodeAt(size_t index) const {

View File

@ -95,7 +95,7 @@ class Database {
*
* @return storage::StorageMode
*/
storage::StorageMode GetStorageMode() const { return storage_->GetStorageMode(); }
storage::StorageMode GetStorageMode() const noexcept { return storage_->GetStorageMode(); }
/**
* @brief Get the storage info

View File

@ -15,6 +15,7 @@
#include <cppitertools/filter.hpp>
#include <cppitertools/imap.hpp>
#include "storage/v2/storage_mode.hpp"
#include "utils/pmr/unordered_set.hpp"
namespace memgraph::query {
@ -139,6 +140,8 @@ std::optional<VertexAccessor> SubgraphDbAccessor::FindVertex(storage::Gid gid, s
query::Graph *SubgraphDbAccessor::getGraph() { return graph_; }
storage::StorageMode SubgraphDbAccessor::GetStorageMode() const noexcept { return db_accessor_.GetStorageMode(); }
DbAccessor *SubgraphDbAccessor::GetAccessor() { return &db_accessor_; }
VertexAccessor SubgraphVertexAccessor::GetVertexAccessor() const { return impl_; }

View File

@ -42,6 +42,8 @@ class EdgeAccessor final {
explicit EdgeAccessor(storage::EdgeAccessor impl) : impl_(std::move(impl)) {}
bool IsDeleted() const { return impl_.IsDeleted(); }
bool IsVisible(storage::View view) const { return impl_.IsVisible(view); }
storage::EdgeTypeId EdgeType() const { return impl_.EdgeType(); }
@ -543,7 +545,7 @@ class DbAccessor final {
void Abort() { accessor_->Abort(); }
storage::StorageMode GetStorageMode() const { return accessor_->GetCreationStorageMode(); }
storage::StorageMode GetStorageMode() const noexcept { return accessor_->GetCreationStorageMode(); }
bool LabelIndexExists(storage::LabelId label) const { return accessor_->LabelIndexExists(label); }
@ -693,6 +695,8 @@ class SubgraphDbAccessor final {
Graph *getGraph();
storage::StorageMode GetStorageMode() const noexcept;
DbAccessor *GetAccessor();
};

View File

@ -29,8 +29,10 @@
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/frame.hpp"
#include "query/procedure/mg_procedure_impl.hpp"
#include "query/typed_value.hpp"
#include "spdlog/spdlog.h"
#include "storage/v2/storage_mode.hpp"
#include "utils/exceptions.hpp"
#include "utils/frame_change_id.hpp"
#include "utils/logging.hpp"
@ -840,6 +842,8 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
TypedValue Visit(Function &function) override {
FunctionContext function_ctx{dba_, ctx_->memory, ctx_->timestamp, &ctx_->counters, view_};
bool is_transactional = storage::IsTransactional(dba_->GetStorageMode());
TypedValue res(ctx_->memory);
// Stack allocate evaluated arguments when there's a small number of them.
if (function.arguments_.size() <= 8) {
TypedValue arguments[8] = {TypedValue(ctx_->memory), TypedValue(ctx_->memory), TypedValue(ctx_->memory),
@ -848,19 +852,20 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
for (size_t i = 0; i < function.arguments_.size(); ++i) {
arguments[i] = function.arguments_[i]->Accept(*this);
}
auto res = function.function_(arguments, function.arguments_.size(), function_ctx);
MG_ASSERT(res.GetMemoryResource() == ctx_->memory);
return res;
res = function.function_(arguments, function.arguments_.size(), function_ctx);
} else {
TypedValue::TVector arguments(ctx_->memory);
arguments.reserve(function.arguments_.size());
for (const auto &argument : function.arguments_) {
arguments.emplace_back(argument->Accept(*this));
}
auto res = function.function_(arguments.data(), arguments.size(), function_ctx);
MG_ASSERT(res.GetMemoryResource() == ctx_->memory);
return res;
res = function.function_(arguments.data(), arguments.size(), function_ctx);
}
MG_ASSERT(res.GetMemoryResource() == ctx_->memory);
if (!is_transactional && res.ContainsDeleted()) [[unlikely]] {
return TypedValue(ctx_->memory);
}
return res;
}
TypedValue Visit(Reduce &reduce) override {

View File

@ -4835,6 +4835,12 @@ class CallProcedureCursor : public Cursor {
AbortCheck(context);
auto skip_rows_with_deleted_values = [this]() {
while (result_row_it_ != result_->rows.end() && result_row_it_->has_deleted_values) {
++result_row_it_;
}
};
// We need to fetch new procedure results after pulling from input.
// TODO: Look into openCypher's distinction between procedures returning an
// empty result set vs procedures which return `void`. We currently don't
@ -4844,7 +4850,7 @@ class CallProcedureCursor : public Cursor {
// It might be a good idea to resolve the procedure name once, at the
// start. Unfortunately, this could deadlock if we tried to invoke a
// procedure from a module (read lock) and reload a module (write lock)
// inside the same execution thread. Also, our RWLock is setup so that
// inside the same execution thread. Also, our RWLock is set up so that
// it's not possible for a single thread to request multiple read locks.
// Builtin module registration in query/procedure/module.cpp depends on
// this locking scheme.
@ -4892,6 +4898,7 @@ class CallProcedureCursor : public Cursor {
graph_view);
result_->signature = &proc->results;
result_->is_transactional = storage::IsTransactional(context.db_accessor->GetStorageMode());
// Use special memory as invoking procedure is complex
// TODO: This will probably need to be changed when we add support for
@ -4916,6 +4923,9 @@ class CallProcedureCursor : public Cursor {
throw QueryRuntimeException("{}: {}", self_->procedure_name_, *result_->error_msg);
}
result_row_it_ = result_->rows.begin();
if (!result_->is_transactional) {
skip_rows_with_deleted_values();
}
stream_exhausted = result_row_it_ == result_->rows.end();
}
@ -4945,6 +4955,9 @@ class CallProcedureCursor : public Cursor {
}
}
++result_row_it_;
if (!result_->is_transactional) {
skip_rows_with_deleted_values();
}
return true;
}

View File

@ -32,6 +32,7 @@
#include "query/procedure/mg_procedure_helpers.hpp"
#include "query/stream/common.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/view.hpp"
#include "utils/algorithm.hpp"
#include "utils/concepts.hpp"
@ -321,6 +322,53 @@ mgp_value_type FromTypedValueType(memgraph::query::TypedValue::Type type) {
}
} // namespace
bool IsDeleted(const mgp_vertex *vertex) { return vertex->getImpl().impl_.vertex_->deleted; }
bool IsDeleted(const mgp_edge *edge) { return edge->impl.IsDeleted(); }
bool ContainsDeleted(const mgp_path *path) {
return std::ranges::any_of(path->vertices, [](const auto &vertex) { return IsDeleted(&vertex); }) ||
std::ranges::any_of(path->edges, [](const auto &edge) { return IsDeleted(&edge); });
}
bool ContainsDeleted(const mgp_list *list) {
return std::ranges::any_of(list->elems, [](const auto &elem) { return ContainsDeleted(&elem); });
}
bool ContainsDeleted(const mgp_map *map) {
return std::ranges::any_of(map->items, [](const auto &item) { return ContainsDeleted(&item.second); });
}
bool ContainsDeleted(const mgp_value *val) {
switch (val->type) {
// Value types
case MGP_VALUE_TYPE_NULL:
case MGP_VALUE_TYPE_BOOL:
case MGP_VALUE_TYPE_INT:
case MGP_VALUE_TYPE_DOUBLE:
case MGP_VALUE_TYPE_STRING:
case MGP_VALUE_TYPE_DATE:
case MGP_VALUE_TYPE_LOCAL_TIME:
case MGP_VALUE_TYPE_LOCAL_DATE_TIME:
case MGP_VALUE_TYPE_DURATION:
return false;
// Reference types
case MGP_VALUE_TYPE_LIST:
return ContainsDeleted(val->list_v);
case MGP_VALUE_TYPE_MAP:
return ContainsDeleted(val->map_v);
case MGP_VALUE_TYPE_VERTEX:
return IsDeleted(val->vertex_v);
case MGP_VALUE_TYPE_EDGE:
return IsDeleted(val->edge_v);
case MGP_VALUE_TYPE_PATH:
return ContainsDeleted(val->path_v);
default:
throw memgraph::query::QueryRuntimeException("Value of unknown type");
}
return false;
}
memgraph::query::TypedValue ToTypedValue(const mgp_value &val, memgraph::utils::MemoryResource *memory) {
switch (val.type) {
case MGP_VALUE_TYPE_NULL:
@ -1003,6 +1051,10 @@ mgp_error mgp_list_copy(mgp_list *list, mgp_memory *memory, mgp_list **result) {
void mgp_list_destroy(mgp_list *list) { DeleteRawMgpObject(list); }
mgp_error mgp_list_contains_deleted(mgp_list *list, int *result) {
return WrapExceptions([list, result] { *result = ContainsDeleted(list); });
}
namespace {
void MgpListAppendExtend(mgp_list &list, const mgp_value &value) { list.elems.push_back(value); }
} // namespace
@ -1054,6 +1106,10 @@ mgp_error mgp_map_copy(mgp_map *map, mgp_memory *memory, mgp_map **result) {
void mgp_map_destroy(mgp_map *map) { DeleteRawMgpObject(map); }
mgp_error mgp_map_contains_deleted(mgp_map *map, int *result) {
return WrapExceptions([map, result] { *result = ContainsDeleted(map); });
}
mgp_error mgp_map_insert(mgp_map *map, const char *key, mgp_value *value) {
return WrapExceptions([&] {
auto emplace_result = map->items.emplace(key, *value);
@ -1177,6 +1233,10 @@ mgp_error mgp_path_copy(mgp_path *path, mgp_memory *memory, mgp_path **result) {
void mgp_path_destroy(mgp_path *path) { DeleteRawMgpObject(path); }
mgp_error mgp_path_contains_deleted(mgp_path *path, int *result) {
return WrapExceptions([path, result] { *result = ContainsDeleted(path); });
}
mgp_error mgp_path_expand(mgp_path *path, mgp_edge *edge) {
return WrapExceptions([path, edge] {
MG_ASSERT(Call<size_t>(mgp_path_size, path) == path->vertices.size() - 1, "Invalid mgp_path");
@ -1560,8 +1620,9 @@ mgp_error mgp_result_new_record(mgp_result *res, mgp_result_record **result) {
auto *memory = res->rows.get_allocator().GetMemoryResource();
MG_ASSERT(res->signature, "Expected to have a valid signature");
res->rows.push_back(mgp_result_record{
res->signature,
memgraph::utils::pmr::map<memgraph::utils::pmr::string, memgraph::query::TypedValue>(memory)});
.signature = res->signature,
.values = memgraph::utils::pmr::map<memgraph::utils::pmr::string, memgraph::query::TypedValue>(memory),
.ignore_deleted_values = !res->is_transactional});
return &res->rows.back();
},
result);
@ -1576,10 +1637,14 @@ mgp_error mgp_result_record_insert(mgp_result_record *record, const char *field_
if (find_it == record->signature->end()) {
throw std::out_of_range{fmt::format("The result doesn't have any field named '{}'.", field_name)};
}
if (record->ignore_deleted_values && ContainsDeleted(val)) [[unlikely]] {
record->has_deleted_values = true;
return;
}
const auto *type = find_it->second.first;
if (!type->SatisfiesType(*val)) {
throw std::logic_error{
fmt::format("The type of value doesn't satisfies the type '{}'!", type->GetPresentableName())};
fmt::format("The type of value doesn't satisfy the type '{}'!", type->GetPresentableName())};
}
record->values.emplace(field_name, ToTypedValue(*val, memory));
});
@ -1746,7 +1811,7 @@ memgraph::storage::PropertyValue ToPropertyValue(const mgp_value &value) {
return memgraph::storage::PropertyValue{memgraph::storage::TemporalData{memgraph::storage::TemporalType::Duration,
value.duration_v->duration.microseconds}};
case MGP_VALUE_TYPE_VERTEX:
throw ValueConversionException{"A vertex is not a valid property value! "};
throw ValueConversionException{"A vertex is not a valid property value!"};
case MGP_VALUE_TYPE_EDGE:
throw ValueConversionException{"An edge is not a valid property value!"};
case MGP_VALUE_TYPE_PATH:
@ -1962,6 +2027,10 @@ mgp_error mgp_vertex_copy(mgp_vertex *v, mgp_memory *memory, mgp_vertex **result
void mgp_vertex_destroy(mgp_vertex *v) { DeleteRawMgpObject(v); }
mgp_error mgp_vertex_is_deleted(mgp_vertex *v, int *result) {
return WrapExceptions([v] { return IsDeleted(v); }, result);
}
mgp_error mgp_vertex_equal(mgp_vertex *v1, mgp_vertex *v2, int *result) {
// NOLINTNEXTLINE(clang-diagnostic-unevaluated-expression)
static_assert(noexcept(*v1 == *v2));
@ -2319,6 +2388,10 @@ mgp_error mgp_edge_copy(mgp_edge *e, mgp_memory *memory, mgp_edge **result) {
void mgp_edge_destroy(mgp_edge *e) { DeleteRawMgpObject(e); }
mgp_error mgp_edge_is_deleted(mgp_edge *e, int *result) {
return WrapExceptions([e] { return IsDeleted(e); }, result);
}
mgp_error mgp_edge_equal(mgp_edge *e1, mgp_edge *e2, int *result) {
// NOLINTNEXTLINE(clang-diagnostic-unevaluated-expression)
static_assert(noexcept(*e1 == *e2));
@ -2864,6 +2937,11 @@ mgp_error mgp_list_all_unique_constraints(mgp_graph *graph, mgp_memory *memory,
});
}
mgp_error mgp_graph_is_transactional(mgp_graph *graph, int *result) {
*result = IsTransactional(graph->storage_mode) ? 1 : 0;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_graph_is_mutable(mgp_graph *graph, int *result) {
*result = MgpGraphIsMutable(*graph) ? 1 : 0;
return mgp_error::MGP_ERROR_NO_ERROR;

View File

@ -560,23 +560,24 @@ struct mgp_graph {
// TODO: Merge `mgp_graph` and `mgp_memory` into a single `mgp_context`. The
// `ctx` field is out of place here.
memgraph::query::ExecutionContext *ctx;
memgraph::storage::StorageMode storage_mode;
static mgp_graph WritableGraph(memgraph::query::DbAccessor &acc, memgraph::storage::View view,
memgraph::query::ExecutionContext &ctx) {
return mgp_graph{&acc, view, &ctx};
return mgp_graph{&acc, view, &ctx, acc.GetStorageMode()};
}
static mgp_graph NonWritableGraph(memgraph::query::DbAccessor &acc, memgraph::storage::View view) {
return mgp_graph{&acc, view, nullptr};
return mgp_graph{&acc, view, nullptr, acc.GetStorageMode()};
}
static mgp_graph WritableGraph(memgraph::query::SubgraphDbAccessor &acc, memgraph::storage::View view,
memgraph::query::ExecutionContext &ctx) {
return mgp_graph{&acc, view, &ctx};
return mgp_graph{&acc, view, &ctx, acc.GetStorageMode()};
}
static mgp_graph NonWritableGraph(memgraph::query::SubgraphDbAccessor &acc, memgraph::storage::View view) {
return mgp_graph{&acc, view, nullptr};
return mgp_graph{&acc, view, nullptr, acc.GetStorageMode()};
}
};
@ -585,6 +586,8 @@ struct mgp_result_record {
const memgraph::utils::pmr::map<memgraph::utils::pmr::string,
std::pair<const memgraph::query::procedure::CypherType *, bool>> *signature;
memgraph::utils::pmr::map<memgraph::utils::pmr::string, memgraph::query::TypedValue> values;
bool ignore_deleted_values = false;
bool has_deleted_values = false;
};
struct mgp_result {
@ -599,6 +602,7 @@ struct mgp_result {
std::pair<const memgraph::query::procedure::CypherType *, bool>> *signature;
memgraph::utils::pmr::vector<mgp_result_record> rows;
std::optional<memgraph::utils::pmr::string> error_msg;
bool is_transactional = true;
};
struct mgp_func_result {
@ -614,6 +618,7 @@ struct mgp_func_context {
memgraph::query::DbAccessor *impl;
memgraph::storage::View view;
};
struct mgp_properties_iterator {
using allocator_type = memgraph::utils::Allocator<mgp_properties_iterator>;
@ -724,6 +729,7 @@ struct ProcedureInfo {
bool is_batched{false};
std::optional<memgraph::query::AuthQuery::Privilege> required_privilege = std::nullopt;
};
struct mgp_proc {
using allocator_type = memgraph::utils::Allocator<mgp_proc>;
@ -984,4 +990,6 @@ struct mgp_messages {
storage_type messages;
};
bool ContainsDeleted(const mgp_value *val);
memgraph::query::TypedValue ToTypedValue(const mgp_value &val, memgraph::utils::MemoryResource *memory);

View File

@ -25,6 +25,7 @@
#include "query/exceptions.hpp"
#include "query/procedure/mg_procedure_helpers.hpp"
#include "query/procedure/mg_procedure_impl.hpp"
#include "storage/v2/storage_mode.hpp"
#include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/vector.hpp"
@ -867,7 +868,37 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) {
}
namespace {
std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Object py_record, mgp_memory *memory) {
struct RecordFieldCache {
PyObject *key;
PyObject *val;
const char *field_name;
mgp_value *field_val;
};
std::optional<py::ExceptionInfo> InsertField(PyObject *key, PyObject *val, mgp_result_record *record,
const char *field_name, mgp_value *field_val) {
if (mgp_result_record_insert(record, field_name, field_val) != mgp_error::MGP_ERROR_NO_ERROR) {
std::stringstream ss;
ss << "Unable to insert field '" << py::Object::FromBorrow(key) << "' with value: '" << py::Object::FromBorrow(val)
<< "'; did you set the correct field type?";
const auto &msg = ss.str();
PyErr_SetString(PyExc_ValueError, msg.c_str());
mgp_value_destroy(field_val);
return py::FetchError();
}
mgp_value_destroy(field_val);
return std::nullopt;
}
void SkipRecord(mgp_value *field_val, std::vector<RecordFieldCache> &current_record_cache) {
mgp_value_destroy(field_val);
for (auto &cache_entry : current_record_cache) {
mgp_value_destroy(cache_entry.field_val);
}
}
std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Object py_record, mgp_graph *graph,
mgp_memory *memory) {
py::Object py_mgp(PyImport_ImportModule("mgp"));
if (!py_mgp) return py::FetchError();
auto record_cls = py_mgp.GetAttr("Record");
@ -888,15 +919,27 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj
py::Object items(PyDict_Items(fields.Ptr()));
if (!items) return py::FetchError();
mgp_result_record *record{nullptr};
if (RaiseExceptionFromErrorCode(mgp_result_new_record(result, &record))) {
return py::FetchError();
const auto is_transactional = storage::IsTransactional(graph->storage_mode);
if (is_transactional) {
// IN_MEMORY_ANALYTICAL must first verify that the record contains no deleted values
if (RaiseExceptionFromErrorCode(mgp_result_new_record(result, &record))) {
return py::FetchError();
}
}
std::vector<RecordFieldCache> current_record_cache{};
utils::OnScopeExit clear_record_cache{[&current_record_cache] {
for (auto &record : current_record_cache) {
mgp_value_destroy(record.field_val);
}
}};
Py_ssize_t len = PyList_GET_SIZE(items.Ptr());
for (Py_ssize_t i = 0; i < len; ++i) {
auto *item = PyList_GET_ITEM(items.Ptr(), i);
if (!item) return py::FetchError();
MG_ASSERT(PyTuple_Check(item));
auto *key = PyTuple_GetItem(item, 0);
PyObject *key = PyTuple_GetItem(item, 0);
if (!key) return py::FetchError();
if (!PyUnicode_Check(key)) {
std::stringstream ss;
@ -905,30 +948,48 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj
PyErr_SetString(PyExc_TypeError, msg.c_str());
return py::FetchError();
}
const auto *field_name = PyUnicode_AsUTF8(key);
const char *field_name = PyUnicode_AsUTF8(key);
if (!field_name) return py::FetchError();
auto *val = PyTuple_GetItem(item, 1);
PyObject *val = PyTuple_GetItem(item, 1);
if (!val) return py::FetchError();
// This memory is one dedicated for mg_procedure.
mgp_value *field_val = PyObjectToMgpValueWithPythonExceptions(val, memory);
if (field_val == nullptr) {
return py::FetchError();
}
if (mgp_result_record_insert(record, field_name, field_val) != mgp_error::MGP_ERROR_NO_ERROR) {
std::stringstream ss;
ss << "Unable to insert field '" << py::Object::FromBorrow(key) << "' with value: '"
<< py::Object::FromBorrow(val) << "'; did you set the correct field type?";
const auto &msg = ss.str();
PyErr_SetString(PyExc_ValueError, msg.c_str());
mgp_value_destroy(field_val);
return py::FetchError();
if (!is_transactional) {
// If a deleted value is being inserted into a record, skip the whole record
if (ContainsDeleted(field_val)) {
SkipRecord(field_val, current_record_cache);
return std::nullopt;
}
current_record_cache.emplace_back(
RecordFieldCache{.key = key, .val = val, .field_name = field_name, .field_val = field_val});
} else {
auto maybe_exc = InsertField(key, val, record, field_name, field_val);
if (maybe_exc) return maybe_exc;
}
mgp_value_destroy(field_val);
}
if (is_transactional) {
return std::nullopt;
}
// IN_MEMORY_ANALYTICAL only adds a new record after verifying that it contains no deleted values
if (RaiseExceptionFromErrorCode(mgp_result_new_record(result, &record))) {
return py::FetchError();
}
for (auto &cache_entry : current_record_cache) {
auto maybe_exc =
InsertField(cache_entry.key, cache_entry.val, record, cache_entry.field_name, cache_entry.field_val);
if (maybe_exc) return maybe_exc;
}
return std::nullopt;
}
std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result, py::Object py_seq,
std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result, py::Object py_seq, mgp_graph *graph,
mgp_memory *memory) {
Py_ssize_t len = PySequence_Size(py_seq.Ptr());
if (len == -1) return py::FetchError();
@ -938,7 +999,7 @@ std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result
for (Py_ssize_t i = 0, curr_item = 0; i < len; ++i, ++curr_item) {
py::Object py_record(PySequence_GetItem(py_seq.Ptr(), curr_item));
if (!py_record) return py::FetchError();
auto maybe_exc = AddRecordFromPython(result, py_record, memory);
auto maybe_exc = AddRecordFromPython(result, py_record, graph, memory);
if (maybe_exc) return maybe_exc;
// Once PySequence_DelSlice deletes "transformed" objects, starting index is 0 again.
if (i && i % del_cnt == 0) {
@ -952,14 +1013,14 @@ std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result
}
std::optional<py::ExceptionInfo> AddMultipleBatchRecordsFromPython(mgp_result *result, py::Object py_seq,
mgp_memory *memory) {
mgp_graph *graph, mgp_memory *memory) {
Py_ssize_t len = PySequence_Size(py_seq.Ptr());
if (len == -1) return py::FetchError();
result->rows.reserve(len);
for (Py_ssize_t i = 0; i < len; ++i) {
py::Object py_record(PySequence_GetItem(py_seq.Ptr(), i));
if (!py_record) return py::FetchError();
auto maybe_exc = AddRecordFromPython(result, py_record, memory);
auto maybe_exc = AddRecordFromPython(result, py_record, graph, memory);
if (maybe_exc) return maybe_exc;
}
PySequence_DelSlice(py_seq.Ptr(), 0, PySequence_Size(py_seq.Ptr()));
@ -1015,11 +1076,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
if (!py_res) return py::FetchError();
if (PySequence_Check(py_res.Ptr())) {
if (is_batched) {
return AddMultipleBatchRecordsFromPython(result, py_res, memory);
return AddMultipleBatchRecordsFromPython(result, py_res, graph, memory);
}
return AddMultipleRecordsFromPython(result, py_res, memory);
return AddMultipleRecordsFromPython(result, py_res, graph, memory);
}
return AddRecordFromPython(result, py_res, memory);
return AddRecordFromPython(result, py_res, graph, memory);
};
// It is *VERY IMPORTANT* to note that this code takes great care not to keep
@ -1114,9 +1175,9 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
auto py_res = py_cb.Call(py_graph, py_messages);
if (!py_res) return py::FetchError();
if (PySequence_Check(py_res.Ptr())) {
return AddMultipleRecordsFromPython(result, py_res, memory);
return AddMultipleRecordsFromPython(result, py_res, graph, memory);
}
return AddRecordFromPython(result, py_res, memory);
return AddRecordFromPython(result, py_res, graph, memory);
};
// It is *VERY IMPORTANT* to note that this code takes great care not to keep
@ -1164,9 +1225,27 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap
auto call = [&](py::Object py_graph) -> utils::BasicResult<std::optional<py::ExceptionInfo>, mgp_value *> {
py::Object py_args(MgpListToPyTuple(args, py_graph.Ptr()));
if (!py_args) return {py::FetchError()};
const auto is_transactional = storage::IsTransactional(graph->storage_mode);
auto py_res = py_cb.Call(py_graph, py_args);
if (!py_res) return {py::FetchError()};
mgp_value *ret_val = PyObjectToMgpValueWithPythonExceptions(py_res.Ptr(), memory);
if (!is_transactional && ContainsDeleted(ret_val)) {
mgp_value_destroy(ret_val);
mgp_value *null_val{nullptr};
mgp_error last_error{mgp_error::MGP_ERROR_NO_ERROR};
last_error = mgp_value_make_null(memory, &null_val);
if (last_error == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
}
if (last_error != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_value"};
}
return null_val;
}
if (ret_val == nullptr) {
return {py::FetchError()};
}

View File

@ -99,7 +99,7 @@ void CallCustomTransformation(const std::string &transformation_name, const std:
mgp_messages mgp_messages{mgp_messages::storage_type{&memory_resource}};
std::transform(messages.begin(), messages.end(), std::back_inserter(mgp_messages.messages),
[](const TMessage &message) { return mgp_message{message}; });
mgp_graph graph{&db_accessor, storage::View::OLD, nullptr};
mgp_graph graph{&db_accessor, storage::View::OLD, nullptr, db_accessor.GetStorageMode()};
mgp_memory memory{&memory_resource};
result.rows.clear();
result.error_msg.reset();

View File

@ -370,6 +370,38 @@ bool TypedValue::IsGraph() const { return type_ == Type::Graph; }
#undef DEFINE_VALUE_AND_TYPE_GETTERS
bool TypedValue::ContainsDeleted() const {
switch (type_) {
// Value types
case Type::Null:
case Type::Bool:
case Type::Int:
case Type::Double:
case Type::String:
case Type::Date:
case Type::LocalTime:
case Type::LocalDateTime:
case Type::Duration:
return false;
// Reference types
case Type::List:
return std::ranges::any_of(list_v, [](const auto &elem) { return elem.ContainsDeleted(); });
case Type::Map:
return std::ranges::any_of(map_v, [](const auto &item) { return item.second.ContainsDeleted(); });
case Type::Vertex:
return vertex_v.impl_.vertex_->deleted;
case Type::Edge:
return edge_v.IsDeleted();
case Type::Path:
return std::ranges::any_of(path_v.vertices(),
[](auto &vertex_acc) { return vertex_acc.impl_.vertex_->deleted; }) ||
std::ranges::any_of(path_v.edges(), [](auto &edge_acc) { return edge_acc.IsDeleted(); });
default:
throw TypedValueException("Value of unknown type");
}
return false;
}
bool TypedValue::IsNull() const { return type_ == Type::Null; }
bool TypedValue::IsNumeric() const { return IsInt() || IsDouble(); }

View File

@ -515,6 +515,8 @@ class TypedValue {
#undef DECLARE_VALUE_AND_TYPE_GETTERS
bool ContainsDeleted() const;
/** Checks if value is a TypedValue::Null. */
bool IsNull() const;

View File

@ -25,6 +25,13 @@
namespace memgraph::storage {
bool EdgeAccessor::IsDeleted() const {
if (!storage_->config_.items.properties_on_edges) {
return false;
}
return edge_.ptr->deleted;
}
bool EdgeAccessor::IsVisible(const View view) const {
bool exists = true;
bool deleted = true;

View File

@ -44,6 +44,8 @@ class EdgeAccessor final {
transaction_(transaction),
for_deleted_(for_deleted) {}
bool IsDeleted() const;
/// @return true if the object is visible from the current transaction
bool IsVisible(View view) const;

View File

@ -85,7 +85,7 @@ Storage::Accessor::Accessor(Accessor &&other) noexcept
other.commit_timestamp_.reset();
}
StorageMode Storage::GetStorageMode() const { return storage_mode_; }
StorageMode Storage::GetStorageMode() const noexcept { return storage_mode_; }
IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; }
@ -95,7 +95,7 @@ utils::BasicResult<Storage::SetIsolationLevelError> Storage::SetIsolationLevel(I
return {};
}
StorageMode Storage::Accessor::GetCreationStorageMode() const { return creation_storage_mode_; }
StorageMode Storage::Accessor::GetCreationStorageMode() const noexcept { return creation_storage_mode_; }
std::optional<uint64_t> Storage::Accessor::GetTransactionId() const {
if (is_transaction_active_) {

View File

@ -238,7 +238,7 @@ class Storage {
EdgeTypeId NameToEdgeType(std::string_view name) { return storage_->NameToEdgeType(name); }
StorageMode GetCreationStorageMode() const;
StorageMode GetCreationStorageMode() const noexcept;
const std::string &id() const { return storage_->id(); }
@ -304,7 +304,7 @@ class Storage {
return EdgeTypeId::FromUint(name_id_mapper_->NameToId(name));
}
StorageMode GetStorageMode() const;
StorageMode GetStorageMode() const noexcept;
virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) = 0;

View File

@ -13,6 +13,10 @@
namespace memgraph::storage {
bool IsTransactional(const StorageMode storage_mode) noexcept {
return storage_mode != StorageMode::IN_MEMORY_ANALYTICAL;
}
std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode) {
switch (storage_mode) {
case memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL:

View File

@ -18,6 +18,8 @@ namespace memgraph::storage {
enum class StorageMode : std::uint8_t { IN_MEMORY_ANALYTICAL, IN_MEMORY_TRANSACTIONAL, ON_DISK_TRANSACTIONAL };
bool IsTransactional(const StorageMode storage_mode) noexcept;
std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode);
} // namespace memgraph::storage

View File

@ -72,6 +72,7 @@ add_subdirectory(constraints)
add_subdirectory(inspect_query)
add_subdirectory(filter_info)
add_subdirectory(queries)
add_subdirectory(query_modules_storage_modes)
add_subdirectory(garbage_collection)
add_subdirectory(query_planning)

View File

@ -0,0 +1,8 @@
function(copy_qm_storage_modes_e2e_python_files FILE_NAME)
copy_e2e_python_files(query_modules_storage_modes ${FILE_NAME})
endfunction()
copy_qm_storage_modes_e2e_python_files(common.py)
copy_qm_storage_modes_e2e_python_files(test_query_modules_storage_modes.py)
add_subdirectory(query_modules)

View File

@ -0,0 +1,37 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import pytest
@pytest.fixture(scope="function")
def cursor(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
cursor = connection.cursor()
cursor.execute("MATCH (n) DETACH DELETE n;")
cursor.execute("CREATE (m:Component {id: 'A7422'}), (n:Component {id: '7X8X0'});")
cursor.execute("MATCH (m:Component {id: 'A7422'}) MATCH (n:Component {id: '7X8X0'}) CREATE (m)-[:PART_OF]->(n);")
cursor.execute("MATCH (m:Component {id: 'A7422'}) MATCH (n:Component {id: '7X8X0'}) CREATE (n)-[:DEPENDS_ON]->(m);")
yield cursor
cursor.execute("MATCH (n) DETACH DELETE n;")
def connect(**kwargs):
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
return connection.cursor()

View File

@ -0,0 +1,4 @@
copy_qm_storage_modes_e2e_python_files(python_api.py)
add_query_module(c_api c_api.cpp)
add_query_module(cpp_api cpp_api.cpp)

View File

@ -0,0 +1,70 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <thread>
#include "_mgp.hpp"
#include "mg_exceptions.hpp"
#include "mg_procedure.h"
constexpr std::string_view kFunctionPassRelationship = "pass_relationship";
constexpr std::string_view kPassRelationshipArg = "relationship";
constexpr std::string_view kProcedurePassNodeWithId = "pass_node_with_id";
constexpr std::string_view kPassNodeWithIdArg = "node";
constexpr std::string_view kPassNodeWithIdFieldNode = "node";
constexpr std::string_view kPassNodeWithIdFieldId = "id";
// While the query procedure/function sleeps for this amount of time, a parallel transaction will erase a graph element
// (node or relationship) contained in the return value. Any operation in the parallel transaction should take far less
// time than this value.
const int64_t kSleep = 1;
void PassRelationship(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
auto *relationship = mgp::list_at(args, 0);
std::this_thread::sleep_for(std::chrono::seconds(kSleep));
mgp::func_result_set_value(res, relationship, memory);
}
void PassNodeWithId(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
auto *node = mgp::value_get_vertex(mgp::list_at(args, 0));
auto node_id = mgp::vertex_get_id(node).as_int;
std::this_thread::sleep_for(std::chrono::seconds(kSleep));
auto *result_record = mgp::result_new_record(result);
mgp::result_record_insert(result_record, kPassNodeWithIdFieldNode.data(), mgp::value_make_vertex(node));
mgp::result_record_insert(result_record, kPassNodeWithIdFieldId.data(), mgp::value_make_int(node_id, memory));
}
extern "C" int mgp_init_module(struct mgp_module *query_module, struct mgp_memory *memory) {
try {
{
auto *func = mgp::module_add_function(query_module, kFunctionPassRelationship.data(), PassRelationship);
mgp::func_add_arg(func, kPassRelationshipArg.data(), mgp::type_relationship());
}
{
auto *proc = mgp::module_add_read_procedure(query_module, kProcedurePassNodeWithId.data(), PassNodeWithId);
mgp::proc_add_arg(proc, kPassNodeWithIdArg.data(), mgp::type_node());
mgp::proc_add_result(proc, kPassNodeWithIdFieldNode.data(), mgp::type_node());
mgp::proc_add_result(proc, kPassNodeWithIdFieldId.data(), mgp::type_int());
}
} catch (const std::exception &e) {
return 1;
}
return 0;
}
extern "C" int mgp_shutdown_module() { return 0; }

View File

@ -0,0 +1,86 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <thread>
#include <mgp.hpp>
constexpr std::string_view kFunctionPassRelationship = "pass_relationship";
constexpr std::string_view kPassRelationshipArg = "relationship";
constexpr std::string_view kProcedurePassNodeWithId = "pass_node_with_id";
constexpr std::string_view kPassNodeWithIdArg = "node";
constexpr std::string_view kPassNodeWithIdFieldNode = "node";
constexpr std::string_view kPassNodeWithIdFieldId = "id";
// While the query procedure/function sleeps for this amount of time, a parallel transaction will erase a graph element
// (node or relationship) contained in the return value. Any operation in the parallel transaction should take far less
// time than this value.
const int64_t kSleep = 1;
void PassRelationship(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
const auto relationship = arguments[0].ValueRelationship();
std::this_thread::sleep_for(std::chrono::seconds(kSleep));
result.SetValue(relationship);
} catch (const std::exception &e) {
mgp::func_result_set_error_msg(res, e.what(), memory);
return;
}
}
void PassNodeWithId(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard(memory);
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
const auto node = arguments[0].ValueNode();
const auto node_id = node.Id().AsInt();
std::this_thread::sleep_for(std::chrono::seconds(kSleep));
auto record = record_factory.NewRecord();
record.Insert(kPassNodeWithIdFieldNode.data(), node);
record.Insert(kPassNodeWithIdFieldId.data(), node_id);
} catch (const std::exception &e) {
mgp::result_set_error_msg(result, e.what());
return;
}
}
extern "C" int mgp_init_module(struct mgp_module *query_module, struct mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard(memory);
mgp::AddFunction(PassRelationship, kFunctionPassRelationship,
{mgp::Parameter(kPassRelationshipArg, mgp::Type::Relationship)}, query_module, memory);
mgp::AddProcedure(
PassNodeWithId, kProcedurePassNodeWithId, mgp::ProcedureType::Read,
{mgp::Parameter(kPassNodeWithIdArg, mgp::Type::Node)},
{mgp::Return(kPassNodeWithIdFieldNode, mgp::Type::Node), mgp::Return(kPassNodeWithIdFieldId, mgp::Type::Int)},
query_module, memory);
} catch (const std::exception &e) {
return 1;
}
return 0;
}
extern "C" int mgp_shutdown_module() { return 0; }

View File

@ -0,0 +1,55 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
from time import sleep
import mgp
# While the query procedure/function sleeps for this amount of time, a parallel transaction will erase a graph element
# (node or relationship) contained in the return value. Any operation in the parallel transaction should take far less
# time than this value.
SLEEP = 1
@mgp.read_proc
def pass_node_with_id(ctx: mgp.ProcCtx, node: mgp.Vertex) -> mgp.Record(node=mgp.Vertex, id=int):
sleep(SLEEP)
return mgp.Record(node=node, id=node.id)
@mgp.function
def pass_node(ctx: mgp.FuncCtx, node: mgp.Vertex):
sleep(SLEEP)
return node
@mgp.function
def pass_relationship(ctx: mgp.FuncCtx, relationship: mgp.Edge):
sleep(SLEEP)
return relationship
@mgp.function
def pass_path(ctx: mgp.FuncCtx, path: mgp.Path):
sleep(SLEEP)
return path
@mgp.function
def pass_list(ctx: mgp.FuncCtx, list_: mgp.List[mgp.Any]):
sleep(SLEEP)
return list_
@mgp.function
def pass_map(ctx: mgp.FuncCtx, map_: mgp.Map):
sleep(SLEEP)
return map_

View File

@ -0,0 +1,283 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
# isort: off
from multiprocessing import Process
import sys
import pytest
from common import cursor, connect
import time
SWITCH_TO_ANALYTICAL = "STORAGE MODE IN_MEMORY_ANALYTICAL;"
def modify_graph(query):
subprocess_cursor = connect()
time.sleep(0.5) # Time for the parallel transaction to call a query procedure
subprocess_cursor.execute(query)
@pytest.mark.parametrize("api", ["c", "cpp", "python"])
def test_function_delete_result(cursor, api):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(
target=modify_graph,
args=("MATCH (m:Component {id: 'A7422'})-[e:PART_OF]->(n:Component {id: '7X8X0'}) DELETE e;",),
)
deleter.start()
cursor.execute(f"MATCH (m)-[e]->(n) RETURN {api}_api.pass_relationship(e);")
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0].type == "DEPENDS_ON"
@pytest.mark.parametrize("api", ["c", "cpp", "python"])
def test_function_delete_only_result(cursor, api):
cursor.execute(SWITCH_TO_ANALYTICAL)
cursor.execute("MATCH (m:Component {id: '7X8X0'})-[e:DEPENDS_ON]->(n:Component {id: 'A7422'}) DELETE e;")
deleter = Process(
target=modify_graph,
args=("MATCH (m:Component {id: 'A7422'})-[e:PART_OF]->(n:Component {id: '7X8X0'}) DELETE e;",),
)
deleter.start()
cursor.execute(f"MATCH (m)-[e]->(n) RETURN {api}_api.pass_relationship(e);")
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
@pytest.mark.parametrize("api", ["c", "cpp", "python"])
def test_procedure_delete_result(cursor, api):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(
target=modify_graph,
args=("MATCH (n {id: 'A7422'}) DETACH DELETE n;",),
)
deleter.start()
cursor.execute(
f"""MATCH (n)
CALL {api}_api.pass_node_with_id(n)
YIELD node, id
RETURN node, id;"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 2 and result[0][0].properties["id"] == "7X8X0"
@pytest.mark.parametrize("api", ["c", "cpp", "python"])
def test_procedure_delete_only_result(cursor, api):
cursor.execute(SWITCH_TO_ANALYTICAL)
cursor.execute("MATCH (n {id: '7X8X0'}) DETACH DELETE n;")
deleter = Process(
target=modify_graph,
args=("MATCH (n {id: 'A7422'}) DETACH DELETE n;",),
)
deleter.start()
cursor.execute(
f"""MATCH (n)
CALL {api}_api.pass_node_with_id(n)
YIELD node, id
RETURN node, id;"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 0
def test_deleted_node(cursor):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(target=modify_graph, args=("MATCH (n:Component {id: 'A7422'}) DETACH DELETE n;",))
deleter.start()
cursor.execute(
"""MATCH (n: Component {id: 'A7422'})
RETURN python_api.pass_node(n);"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
def test_deleted_relationship(cursor):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(
target=modify_graph,
args=("MATCH (:Component {id: 'A7422'})-[e:PART_OF]->(:Component {id: '7X8X0'}) DELETE e;",),
)
deleter.start()
cursor.execute(
"""MATCH (:Component {id: 'A7422'})-[e:PART_OF]->(:Component {id: '7X8X0'})
RETURN python_api.pass_relationship(e);"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
def test_deleted_node_in_path(cursor):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(target=modify_graph, args=("MATCH (n:Component {id: 'A7422'}) DETACH DELETE n;",))
deleter.start()
cursor.execute(
"""MATCH path=(n {id: 'A7422'})-[e]->(m)
RETURN python_api.pass_path(path);"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
def test_deleted_relationship_in_path(cursor):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(
target=modify_graph,
args=("MATCH (:Component {id: 'A7422'})-[e:PART_OF]->(:Component {id: '7X8X0'}) DELETE e;",),
)
deleter.start()
cursor.execute(
"""MATCH path=(n {id: 'A7422'})-[e]->(m)
RETURN python_api.pass_path(path);"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
def test_deleted_value_in_list(cursor):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(
target=modify_graph,
args=("MATCH (:Component {id: 'A7422'})-[e:PART_OF]->(:Component {id: '7X8X0'}) DELETE e;",),
)
deleter.start()
cursor.execute(
"""MATCH (n)-[e]->()
WITH collect(n) + collect(e) as list
RETURN python_api.pass_list(list);"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
def test_deleted_value_in_map(cursor):
cursor.execute(SWITCH_TO_ANALYTICAL)
deleter = Process(
target=modify_graph,
args=("MATCH (:Component {id: 'A7422'})-[e:PART_OF]->(:Component {id: '7X8X0'}) DELETE e;",),
)
deleter.start()
cursor.execute(
"""MATCH (n {id: 'A7422'})-[e]->()
WITH {node: n, relationship: e} AS map
RETURN python_api.pass_map(map);"""
)
deleter.join()
result = cursor.fetchall()
assert len(result) == 1 and len(result[0]) == 1 and result[0][0] is None
@pytest.mark.parametrize("storage_mode", ["IN_MEMORY_TRANSACTIONAL", "IN_MEMORY_ANALYTICAL"])
def test_function_none_deleted(storage_mode):
cursor = connect()
cursor.execute(f"STORAGE MODE {storage_mode};")
cursor.execute("CREATE (m:Component {id: 'A7422'}), (n:Component {id: '7X8X0'});")
cursor.execute(
"""MATCH (n)
RETURN python_api.pass_node(n);"""
)
result = cursor.fetchall()
cursor.execute("MATCH (n) DETACH DELETE n;")
assert len(result) == 2
@pytest.mark.parametrize("storage_mode", ["IN_MEMORY_TRANSACTIONAL", "IN_MEMORY_ANALYTICAL"])
def test_procedure_none_deleted(storage_mode):
cursor = connect()
cursor.execute(f"STORAGE MODE {storage_mode};")
cursor.execute("CREATE (m:Component {id: 'A7422'}), (n:Component {id: '7X8X0'});")
cursor.execute(
"""MATCH (n)
CALL python_api.pass_node_with_id(n)
YIELD node, id
RETURN node, id;"""
)
result = cursor.fetchall()
cursor.execute("MATCH (n) DETACH DELETE n;")
assert len(result) == 2
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,14 @@
query_modules_storage_modes_cluster: &query_modules_storage_modes_cluster
cluster:
main:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "query_modules_storage_modes.log"
setup_queries: []
validation_queries: []
workloads:
- name: "Test query module API behavior in Memgraph storage modes"
binary: "tests/e2e/pytest_runner.sh"
proc: "tests/e2e/query_modules_storage_modes/query_modules/"
args: ["query_modules_storage_modes/test_query_modules_storage_modes.py"]
<<: *query_modules_storage_modes_cluster

View File

@ -104,7 +104,7 @@ class MemgraphRunner:
memgraph_binary = os.path.join(self.build_directory, "memgraph")
args_mg = [
memgraph_binary,
"--storage-properties-on-edges",
"--storage-properties-on-edges=true",
"--data-directory",
self.data_directory.name,
"--log-file",

View File

@ -38,7 +38,8 @@ struct CppApiTestFixture : public ::testing::Test {
mgp_graph CreateGraph(const memgraph::storage::View view = memgraph::storage::View::NEW) {
// the execution context can be null as it shouldn't be used in these tests
return mgp_graph{&CreateDbAccessor(memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION), view, ctx_.get()};
return mgp_graph{&CreateDbAccessor(memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION), view, ctx_.get(),
memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL};
}
memgraph::query::DbAccessor &CreateDbAccessor(const memgraph::storage::IsolationLevel isolationLevel) {
@ -499,6 +500,7 @@ TYPED_TEST(CppApiTestFixture, TestValueOperatorLessThan) {
ASSERT_THROW(list_test < map_test, mgp::ValueException);
ASSERT_THROW(list_test < list_test, mgp::ValueException);
}
TYPED_TEST(CppApiTestFixture, TestNumberEquality) {
mgp::Value double_1{1.0};
mgp::Value int_1{static_cast<int64_t>(1)};

View File

@ -249,7 +249,7 @@ TYPED_TEST(CypherType, VertexSatisfiesType) {
auto vertex = dba.InsertVertex();
mgp_memory memory{memgraph::utils::NewDeleteResource()};
memgraph::utils::Allocator<mgp_vertex> alloc(memory.impl);
mgp_graph graph{&dba, memgraph::storage::View::NEW, nullptr};
mgp_graph graph{&dba, memgraph::storage::View::NEW, nullptr, dba.GetStorageMode()};
auto *mgp_vertex_v =
EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_vertex, alloc.new_object<mgp_vertex>(vertex, &graph));
const memgraph::query::TypedValue tv_vertex(vertex);
@ -274,7 +274,7 @@ TYPED_TEST(CypherType, EdgeSatisfiesType) {
auto edge = *dba.InsertEdge(&v1, &v2, dba.NameToEdgeType("edge_type"));
mgp_memory memory{memgraph::utils::NewDeleteResource()};
memgraph::utils::Allocator<mgp_edge> alloc(memory.impl);
mgp_graph graph{&dba, memgraph::storage::View::NEW, nullptr};
mgp_graph graph{&dba, memgraph::storage::View::NEW, nullptr, dba.GetStorageMode()};
auto *mgp_edge_v = EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_edge, alloc.new_object<mgp_edge>(edge, &graph));
const memgraph::query::TypedValue tv_edge(edge);
CheckSatisfiesTypesAndNullable(
@ -298,7 +298,7 @@ TYPED_TEST(CypherType, PathSatisfiesType) {
auto edge = *dba.InsertEdge(&v1, &v2, dba.NameToEdgeType("edge_type"));
mgp_memory memory{memgraph::utils::NewDeleteResource()};
memgraph::utils::Allocator<mgp_path> alloc(memory.impl);
mgp_graph graph{&dba, memgraph::storage::View::NEW, nullptr};
mgp_graph graph{&dba, memgraph::storage::View::NEW, nullptr, dba.GetStorageMode()};
auto *mgp_vertex_v = alloc.new_object<mgp_vertex>(v1, &graph);
auto path = EXPECT_MGP_NO_ERROR(mgp_path *, mgp_path_make_with_start, mgp_vertex_v, &memory);
ASSERT_TRUE(path);

View File

@ -132,7 +132,7 @@ TYPED_TEST(PyModule, PyVertex) {
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
mgp_memory memory{memgraph::utils::NewDeleteResource()};
mgp_graph graph{&dba, memgraph::storage::View::OLD, nullptr};
mgp_graph graph{&dba, memgraph::storage::View::OLD, nullptr, dba.GetStorageMode()};
auto *vertex = EXPECT_MGP_NO_ERROR(mgp_vertex *, mgp_graph_get_vertex_by_id, &graph, mgp_vertex_id{0}, &memory);
ASSERT_TRUE(vertex);
auto *vertex_value = EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_vertex,
@ -182,7 +182,7 @@ TYPED_TEST(PyModule, PyEdge) {
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
mgp_memory memory{memgraph::utils::NewDeleteResource()};
mgp_graph graph{&dba, memgraph::storage::View::OLD, nullptr};
mgp_graph graph{&dba, memgraph::storage::View::OLD, nullptr, dba.GetStorageMode()};
auto *start_v = EXPECT_MGP_NO_ERROR(mgp_vertex *, mgp_graph_get_vertex_by_id, &graph, mgp_vertex_id{0}, &memory);
ASSERT_TRUE(start_v);
auto *edges_it = EXPECT_MGP_NO_ERROR(mgp_edges_iterator *, mgp_vertex_iter_out_edges, start_v, &memory);
@ -228,7 +228,7 @@ TYPED_TEST(PyModule, PyPath) {
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
mgp_memory memory{memgraph::utils::NewDeleteResource()};
mgp_graph graph{&dba, memgraph::storage::View::OLD, nullptr};
mgp_graph graph{&dba, memgraph::storage::View::OLD, nullptr, dba.GetStorageMode()};
auto *start_v = EXPECT_MGP_NO_ERROR(mgp_vertex *, mgp_graph_get_vertex_by_id, &graph, mgp_vertex_id{0}, &memory);
ASSERT_TRUE(start_v);
auto *path = EXPECT_MGP_NO_ERROR(mgp_path *, mgp_path_make_with_start, start_v, &memory);

View File

@ -120,7 +120,8 @@ class MgpGraphTest : public ::testing::Test {
public:
mgp_graph CreateGraph(const memgraph::storage::View view = memgraph::storage::View::NEW) {
// the execution context can be null as it shouldn't be used in these tests
return mgp_graph{&CreateDbAccessor(memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION), view, ctx_.get()};
return mgp_graph{&CreateDbAccessor(memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION), view, ctx_.get(),
memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL};
}
std::array<memgraph::storage::Gid, 2> CreateEdge() {