diff --git a/src/query/context.hpp b/src/query/context.hpp index 286e5adf5..2bced7a1a 100644 --- a/src/query/context.hpp +++ b/src/query/context.hpp @@ -57,9 +57,7 @@ struct ExecutionContext { std::chrono::duration profile_execution_time; plan::ProfilingStats stats; plan::ProfilingStats *stats_root{nullptr}; - - // trigger context - TriggerContext *trigger_context{nullptr}; + TriggerContextCollector *trigger_context_collector{nullptr}; }; inline bool MustAbort(const ExecutionContext &context) { diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 6881615fd..4d8ca0221 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -18,6 +18,7 @@ #include "query/plan/planner.hpp" #include "query/plan/profile.hpp" #include "query/plan/vertex_count_cache.hpp" +#include "query/trigger.hpp" #include "query/typed_value.hpp" #include "utils/algorithm.hpp" #include "utils/csv_parsing.hpp" @@ -467,7 +468,8 @@ struct PullPlanVector { struct PullPlan { explicit PullPlan(std::shared_ptr plan, const Parameters ¶meters, bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - TriggerContext *trigger_context = nullptr, std::optional memory_limit = {}); + TriggerContextCollector *trigger_context_collector = nullptr, + std::optional memory_limit = {}); std::optional Pull(AnyStream *stream, std::optional n, const std::vector &output_symbols, std::map *summary); @@ -495,7 +497,7 @@ struct PullPlan { PullPlan::PullPlan(const std::shared_ptr plan, const Parameters ¶meters, const bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - TriggerContext *trigger_context, const std::optional memory_limit) + TriggerContextCollector *trigger_context_collector, const std::optional memory_limit) : plan_(plan), cursor_(plan->plan().MakeCursor(execution_memory)), frame_(plan->symbol_table().max_position(), execution_memory), @@ -512,7 +514,7 @@ PullPlan::PullPlan(const std::shared_ptr plan, const Parameters &par ctx_.max_execution_time_sec = interpreter_context->execution_timeout_sec; ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_profile_query = is_profile_query; - ctx_.trigger_context = trigger_context; + ctx_.trigger_context_collector = trigger_context_collector; } std::optional PullPlan::Pull(AnyStream *stream, std::optional n, @@ -599,40 +601,54 @@ using RWType = plan::ReadWriteTypeChecker::RWType; Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); // try { - // { - // auto storage_acc = interpreter_context_->db->Access(); - // DbAccessor dba(&storage_acc); - // auto triggers_acc = interpreter_context_->before_commit_triggers.access(); - // triggers_acc.insert(Trigger{"BeforeDelete", - // "UNWIND deletedVertices as u CREATE(:DELETED_VERTEX {id: id(u) + 10})", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"BeforeDeleteEdge", "UNWIND deletedEdges as u CREATE(:DELETED_EDGE {id: id(u) + - // 10})", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // // triggers_acc.insert(Trigger{"BeforeDelete2", "UNWIND deletedEdges as u SET u.deleted = 0", - // // &interpreter_context_->ast_cache, &dba, - // // &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"BeforeDeleteProcedure", "CALL script.procedure(updatedVertices) YIELD * RETURN *", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"BeforeCreator", "UNWIND createdVertices as u SET u.before = id(u) + 10", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"BeforeCreatorEdge", "UNWIND createdEdges as u SET u.before = id(u) + 10", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"BeforeSetLabelProcedure", - // "CALL label.procedure(assignedVertexLabels) YIELD * RETURN *", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // } - // { - // auto storage_acc = interpreter_context->db->Access(); - // DbAccessor dba(&storage_acc); - // auto triggers_acc = interpreter_context->after_commit_triggers.access(); - // triggers_acc.insert(Trigger{"AfterDelete", "UNWIND deletedVertices as u CREATE(:DELETED {id: u.id + 100})", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"AfterCreator", "UNWIND createdVertices as u SET u.after = u.id + 100", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // triggers_acc.insert(Trigger{"AfterUpdateProcedure", "CALL script.procedure(updatedVertices) YIELD * RETURN *", - // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock}); - // } + // { + // auto storage_acc = interpreter_context_->db->Access(); + // DbAccessor dba(&storage_acc); + // auto triggers_acc = interpreter_context_->before_commit_triggers.access(); + // triggers_acc.insert(Trigger{"BeforeDelete", + // "UNWIND deletedVertices as u CREATE(:DELETED_VERTEX {id: id(u) + 10})", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_DELETE}); + // triggers_acc.insert(Trigger{"BeforeUpdatePropertyi", + // "UNWIND assignedVertexProperties as u SET u.vertex.two = u.new", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_UPDATE}); + // triggers_acc.insert(Trigger{"BeforeDeleteEdge", "UNWIND deletedEdges as u CREATE(:DELETED_EDGE {id: id(u) +10}) + // ", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::EDGE_DELETE}); + // // triggers_acc.insert(Trigger{"BeforeDelete2", "UNWIND deletedEdges as u SET u.deleted = 0", + // // &interpreter_context_->ast_cache, &dba, + // // &interpreter_context_->antlr_lock}); + // triggers_acc.insert(Trigger{"BeforeDeleteProcedure", + // "CALL script.procedure('VERTEX_UPDATE', updatedVertices) YIELD * RETURN *", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_UPDATE}); + // triggers_acc.insert(Trigger{"BeforeCreator", "UNWIND createdVertices as u SET u.before = id(u) + 10", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_CREATE}); + // triggers_acc.insert(Trigger{"BeforeCreatorEdge", "UNWIND createdEdges as u SET u.before = id(u) + 10", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::EDGE_CREATE}); + // triggers_acc.insert(Trigger{"BeforeSetLabelProcedure", + // "CALL label.procedure('VERTEX_UPDATE', assignedVertexLabels) YIELD * RETURN *", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_UPDATE}); + // } + // { + // auto storage_acc = interpreter_context->db->Access(); + // DbAccessor dba(&storage_acc); + // auto triggers_acc = interpreter_context->after_commit_triggers.access(); + // triggers_acc.insert(Trigger{"AfterDelete", "UNWIND deletedVertices as u CREATE(:DELETED {id: u.id + 100})", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_DELETE}); + // triggers_acc.insert(Trigger{"AfterCreator", "UNWIND createdVertices as u SET u.after = u.id + 100", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, + // TriggerEventType::VERTEX_CREATE}); + // triggers_acc.insert(Trigger{ + // "AfterUpdateProcedure", "CALL script.procedure('UPDATE',updatedObjects) YIELD * RETURN *", + // &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, TriggerEventType::UPDATE}); + // } // } catch (const utils::BasicException &e) { // spdlog::critical("Failed to create a trigger because: {}", e.what()); // } @@ -654,7 +670,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) if (interpreter_context_->before_commit_triggers.size() > 0 || interpreter_context_->after_commit_triggers.size() > 0) { - trigger_context_.emplace(); + trigger_context_collector_.emplace(); } }; } else if (query_upper == "COMMIT") { @@ -702,7 +718,8 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, - utils::MemoryResource *execution_memory, TriggerContext *trigger_context = nullptr) { + utils::MemoryResource *execution_memory, + TriggerContextCollector *trigger_context_collector = nullptr) { auto *cypher_query = utils::Downcast(parsed_query.query); Frame frame(0); @@ -740,7 +757,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map(plan, parsed_query.parameters, false, dba, interpreter_context, - execution_memory, trigger_context, memory_limit); + execution_memory, trigger_context_collector, memory_limit); return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary]( AnyStream *stream, std::optional n) -> std::optional { @@ -1373,7 +1390,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, if (utils::Downcast(parsed_query.query) && (interpreter_context_->before_commit_triggers.size() > 0 || interpreter_context_->after_commit_triggers.size() > 0)) { - trigger_context_.emplace(); + trigger_context_collector_.emplace(); } } @@ -1383,7 +1400,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, &*execution_db_accessor_, &query_execution->execution_memory, - trigger_context_ ? &*trigger_context_ : nullptr); + trigger_context_collector_ ? &*trigger_context_collector_ : nullptr); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, &*execution_db_accessor_, &query_execution->execution_memory); @@ -1451,7 +1468,7 @@ void Interpreter::Abort() { db_accessor_->Abort(); execution_db_accessor_.reset(); db_accessor_.reset(); - trigger_context_.reset(); + trigger_context_collector_.reset(); } namespace { @@ -1459,7 +1476,6 @@ void RunTriggersIndividually(const utils::SkipList &triggers, Interpret TriggerContext trigger_context) { // Run the triggers for (const auto &trigger : triggers.access()) { - spdlog::debug("Executing trigger '{}'", trigger.name()); utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; // create a new transaction for each trigger @@ -1512,16 +1528,20 @@ void Interpreter::Commit() { // a query. if (!db_accessor_) return; - if (trigger_context_) { + std::optional trigger_context = std::nullopt; + if (trigger_context_collector_) { + trigger_context.emplace(std::move(*trigger_context_collector_).TransformToTriggerContext()); + } + + if (trigger_context) { // Run the triggers for (const auto &trigger : interpreter_context_->before_commit_triggers.access()) { - spdlog::debug("Executing trigger '{}'", trigger.name()); utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; AdvanceCommand(); try { trigger.Execute(&*execution_db_accessor_, &execution_memory, *interpreter_context_->tsc_frequency, interpreter_context_->execution_timeout_sec, &interpreter_context_->is_shutting_down, - *trigger_context_); + *trigger_context); } catch (const utils::BasicException &e) { throw utils::BasicException( fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.name(), e.what())); @@ -1540,7 +1560,7 @@ void Interpreter::Commit() { auto property_name = execution_db_accessor_->PropertyToName(*constraint_violation.properties.begin()); execution_db_accessor_.reset(); db_accessor_.reset(); - trigger_context_.reset(); + trigger_context_collector_.reset(); throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name, property_name); break; @@ -1553,7 +1573,7 @@ void Interpreter::Commit() { [this](auto &stream, const auto &prop) { stream << execution_db_accessor_->PropertyToName(prop); }); execution_db_accessor_.reset(); db_accessor_.reset(); - trigger_context_.reset(); + trigger_context_collector_.reset(); throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name, property_names_stream.str()); break; @@ -1561,8 +1581,8 @@ void Interpreter::Commit() { } } - if (trigger_context_) { - background_thread_.AddTask([trigger_context = std::move(*trigger_context_), + if (trigger_context && interpreter_context_->after_commit_triggers.size() > 0) { + background_thread_.AddTask([trigger_context = std::move(*trigger_context), interpreter_context = this->interpreter_context_, user_transaction = std::shared_ptr(std::move(db_accessor_))]() mutable { RunTriggersIndividually(interpreter_context->after_commit_triggers, interpreter_context, @@ -1574,7 +1594,7 @@ void Interpreter::Commit() { execution_db_accessor_.reset(); db_accessor_.reset(); - trigger_context_.reset(); + trigger_context_collector_.reset(); SPDLOG_DEBUG("Finished comitting the transaction"); } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 3da93ec6a..8f42c430c 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -303,7 +303,7 @@ class Interpreter final { // move this unique_ptr into a shrared_ptr. std::unique_ptr db_accessor_; std::optional execution_db_accessor_; - std::optional trigger_context_; + std::optional trigger_context_collector_; bool in_explicit_transaction_{false}; bool expect_rollback_{false}; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 5e90c013e..49698bc20 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -209,8 +209,8 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame, ExecutionContext &context) if (input_cursor_->Pull(frame, context)) { auto created_vertex = CreateLocalVertex(self_.node_info_, &frame, context); - if (context.trigger_context) { - context.trigger_context->RegisterCreatedObject(created_vertex); + if (context.trigger_context_collector) { + context.trigger_context_collector->RegisterCreatedObject(created_vertex); } return true; } @@ -311,8 +311,8 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, ExecutionContext &cont } }(); - if (context.trigger_context) { - context.trigger_context->RegisterCreatedObject(created_edge); + if (context.trigger_context_collector) { + context.trigger_context_collector->RegisterCreatedObject(created_edge); } return true; @@ -329,8 +329,8 @@ VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(Frame &frame, Exec return dest_node_value.ValueVertex(); } else { auto &created_vertex = CreateLocalVertex(self_.node_info_, &frame, context); - if (context.trigger_context) { - context.trigger_context->RegisterCreatedObject(created_vertex); + if (context.trigger_context_collector) { + context.trigger_context_collector->RegisterCreatedObject(created_vertex); } return created_vertex; } @@ -1848,8 +1848,8 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { } } - if (context.trigger_context && maybe_value.GetValue()) { - context.trigger_context->RegisterDeletedObject(*maybe_value.GetValue()); + if (context.trigger_context_collector && maybe_value.GetValue()) { + context.trigger_context_collector->RegisterDeletedObject(*maybe_value.GetValue()); } } } @@ -1873,10 +1873,10 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { throw QueryRuntimeException("Unexpected error when deleting a node."); } } - if (context.trigger_context && res.GetValue()) { - context.trigger_context->RegisterDeletedObject(res.GetValue()->first); + if (context.trigger_context_collector && res.GetValue()) { + context.trigger_context_collector->RegisterDeletedObject(res.GetValue()->first); for (const auto &deleted_edge : res.GetValue()->second) { - context.trigger_context->RegisterDeletedObject(deleted_edge); + context.trigger_context_collector->RegisterDeletedObject(deleted_edge); } } } else { @@ -1894,8 +1894,8 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { } } - if (context.trigger_context && res.GetValue()) { - context.trigger_context->RegisterDeletedObject(*res.GetValue()); + if (context.trigger_context_collector && res.GetValue()) { + context.trigger_context_collector->RegisterDeletedObject(*res.GetValue()); } } break; @@ -1953,18 +1953,20 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, ExecutionContext &contex case TypedValue::Type::Vertex: { auto old_value = PropsSetChecked(&lhs.ValueVertex(), self_.property_, rhs); - if (context.trigger_context) { - context.trigger_context->RegisterSetObjectProperty(lhs.ValueVertex(), self_.property_, - TypedValue{std::move(old_value)}, std::move(rhs)); + if (context.trigger_context_collector) { + // rhs cannot be moved because it was created with the allocator that is only valid during current pull + context.trigger_context_collector->RegisterSetObjectProperty(lhs.ValueVertex(), self_.property_, + TypedValue{std::move(old_value)}, TypedValue{rhs}); } break; } case TypedValue::Type::Edge: { auto old_value = PropsSetChecked(&lhs.ValueEdge(), self_.property_, rhs); - if (context.trigger_context) { - context.trigger_context->RegisterSetObjectProperty(lhs.ValueEdge(), self_.property_, - TypedValue{std::move(old_value)}, std::move(rhs)); + if (context.trigger_context_collector) { + // rhs cannot be moved because it was created with the allocator that is only valid during current pull + context.trigger_context_collector->RegisterSetObjectProperty(lhs.ValueEdge(), self_.property_, + TypedValue{std::move(old_value)}, TypedValue{rhs}); } break; } @@ -2039,7 +2041,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr } } - if (context->trigger_context) { + if (context->trigger_context_collector) { old_values.emplace(std::move(*maybe_value)); } } @@ -2073,8 +2075,8 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr return {}; }(); - context->trigger_context->RegisterSetObjectProperty(*record, key, TypedValue(std::move(old_value)), - TypedValue(std::move(new_value))); + context->trigger_context_collector->RegisterSetObjectProperty(*record, key, TypedValue(std::move(old_value)), + TypedValue(std::move(new_value))); }; auto set_props = [&, record](auto properties) { @@ -2094,7 +2096,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr } } - if (context->trigger_context) { + if (context->trigger_context_collector) { register_set_property(std::move(*maybe_error), kv.first, std::move(kv.second)); } } @@ -2111,7 +2113,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr for (const auto &kv : rhs.ValueMap()) { auto key = context->db_accessor->NameToProperty(kv.first); auto old_value = PropsSetChecked(record, key, kv.second); - if (context->trigger_context) { + if (context->trigger_context_collector) { register_set_property(std::move(old_value), key, kv.second); } } @@ -2123,11 +2125,11 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr "map."); } - if (context->trigger_context && old_values) { + if (context->trigger_context_collector && old_values) { // register removed properties for (auto &[property_id, property_value] : *old_values) { - context->trigger_context->RegisterRemovedObjectProperty(*record, property_id, - TypedValue(std::move(property_value))); + context->trigger_context_collector->RegisterRemovedObjectProperty(*record, property_id, + TypedValue(std::move(property_value))); } } } @@ -2196,9 +2198,9 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) { ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); auto &vertex = vertex_value.ValueVertex(); for (auto label : self_.labels_) { - auto maybe_error = vertex.AddLabel(label); - if (maybe_error.HasError()) { - switch (maybe_error.GetError()) { + auto maybe_value = vertex.AddLabel(label); + if (maybe_value.HasError()) { + switch (maybe_value.GetError()) { case storage::Error::SERIALIZATION_ERROR: throw QueryRuntimeException("Can't serialize due to concurrent operations."); case storage::Error::DELETED_OBJECT: @@ -2210,8 +2212,8 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) { } } - if (context.trigger_context) { - context.trigger_context->RegisterSetVertexLabel(vertex, label); + if (context.trigger_context_collector && *maybe_value) { + context.trigger_context_collector->RegisterSetVertexLabel(vertex, label); } } @@ -2269,9 +2271,9 @@ bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame, ExecutionContext & } } - if (context.trigger_context) { - context.trigger_context->RegisterRemovedObjectProperty(*record, property, - TypedValue(std::move(*maybe_old_value))); + if (context.trigger_context_collector) { + context.trigger_context_collector->RegisterRemovedObjectProperty(*record, property, + TypedValue(std::move(*maybe_old_value))); } }; @@ -2339,8 +2341,8 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &cont } } - if (context.trigger_context && *maybe_value) { - context.trigger_context->RegisterRemovedVertexLabel(vertex, label); + if (context.trigger_context_collector && *maybe_value) { + context.trigger_context_collector->RegisterRemovedVertexLabel(vertex, label); } } diff --git a/src/query/trigger.cpp b/src/query/trigger.cpp index 65ad14dde..2a1ee4668 100644 --- a/src/query/trigger.cpp +++ b/src/query/trigger.cpp @@ -12,23 +12,112 @@ namespace query { namespace { -// clang-format off -std::vector> GetPredefinedIdentifiers() { - return {{{"createdVertices", false}, trigger::IdentifierTag::CREATED_VERTICES }, - {{"createdEdges", false}, trigger::IdentifierTag::CREATED_EDGES }, - {{"deletedVertices", false}, trigger::IdentifierTag::DELETED_VERTICES }, - {{"deletedEdges", false}, trigger::IdentifierTag::DELETED_EDGES }, - {{"assignedVertexProperties", false}, trigger::IdentifierTag::SET_VERTEX_PROPERTIES }, - {{"assignedEdgeProperties", false}, trigger::IdentifierTag::SET_EDGE_PROPERTIES }, - {{"removedVertexProperties", false}, trigger::IdentifierTag::REMOVED_VERTEX_PROPERTIES}, - {{"removedEdgeProperties", false}, trigger::IdentifierTag::REMOVED_EDGE_PROPERTIES }, - {{"assignedVertexLabels", false}, trigger::IdentifierTag::SET_VERTEX_LABELS }, - {{"removedVertexLabels", false}, trigger::IdentifierTag::REMOVED_VERTEX_LABELS }, - {{"updatedVertices", false}, trigger::IdentifierTag::UPDATED_VERTICES }, - {{"updatedEdges", false}, trigger::IdentifierTag::UPDATED_EDGES }, - {{"updatedObjects", false}, trigger::IdentifierTag::UPDATED_OBJECTS }}; + +auto IdentifierString(const TriggerIdentifierTag tag) noexcept { + switch (tag) { + case TriggerIdentifierTag::CREATED_VERTICES: + return "createdVertices"; + + case TriggerIdentifierTag::CREATED_EDGES: + return "createdEdges"; + + case TriggerIdentifierTag::CREATED_OBJECTS: + return "createdObjects"; + + case TriggerIdentifierTag::DELETED_VERTICES: + return "deletedVertices"; + + case TriggerIdentifierTag::DELETED_EDGES: + return "deletedEdges"; + + case TriggerIdentifierTag::DELETED_OBJECTS: + return "deletedObjects"; + + case TriggerIdentifierTag::SET_VERTEX_PROPERTIES: + return "assignedVertexProperties"; + + case TriggerIdentifierTag::SET_EDGE_PROPERTIES: + return "assignedEdgeProperties"; + + case TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES: + return "removedVertexProperties"; + + case TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES: + return "removedEdgeProperties"; + + case TriggerIdentifierTag::SET_VERTEX_LABELS: + return "assignedVertexLabels"; + + case TriggerIdentifierTag::REMOVED_VERTEX_LABELS: + return "removedVertexLabels"; + + case TriggerIdentifierTag::UPDATED_VERTICES: + return "updatedVertices"; + + case TriggerIdentifierTag::UPDATED_EDGES: + return "updatedEdges"; + + case TriggerIdentifierTag::UPDATED_OBJECTS: + return "updatedObjects"; + } +} + +template +concept SameAsIdentifierTag = std::same_as; + +template +std::vector> TagsToIdentifiers(const TArgs &...args) { + std::vector> identifiers; + identifiers.reserve(sizeof...(args)); + + auto add_identifier = [&identifiers](const auto tag) { + identifiers.emplace_back(Identifier{IdentifierString(tag), false}, tag); + }; + + (add_identifier(args), ...); + + return identifiers; +}; + +std::vector> GetPredefinedIdentifiers(const TriggerEventType event_type) { + using IdentifierTag = TriggerIdentifierTag; + using EventType = TriggerEventType; + + switch (event_type) { + case EventType::ANY: + return {}; + + case EventType::CREATE: + return TagsToIdentifiers(IdentifierTag::CREATED_OBJECTS); + + case EventType::VERTEX_CREATE: + return TagsToIdentifiers(IdentifierTag::CREATED_VERTICES); + + case EventType::EDGE_CREATE: + return TagsToIdentifiers(IdentifierTag::CREATED_EDGES); + + case EventType::DELETE: + return TagsToIdentifiers(IdentifierTag::DELETED_OBJECTS); + + case EventType::VERTEX_DELETE: + return TagsToIdentifiers(IdentifierTag::DELETED_VERTICES); + + case EventType::EDGE_DELETE: + return TagsToIdentifiers(IdentifierTag::DELETED_EDGES); + + case EventType::UPDATE: + return TagsToIdentifiers(IdentifierTag::UPDATED_OBJECTS); + + case EventType::VERTEX_UPDATE: + return TagsToIdentifiers(IdentifierTag::SET_VERTEX_PROPERTIES, IdentifierTag::REMOVED_VERTEX_PROPERTIES, + IdentifierTag::SET_VERTEX_LABELS, IdentifierTag::REMOVED_VERTEX_LABELS, + IdentifierTag::UPDATED_VERTICES); + + case EventType::EDGE_UPDATE: + return TagsToIdentifiers(IdentifierTag::SET_EDGE_PROPERTIES, IdentifierTag::REMOVED_EDGE_PROPERTIES, + IdentifierTag::UPDATED_EDGES); + } } -// clang-format on template concept WithToMap = requires(const T value, DbAccessor *dba) { @@ -42,14 +131,12 @@ TypedValue ToTypedValue(const T &value, DbAccessor *dba) { } template -TypedValue ToTypedValue(const TriggerContext::CreatedObject &created_object, - [[maybe_unused]] DbAccessor *dba) { +TypedValue ToTypedValue(const detail::CreatedObject &created_object, [[maybe_unused]] DbAccessor *dba) { return TypedValue{created_object.object}; } template -TypedValue ToTypedValue(const TriggerContext::DeletedObject &deleted_object, - [[maybe_unused]] DbAccessor *dba) { +TypedValue ToTypedValue(const detail::DeletedObject &deleted_object, [[maybe_unused]] DbAccessor *dba) { return TypedValue{deleted_object.object}; } @@ -67,7 +154,7 @@ concept ConvertableToTypedValue = requires(T value, DbAccessor *dba) { &&WithIsValid; template -concept LabelUpdateContext = utils::SameAsAnyOf; +concept LabelUpdateContext = utils::SameAsAnyOf; template TypedValue ToTypedValue(const std::vector &values, DbAccessor *dba) { @@ -79,17 +166,19 @@ TypedValue ToTypedValue(const std::vector &values, DbAccessor *dba) { } } - std::map typed_values; + TypedValue result{std::map{}}; + auto &typed_values = result.ValueMap(); for (auto &[label_id, vertices] : vertices_by_labels) { typed_values.emplace(dba->LabelToName(label_id), TypedValue(std::move(vertices))); } - return TypedValue(std::move(typed_values)); + return result; } template TypedValue ToTypedValue(const std::vector &values, DbAccessor *dba) requires(!LabelUpdateContext) { - std::vector typed_values; + TypedValue result{std::vector{}}; + auto &typed_values = result.ValueList(); typed_values.reserve(values.size()); for (const auto &value : values) { @@ -98,131 +187,97 @@ TypedValue ToTypedValue(const std::vector &values, DbAccessor *dba) requires( } } - return TypedValue(std::move(typed_values)); + return result; } template const char *TypeToString() { - if constexpr (std::same_as>) { + if constexpr (std::same_as>) { + return "created_vertex"; + } else if constexpr (std::same_as>) { + return "created_edge"; + } else if constexpr (std::same_as>) { + return "deleted_vertex"; + } else if constexpr (std::same_as>) { + return "deleted_edge"; + } else if constexpr (std::same_as>) { return "set_vertex_property"; - } else if constexpr (std::same_as>) { + } else if constexpr (std::same_as>) { return "set_edge_property"; - } else if constexpr (std::same_as>) { + } else if constexpr (std::same_as>) { return "removed_vertex_property"; - } else if constexpr (std::same_as>) { + } else if constexpr (std::same_as>) { return "removed_edge_property"; - } else if constexpr (std::same_as) { + } else if constexpr (std::same_as) { return "set_vertex_label"; - } else if constexpr (std::same_as) { + } else if constexpr (std::same_as) { return "removed_vertex_label"; } } template -concept UpdateContext = WithToMap &&WithIsValid; +concept ContextInfo = WithToMap &&WithIsValid; -template -TypedValue Updated(DbAccessor *dba, const std::vector &...args) { +template +TypedValue Concatenate(DbAccessor *dba, const std::vector &...args) { const auto size = (args.size() + ...); - std::vector updated; - updated.reserve(size); + TypedValue result{std::vector{}}; + auto &concatenated = result.ValueList(); + concatenated.reserve(size); - const auto add_to_updated = [&](const std::vector &values) { + const auto add_to_concatenated = [&](const std::vector &values) { for (const auto &value : values) { if (value.IsValid()) { auto map = value.ToMap(dba); - map["type"] = TypeToString(); - updated.emplace_back(std::move(map)); + map["event_type"] = TypeToString(); + concatenated.emplace_back(std::move(map)); } } }; - (add_to_updated(args), ...); + (add_to_concatenated(args), ...); - return TypedValue(std::move(updated)); + return result; +} + +template +concept WithEmpty = requires(const T value) { + { value.empty() } + ->std::same_as; +}; + +template +bool AnyContainsValue(const TContainer &...value_containers) { + return (!value_containers.empty() || ...); } } // namespace -bool TriggerContext::SetVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); } +namespace detail { +bool SetVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); } -std::map TriggerContext::SetVertexLabel::ToMap(DbAccessor *dba) const { +std::map SetVertexLabel::ToMap(DbAccessor *dba) const { return {{"vertex", TypedValue{object}}, {"label", TypedValue{dba->LabelToName(label_id)}}}; } -bool TriggerContext::RemovedVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); } +bool RemovedVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); } -std::map TriggerContext::RemovedVertexLabel::ToMap(DbAccessor *dba) const { +std::map RemovedVertexLabel::ToMap(DbAccessor *dba) const { return {{"vertex", TypedValue{object}}, {"label", TypedValue{dba->LabelToName(label_id)}}}; } - -void TriggerContext::RegisterSetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) { - set_vertex_labels_.emplace_back(vertex, label_id); -} - -void TriggerContext::RegisterRemovedVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) { - removed_vertex_labels_.emplace_back(vertex, label_id); -} - -TypedValue TriggerContext::GetTypedValue(const trigger::IdentifierTag tag, DbAccessor *dba) const { - const auto &[created_vertices, deleted_vertices, set_vertex_properties, removed_vertex_properties] = vertex_registry_; - const auto &[created_edges, deleted_edges, set_edge_properties, removed_edge_properties] = edge_registry_; - - switch (tag) { - case trigger::IdentifierTag::CREATED_VERTICES: - return ToTypedValue(created_vertices, dba); - - case trigger::IdentifierTag::CREATED_EDGES: - return ToTypedValue(created_edges, dba); - - case trigger::IdentifierTag::DELETED_VERTICES: - return ToTypedValue(deleted_vertices, dba); - - case trigger::IdentifierTag::DELETED_EDGES: - return ToTypedValue(deleted_edges, dba); - - case trigger::IdentifierTag::SET_VERTEX_PROPERTIES: - return ToTypedValue(set_vertex_properties, dba); - - case trigger::IdentifierTag::SET_EDGE_PROPERTIES: - return ToTypedValue(set_edge_properties, dba); - - case trigger::IdentifierTag::REMOVED_VERTEX_PROPERTIES: - return ToTypedValue(removed_vertex_properties, dba); - - case trigger::IdentifierTag::REMOVED_EDGE_PROPERTIES: - return ToTypedValue(removed_edge_properties, dba); - - case trigger::IdentifierTag::SET_VERTEX_LABELS: - return ToTypedValue(set_vertex_labels_, dba); - - case trigger::IdentifierTag::REMOVED_VERTEX_LABELS: - return ToTypedValue(removed_vertex_labels_, dba); - - case trigger::IdentifierTag::UPDATED_VERTICES: - return Updated(dba, set_vertex_properties, removed_vertex_properties, set_vertex_labels_, removed_vertex_labels_); - - case trigger::IdentifierTag::UPDATED_EDGES: - return Updated(dba, set_edge_properties, removed_edge_properties); - - case trigger::IdentifierTag::UPDATED_OBJECTS: - return Updated(dba, set_vertex_properties, set_edge_properties, removed_vertex_properties, - removed_edge_properties, set_vertex_labels_, removed_vertex_labels_); - } -} +} // namespace detail void TriggerContext::AdaptForAccessor(DbAccessor *accessor) { - auto &[created_vertices, deleted_vertices, set_vertex_properties, removed_vertex_properties] = vertex_registry_; - // adapt created_vertices_ { - auto it = created_vertices.begin(); - for (const auto &created_vertex : created_vertices) { + // adapt created_vertices_ + auto it = created_vertices_.begin(); + for (auto &created_vertex : created_vertices_) { if (auto maybe_vertex = accessor->FindVertex(created_vertex.object.Gid(), storage::View::OLD); maybe_vertex) { - *it = CreatedObject{*maybe_vertex}; + *it = detail::CreatedObject{*maybe_vertex}; ++it; } } - created_vertices.erase(it, created_vertices.end()); + created_vertices_.erase(it, created_vertices_.end()); } // deleted_vertices_ should keep the transaction context of the transaction which deleted it @@ -241,30 +296,30 @@ void TriggerContext::AdaptForAccessor(DbAccessor *accessor) { values->erase(it, values->end()); }; - adapt_context_with_vertex(&set_vertex_properties); - adapt_context_with_vertex(&removed_vertex_properties); + adapt_context_with_vertex(&set_vertex_properties_); + adapt_context_with_vertex(&removed_vertex_properties_); adapt_context_with_vertex(&set_vertex_labels_); adapt_context_with_vertex(&removed_vertex_labels_); - auto &[created_edges, deleted_edges, set_edge_properties, removed_edge_properties] = edge_registry_; - // adapt created_edges { - auto it = created_edges.begin(); - for (const auto &created_edge : created_edges) { - if (auto maybe_vertex = accessor->FindVertex(created_edge.object.From().Gid(), storage::View::OLD); - maybe_vertex) { - auto maybe_out_edges = maybe_vertex->OutEdges(storage::View::OLD); - MG_ASSERT(maybe_out_edges.HasValue()); - for (const auto &edge : *maybe_out_edges) { - if (edge.Gid() == created_edge.object.Gid()) { - *it = CreatedObject{edge}; - ++it; - break; - } + // adapt created_edges + auto it = created_edges_.begin(); + for (auto &created_edge : created_edges_) { + const auto maybe_from_vertex = accessor->FindVertex(created_edge.object.From().Gid(), storage::View::OLD); + if (!maybe_from_vertex) { + continue; + } + auto maybe_out_edges = maybe_from_vertex->OutEdges(storage::View::OLD); + MG_ASSERT(maybe_out_edges.HasValue()); + const auto edge_gid = created_edge.object.Gid(); + for (const auto &edge : *maybe_out_edges) { + if (edge.Gid() == edge_gid) { + *it = detail::CreatedObject{edge}; + ++it; } } } - created_edges.erase(it, created_edges.end()); + created_edges_.erase(it, created_edges_.end()); } // deleted_edges_ should keep the transaction context of the transaction which deleted it @@ -290,13 +345,164 @@ void TriggerContext::AdaptForAccessor(DbAccessor *accessor) { values->erase(it, values->end()); }; - adapt_context_with_edge(&set_edge_properties); - adapt_context_with_edge(&removed_edge_properties); + adapt_context_with_edge(&set_edge_properties_); + adapt_context_with_edge(&removed_edge_properties_); +} + +TypedValue TriggerContext::GetTypedValue(const TriggerIdentifierTag tag, DbAccessor *dba) const { + switch (tag) { + case TriggerIdentifierTag::CREATED_VERTICES: + return ToTypedValue(created_vertices_, dba); + + case TriggerIdentifierTag::CREATED_EDGES: + return ToTypedValue(created_edges_, dba); + + case TriggerIdentifierTag::CREATED_OBJECTS: + return Concatenate(dba, created_vertices_, created_edges_); + + case TriggerIdentifierTag::DELETED_VERTICES: + return ToTypedValue(deleted_vertices_, dba); + + case TriggerIdentifierTag::DELETED_EDGES: + return ToTypedValue(deleted_edges_, dba); + + case TriggerIdentifierTag::DELETED_OBJECTS: + return Concatenate(dba, deleted_vertices_, deleted_edges_); + + case TriggerIdentifierTag::SET_VERTEX_PROPERTIES: + return ToTypedValue(set_vertex_properties_, dba); + + case TriggerIdentifierTag::SET_EDGE_PROPERTIES: + return ToTypedValue(set_edge_properties_, dba); + + case TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES: + return ToTypedValue(removed_vertex_properties_, dba); + + case TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES: + return ToTypedValue(removed_edge_properties_, dba); + + case TriggerIdentifierTag::SET_VERTEX_LABELS: + return ToTypedValue(set_vertex_labels_, dba); + + case TriggerIdentifierTag::REMOVED_VERTEX_LABELS: + return ToTypedValue(removed_vertex_labels_, dba); + + case TriggerIdentifierTag::UPDATED_VERTICES: + return Concatenate(dba, set_vertex_properties_, removed_vertex_properties_, set_vertex_labels_, + removed_vertex_labels_); + + case TriggerIdentifierTag::UPDATED_EDGES: + return Concatenate(dba, set_edge_properties_, removed_edge_properties_); + + case TriggerIdentifierTag::UPDATED_OBJECTS: + return Concatenate(dba, set_vertex_properties_, set_edge_properties_, removed_vertex_properties_, + removed_edge_properties_, set_vertex_labels_, removed_vertex_labels_); + } +} + +bool TriggerContext::ShouldEventTrigger(const TriggerEventType event_type) const { + using EventType = TriggerEventType; + switch (event_type) { + case EventType::ANY: + return true; + + case EventType::CREATE: + return AnyContainsValue(created_vertices_, created_edges_); + + case EventType::VERTEX_CREATE: + return AnyContainsValue(created_vertices_); + + case EventType::EDGE_CREATE: + return AnyContainsValue(created_edges_); + + case EventType::DELETE: + return AnyContainsValue(deleted_vertices_, deleted_edges_); + + case EventType::VERTEX_DELETE: + return AnyContainsValue(deleted_vertices_); + + case EventType::EDGE_DELETE: + return AnyContainsValue(deleted_edges_); + + case EventType::UPDATE: + return AnyContainsValue(set_vertex_properties_, set_edge_properties_, removed_vertex_properties_, + removed_edge_properties_, set_vertex_labels_, removed_vertex_labels_); + + case EventType::VERTEX_UPDATE: + return AnyContainsValue(set_vertex_properties_, removed_vertex_properties_, set_vertex_labels_, + removed_vertex_labels_); + + case EventType::EDGE_UPDATE: + return AnyContainsValue(set_edge_properties_, removed_edge_properties_); + } +} + +void TriggerContextCollector::UpdateLabelMap(const VertexAccessor vertex, const storage::LabelId label_id, + const LabelChange change) { + auto ®istry = GetRegistry(); + if (registry.created_objects_.count(vertex.Gid())) { + return; + } + + if (auto it = label_changes_.find({vertex, label_id}); it != label_changes_.end()) { + it->second = std::clamp(it->second + LabelChangeToInt(change), -1, 1); + return; + } + + label_changes_.emplace(std::make_pair(vertex, label_id), LabelChangeToInt(change)); +} + +void TriggerContextCollector::RegisterSetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) { + UpdateLabelMap(vertex, label_id, LabelChange::ADD); +} + +void TriggerContextCollector::RegisterRemovedVertexLabel(const VertexAccessor &vertex, + const storage::LabelId label_id) { + UpdateLabelMap(vertex, label_id, LabelChange::REMOVE); +} + +int8_t TriggerContextCollector::LabelChangeToInt(LabelChange change) { + static_assert(std::is_same_v, int8_t>, + "The underlying type of LabelChange doesn't match the return type!"); + return static_cast(change); +} + +TriggerContext TriggerContextCollector::TransformToTriggerContext() && { + auto [created_vertices, deleted_vertices, set_vertex_properties, removed_vertex_properties] = + std::move(vertex_registry_).Summarize(); + auto [set_vertex_labels, removed_vertex_labels] = LabelMapToList(std::move(label_changes_)); + auto [created_edges, deleted_edges, set_edge_properties, removed_edge_properties] = + std::move(edge_registry_).Summarize(); + + return {std::move(created_vertices), std::move(deleted_vertices), + std::move(set_vertex_properties), std::move(removed_vertex_properties), + std::move(set_vertex_labels), std::move(removed_vertex_labels), + std::move(created_edges), std::move(deleted_edges), + std::move(set_edge_properties), std::move(removed_edge_properties)}; +} + +TriggerContextCollector::LabelChangesLists TriggerContextCollector::LabelMapToList(LabelChangesMap &&label_changes) { + std::vector set_vertex_labels; + std::vector removed_vertex_labels; + + for (const auto &[key, label_state] : label_changes) { + if (label_state == LabelChangeToInt(LabelChange::ADD)) { + set_vertex_labels.emplace_back(key.first, key.second); + } else if (label_state == LabelChangeToInt(LabelChange::REMOVE)) { + removed_vertex_labels.emplace_back(key.first, key.second); + } + } + + label_changes.clear(); + + return {std::move(set_vertex_labels), std::move(removed_vertex_labels)}; } Trigger::Trigger(std::string name, const std::string &query, utils::SkipList *query_cache, - DbAccessor *db_accessor, utils::SpinLock *antlr_lock) - : name_(std::move(name)), parsed_statements_{ParseQuery(query, {}, query_cache, antlr_lock)} { + DbAccessor *db_accessor, utils::SpinLock *antlr_lock, const TriggerEventType event_type) + : name_(std::move(name)), + parsed_statements_{ParseQuery(query, {}, query_cache, antlr_lock)}, + event_type_{event_type} { // We check immediately if the query is valid by trying to create a plan. GetPlan(db_accessor); } @@ -310,7 +516,7 @@ std::shared_ptr Trigger::GetPlan(DbAccessor *db_accessor) return trigger_plan_; } - auto identifiers = GetPredefinedIdentifiers(); + auto identifiers = GetPredefinedIdentifiers(event_type_); AstStorage ast_storage; ast_storage.properties_ = parsed_statements_.ast_storage.properties_; @@ -332,6 +538,11 @@ std::shared_ptr Trigger::GetPlan(DbAccessor *db_accessor) void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, const double tsc_frequency, const double max_execution_time_sec, std::atomic *is_shutting_down, const TriggerContext &context) const { + if (!context.ShouldEventTrigger(event_type_)) { + return; + } + + spdlog::debug("Executing trigger '{}'", name_); auto trigger_plan = GetPlan(dba); MG_ASSERT(trigger_plan, "Invalid trigger plan received"); auto &[plan, identifiers] = *trigger_plan; diff --git a/src/query/trigger.hpp b/src/query/trigger.hpp index 9689b7775..51119885e 100644 --- a/src/query/trigger.hpp +++ b/src/query/trigger.hpp @@ -1,33 +1,18 @@ #pragma once +#include #include +#include +#include #include +#include #include "query/cypher_query_interpreter.hpp" -#include "query/db_accessor.hpp" #include "query/frontend/ast/ast.hpp" #include "query/typed_value.hpp" #include "utils/concepts.hpp" +#include "utils/fnv.hpp" namespace query { - -namespace trigger { -enum class IdentifierTag : uint8_t { - CREATED_VERTICES, - CREATED_EDGES, - DELETED_VERTICES, - DELETED_EDGES, - SET_VERTEX_PROPERTIES, - SET_EDGE_PROPERTIES, - REMOVED_VERTEX_PROPERTIES, - REMOVED_EDGE_PROPERTIES, - SET_VERTEX_LABELS, - REMOVED_VERTEX_LABELS, - UPDATED_VERTICES, - UPDATED_EDGES, - UPDATED_OBJECTS -}; -} // namespace trigger - namespace detail { template concept ObjectAccessor = utils::SameAsAnyOf; @@ -40,34 +25,214 @@ const char *ObjectString() { return "edge"; } } + +template +struct CreatedObject { + explicit CreatedObject(const TAccessor &object) : object{object} {} + + bool IsValid() const { return object.IsVisible(storage::View::OLD); } + std::map ToMap([[maybe_unused]] DbAccessor *dba) const { + return {{ObjectString(), TypedValue{object}}}; + } + + TAccessor object; +}; + +template +struct DeletedObject { + explicit DeletedObject(const TAccessor &object) : object{object} {} + + bool IsValid() const { return object.IsVisible(storage::View::OLD); } + std::map ToMap([[maybe_unused]] DbAccessor *dba) const { + return {{ObjectString(), TypedValue{object}}}; + } + + TAccessor object; +}; + +template +struct SetObjectProperty { + explicit SetObjectProperty(const TAccessor &object, storage::PropertyId key, TypedValue old_value, + TypedValue new_value) + : object{object}, key{key}, old_value{std::move(old_value)}, new_value{std::move(new_value)} {} + + std::map ToMap(DbAccessor *dba) const { + return {{ObjectString(), TypedValue{object}}, + {"key", TypedValue{dba->PropertyToName(key)}}, + {"old", old_value}, + {"new", new_value}}; + } + + bool IsValid() const { return object.IsVisible(storage::View::OLD); } + + TAccessor object; + storage::PropertyId key; + TypedValue old_value; + TypedValue new_value; +}; + +template +struct RemovedObjectProperty { + explicit RemovedObjectProperty(const TAccessor &object, storage::PropertyId key, TypedValue old_value) + : object{object}, key{key}, old_value{std::move(old_value)} {} + + std::map ToMap(DbAccessor *dba) const { + return {{ObjectString(), TypedValue{object}}, + {"key", TypedValue{dba->PropertyToName(key)}}, + {"old", old_value}}; + } + + bool IsValid() const { return object.IsVisible(storage::View::OLD); } + + TAccessor object; + storage::PropertyId key; + TypedValue old_value; +}; + +struct SetVertexLabel { + explicit SetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) + : object{vertex}, label_id{label_id} {} + + std::map ToMap(DbAccessor *dba) const; + bool IsValid() const; + + VertexAccessor object; + storage::LabelId label_id; +}; + +struct RemovedVertexLabel { + explicit RemovedVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) + : object{vertex}, label_id{label_id} {} + + std::map ToMap(DbAccessor *dba) const; + bool IsValid() const; + + VertexAccessor object; + storage::LabelId label_id; +}; } // namespace detail -struct TriggerContext { - static_assert(std::is_trivially_copy_constructible_v, - "VertexAccessor is not trivially copy constructible, move it where possible and remove this assert"); - static_assert(std::is_trivially_copy_constructible_v, - "EdgeAccessor is not trivially copy constructible, move it where possible and remove this asssert"); +enum class TriggerIdentifierTag : uint8_t { + CREATED_VERTICES, + CREATED_EDGES, + CREATED_OBJECTS, + DELETED_VERTICES, + DELETED_EDGES, + DELETED_OBJECTS, + SET_VERTEX_PROPERTIES, + SET_EDGE_PROPERTIES, + REMOVED_VERTEX_PROPERTIES, + REMOVED_EDGE_PROPERTIES, + SET_VERTEX_LABELS, + REMOVED_VERTEX_LABELS, + UPDATED_VERTICES, + UPDATED_EDGES, + UPDATED_OBJECTS +}; +enum class TriggerEventType : uint8_t { + ANY, // Triggers always + VERTEX_CREATE, + EDGE_CREATE, + CREATE, + VERTEX_DELETE, + EDGE_DELETE, + DELETE, + VERTEX_UPDATE, + EDGE_UPDATE, + UPDATE +}; + +static_assert(std::is_trivially_copy_constructible_v, + "VertexAccessor is not trivially copy constructible, move it where possible and remove this assert"); +static_assert(std::is_trivially_copy_constructible_v, + "EdgeAccessor is not trivially copy constructible, move it where possible and remove this asssert"); + +// Holds the information necessary for triggers +class TriggerContext { + public: + TriggerContext() = default; + TriggerContext(std::vector> created_vertices, + std::vector> deleted_vertices, + std::vector> set_vertex_properties, + std::vector> removed_vertex_properties, + std::vector set_vertex_labels, + std::vector removed_vertex_labels, + std::vector> created_edges, + std::vector> deleted_edges, + std::vector> set_edge_properties, + std::vector> removed_edge_properties) + : created_vertices_{std::move(created_vertices)}, + deleted_vertices_{std::move(deleted_vertices)}, + set_vertex_properties_{std::move(set_vertex_properties)}, + removed_vertex_properties_{std::move(removed_vertex_properties)}, + set_vertex_labels_{std::move(set_vertex_labels)}, + removed_vertex_labels_{std::move(removed_vertex_labels)}, + created_edges_{std::move(created_edges)}, + deleted_edges_{std::move(deleted_edges)}, + set_edge_properties_{std::move(set_edge_properties)}, + removed_edge_properties_{std::move(removed_edge_properties)} {} + TriggerContext(const TriggerContext &) = default; + TriggerContext(TriggerContext &&) = default; + TriggerContext &operator=(const TriggerContext &) = default; + TriggerContext &operator=(TriggerContext &&) = default; + + // Adapt the TriggerContext object inplace for a different DbAccessor + // (each derived accessor, e.g. VertexAccessor, gets adapted + // to the sent DbAccessor so they can be used safely) + void AdaptForAccessor(DbAccessor *accessor); + + // Get TypedValue for the identifier defined with tag + TypedValue GetTypedValue(TriggerIdentifierTag tag, DbAccessor *dba) const; + bool ShouldEventTrigger(TriggerEventType) const; + + private: + std::vector> created_vertices_; + std::vector> deleted_vertices_; + std::vector> set_vertex_properties_; + std::vector> removed_vertex_properties_; + std::vector set_vertex_labels_; + std::vector removed_vertex_labels_; + + std::vector> created_edges_; + std::vector> deleted_edges_; + std::vector> set_edge_properties_; + std::vector> removed_edge_properties_; +}; + +// Collects the information necessary for triggers during a single transaction run. +class TriggerContextCollector { + public: template void RegisterCreatedObject(const TAccessor &created_object) { - GetRegistry().created_objects_.emplace_back(created_object); + GetRegistry().created_objects_.emplace(created_object.Gid(), detail::CreatedObject{created_object}); } template void RegisterDeletedObject(const TAccessor &deleted_object) { - GetRegistry().deleted_objects_.emplace_back(deleted_object); + auto ®istry = GetRegistry(); + if (registry.created_objects_.count(deleted_object.Gid())) { + return; + } + + registry.deleted_objects_.emplace_back(deleted_object); } template void RegisterSetObjectProperty(const TAccessor &object, const storage::PropertyId key, TypedValue old_value, TypedValue new_value) { - if (new_value.IsNull()) { - RegisterRemovedObjectProperty(object, key, std::move(old_value)); + auto ®istry = GetRegistry(); + if (registry.created_objects_.count(object.Gid())) { return; } - GetRegistry().set_object_properties_.emplace_back(object, key, std::move(old_value), - std::move(new_value)); + if (auto it = registry.property_changes_.find({object, key}); it != registry.property_changes_.end()) { + it->second.new_value = std::move(new_value); + return; + } + + registry.property_changes_.emplace(std::make_pair(object, key), + PropertyChangeInfo{std::move(old_value), std::move(new_value)}); } template @@ -77,109 +242,90 @@ struct TriggerContext { return; } - GetRegistry().removed_object_properties_.emplace_back(object, key, std::move(old_value)); + RegisterSetObjectProperty(object, key, std::move(old_value), TypedValue()); } void RegisterSetVertexLabel(const VertexAccessor &vertex, storage::LabelId label_id); void RegisterRemovedVertexLabel(const VertexAccessor &vertex, storage::LabelId label_id); + [[nodiscard]] TriggerContext TransformToTriggerContext() &&; - // Adapt the TriggerContext object inplace for a different DbAccessor - // (each derived accessor, e.g. VertexAccessor, gets adapted - // to the sent DbAccessor so they can be used safely) - void AdaptForAccessor(DbAccessor *accessor); - - TypedValue GetTypedValue(trigger::IdentifierTag tag, DbAccessor *dba) const; - - template - struct CreatedObject { - explicit CreatedObject(const TAccessor &object) : object{object} {} - - bool IsValid() const { return object.IsVisible(storage::View::OLD); } - - TAccessor object; - }; - - template - struct DeletedObject { - explicit DeletedObject(const TAccessor &object) : object{object} {} - - bool IsValid() const { return object.IsVisible(storage::View::OLD); } - - TAccessor object; - }; - - template - struct SetObjectProperty { - explicit SetObjectProperty(const TAccessor &object, storage::PropertyId key, TypedValue old_value, - TypedValue new_value) - : object{object}, key{key}, old_value{std::move(old_value)}, new_value{std::move(new_value)} {} - - std::map ToMap(DbAccessor *dba) const { - return {{detail::ObjectString(), TypedValue{object}}, - {"key", TypedValue{dba->PropertyToName(key)}}, - {"old", old_value}, - {"new", new_value}}; + private: + struct HashPair { + template + size_t operator()(const std::pair &pair) const { + using GidType = decltype(std::declval().Gid()); + return utils::HashCombine{}(pair.first.Gid(), pair.second); } + }; - bool IsValid() const { return object.IsVisible(storage::View::OLD); } - - TAccessor object; - storage::PropertyId key; + struct PropertyChangeInfo { TypedValue old_value; TypedValue new_value; }; template - struct RemovedObjectProperty { - explicit RemovedObjectProperty(const TAccessor &object, storage::PropertyId key, TypedValue old_value) - : object{object}, key{key}, old_value{std::move(old_value)} {} + using PropertyChangesMap = + std::unordered_map, PropertyChangeInfo, HashPair>; - std::map ToMap(DbAccessor *dba) const { - return {{detail::ObjectString(), TypedValue{object}}, - {"key", TypedValue{dba->PropertyToName(key)}}, - {"old", old_value}}; - } + template + using PropertyChangesLists = std::pair>, + std::vector>>; - bool IsValid() const { return object.IsVisible(storage::View::OLD); } - - TAccessor object; - storage::PropertyId key; - TypedValue old_value; - }; - - struct SetVertexLabel { - explicit SetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) - : object{vertex}, label_id{label_id} {} - - std::map ToMap(DbAccessor *dba) const; - bool IsValid() const; - - VertexAccessor object; - storage::LabelId label_id; - }; - - struct RemovedVertexLabel { - explicit RemovedVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) - : object{vertex}, label_id{label_id} {} - - std::map ToMap(DbAccessor *dba) const; - bool IsValid() const; - - VertexAccessor object; - storage::LabelId label_id; - }; - - private: template struct Registry { - std::vector> created_objects_; - std::vector> deleted_objects_; - std::vector> set_object_properties_; - std::vector> removed_object_properties_; - }; + using ChangesSummary = + std::tuple>, std::vector>, + std::vector>, + std::vector>>; - Registry vertex_registry_; - Registry edge_registry_; + [[nodiscard]] static PropertyChangesLists PropertyMapToList(PropertyChangesMap &&map) { + std::vector> set_object_properties; + std::vector> removed_object_properties; + + for (auto it = map.begin(); it != map.end(); it = map.erase(it)) { + const auto &[key, property_change_info] = *it; + if (property_change_info.old_value.IsNull() && property_change_info.new_value.IsNull()) { + // no change happened on the transaction level + continue; + } + + if (const auto is_equal = property_change_info.old_value == property_change_info.new_value; + is_equal.IsBool() && is_equal.ValueBool()) { + // no change happened on the transaction level + continue; + } + + if (property_change_info.new_value.IsNull()) { + removed_object_properties.emplace_back(key.first, key.second /* property_id */, + std::move(property_change_info.old_value)); + } else { + set_object_properties.emplace_back(key.first, key.second, std::move(property_change_info.old_value), + std::move(property_change_info.new_value)); + } + } + + return PropertyChangesLists{std::move(set_object_properties), std::move(removed_object_properties)}; + } + + [[nodiscard]] ChangesSummary Summarize() && { + auto [set_object_properties, removed_object_properties] = PropertyMapToList(std::move(property_changes_)); + std::vector> created_objects_vec; + created_objects_vec.reserve(created_objects_.size()); + std::transform(created_objects_.begin(), created_objects_.end(), std::back_inserter(created_objects_vec), + [](const auto &gid_and_created_object) { return gid_and_created_object.second; }); + created_objects_.clear(); + + return {std::move(created_objects_vec), std::move(deleted_objects_), std::move(set_object_properties), + std::move(removed_object_properties)}; + } + + std::unordered_map> created_objects_; + std::vector> deleted_objects_; + // During the transaction, a single property on a single object could be changed multiple times. + // We want to register only the global change, at the end of the transaction. The change consists of + // the value before the transaction start, and the latest value assigned throughout the transaction. + PropertyChangesMap property_changes_; + }; template Registry &GetRegistry() { @@ -190,13 +336,29 @@ struct TriggerContext { } } - std::vector set_vertex_labels_; - std::vector removed_vertex_labels_; + using LabelChangesMap = std::unordered_map, int8_t, HashPair>; + using LabelChangesLists = std::pair, std::vector>; + + enum class LabelChange : int8_t { REMOVE = -1, ADD = 1 }; + + static int8_t LabelChangeToInt(LabelChange change); + + [[nodiscard]] static LabelChangesLists LabelMapToList(LabelChangesMap &&label_changes); + + void UpdateLabelMap(VertexAccessor vertex, storage::LabelId label_id, LabelChange change); + + Registry vertex_registry_; + Registry edge_registry_; + // During the transaction, a single label on a single vertex could be added and removed multiple times. + // We want to register only the global change, at the end of the transaction. The change consists of + // the state of the label before the transaction start, and the latest state assigned throughout the transaction. + LabelChangesMap label_changes_; }; struct Trigger { explicit Trigger(std::string name, const std::string &query, utils::SkipList *query_cache, - DbAccessor *db_accessor, utils::SpinLock *antlr_lock); + DbAccessor *db_accessor, utils::SpinLock *antlr_lock, + TriggerEventType event_type = TriggerEventType::ANY); void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double tsc_frequency, double max_execution_time_sec, std::atomic *is_shutting_down, const TriggerContext &context) const; @@ -212,7 +374,7 @@ struct Trigger { private: struct TriggerPlan { - using IdentifierInfo = std::pair; + using IdentifierInfo = std::pair; explicit TriggerPlan(std::unique_ptr logical_plan, std::vector identifiers); @@ -224,6 +386,8 @@ struct Trigger { std::string name_; ParsedQuery parsed_statements_; + TriggerEventType event_type_; + mutable utils::SpinLock plan_lock_; mutable std::shared_ptr trigger_plan_; }; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index b1f4a147e..8f7c2bb67 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -95,6 +95,9 @@ target_link_libraries(${test_prefix}query_plan_v2_create_set_remove_delete mg-qu add_unit_test(query_pretty_print.cpp) target_link_libraries(${test_prefix}query_pretty_print mg-query) +add_unit_test(query_trigger.cpp) +target_link_libraries(${test_prefix}query_trigger mg-query) + # Test query/procedure add_unit_test(query_procedure_mgp_type.cpp) target_link_libraries(${test_prefix}query_procedure_mgp_type mg-query) diff --git a/tests/unit/query_trigger.cpp b/tests/unit/query_trigger.cpp new file mode 100644 index 000000000..44936ef60 --- /dev/null +++ b/tests/unit/query_trigger.cpp @@ -0,0 +1,553 @@ +#include +#include + +#include "query/db_accessor.hpp" +#include "query/interpreter.hpp" +#include "query/trigger.hpp" +#include "query/typed_value.hpp" +#include "utils/memory.hpp" + +class TriggerContextTest : public ::testing::Test { + public: + void SetUp() override { db.emplace(); } + + void TearDown() override { + accessors.clear(); + db.reset(); + } + + storage::Storage::Accessor &StartTransaction() { + accessors.push_back(db->Access()); + return accessors.back(); + } + + protected: + std::optional db; + std::list accessors; +}; + +namespace { +void CheckTypedValueSize(const query::TriggerContext &trigger_context, const query::TriggerIdentifierTag tag, + const size_t expected_size, query::DbAccessor &dba) { + auto typed_values = trigger_context.GetTypedValue(tag, &dba); + ASSERT_TRUE(typed_values.IsList()); + ASSERT_EQ(typed_values.ValueList().size(), expected_size); +}; + +void CheckLabelMap(const query::TriggerContext &trigger_context, const query::TriggerIdentifierTag tag, + const size_t expected, query::DbAccessor &dba) { + auto typed_values = trigger_context.GetTypedValue(tag, &dba); + ASSERT_TRUE(typed_values.IsMap()); + auto &typed_values_map = typed_values.ValueMap(); + size_t value_count = 0; + for (const auto &[label, values] : typed_values_map) { + ASSERT_TRUE(values.IsList()); + value_count += values.ValueList().size(); + } + ASSERT_EQ(value_count, expected); +}; +} // namespace + +// Ensure that TriggerContext returns only valid objects. +// Returned TypedValue should always contain only objects +// that exist (unless its explicitly created for the deleted object) +TEST_F(TriggerContextTest, ValidObjectsTest) { + query::TriggerContext trigger_context; + query::TriggerContextCollector trigger_context_collector; + + size_t vertex_count = 0; + size_t edge_count = 0; + { + query::DbAccessor dba{&StartTransaction()}; + + auto create_vertex = [&] { + auto created_vertex = dba.InsertVertex(); + trigger_context_collector.RegisterCreatedObject(created_vertex); + ++vertex_count; + return created_vertex; + }; + + // Create vertices and add them to the trigger context as created + std::vector vertices; + for (size_t i = 0; i < 4; ++i) { + vertices.push_back(create_vertex()); + } + + auto create_edge = [&](auto &from, auto &to) { + auto maybe_edge = dba.InsertEdge(&from, &to, dba.NameToEdgeType("EDGE")); + ASSERT_FALSE(maybe_edge.HasError()); + trigger_context_collector.RegisterCreatedObject(*maybe_edge); + ++edge_count; + }; + + // Create edges and add them to the trigger context as created + create_edge(vertices[0], vertices[1]); + create_edge(vertices[1], vertices[2]); + create_edge(vertices[2], vertices[3]); + + dba.AdvanceCommand(); + trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + trigger_context_collector = query::TriggerContextCollector{}; + + // Should have all the created objects + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, vertex_count + edge_count, dba); + + // we delete one of the vertices and edges in the same transaction + ASSERT_TRUE(dba.DetachRemoveVertex(&vertices[0]).HasValue()); + --vertex_count; + --edge_count; + + dba.AdvanceCommand(); + + // Should have one less created object for vertex and edge + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, vertex_count + edge_count, dba); + + ASSERT_FALSE(dba.Commit().HasError()); + } + + { + query::DbAccessor dba{&StartTransaction()}; + trigger_context.AdaptForAccessor(&dba); + + // Should have one less created object for vertex and edge + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, vertex_count + edge_count, dba); + } + + size_t deleted_vertex_count = 0; + size_t deleted_edge_count = 0; + { + query::DbAccessor dba{&StartTransaction()}; + + // register each type of change for each object + { + auto vertices = dba.Vertices(storage::View::OLD); + for (auto vertex : vertices) { + trigger_context_collector.RegisterSetObjectProperty(vertex, dba.NameToProperty("PROPERTY1"), + query::TypedValue("Value"), query::TypedValue("ValueNew")); + trigger_context_collector.RegisterRemovedObjectProperty(vertex, dba.NameToProperty("PROPERTY2"), + query::TypedValue("Value")); + trigger_context_collector.RegisterSetVertexLabel(vertex, dba.NameToLabel("LABEL1")); + trigger_context_collector.RegisterRemovedVertexLabel(vertex, dba.NameToLabel("LABEL2")); + + auto out_edges = vertex.OutEdges(storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + + for (auto edge : *out_edges) { + trigger_context_collector.RegisterSetObjectProperty( + edge, dba.NameToProperty("PROPERTY1"), query::TypedValue("Value"), query::TypedValue("ValueNew")); + trigger_context_collector.RegisterRemovedObjectProperty(edge, dba.NameToProperty("PROPERTY2"), + query::TypedValue("Value")); + } + } + } + + // Delete the first vertex with its edge and register the deleted object + { + auto vertices = dba.Vertices(storage::View::OLD); + for (auto vertex : vertices) { + const auto maybe_values = dba.DetachRemoveVertex(&vertex); + ASSERT_TRUE(maybe_values.HasValue()); + ASSERT_TRUE(maybe_values.GetValue()); + const auto &[deleted_vertex, deleted_edges] = *maybe_values.GetValue(); + + trigger_context_collector.RegisterDeletedObject(deleted_vertex); + ++deleted_vertex_count; + --vertex_count; + for (const auto &edge : deleted_edges) { + trigger_context_collector.RegisterDeletedObject(edge); + ++deleted_edge_count; + --edge_count; + } + + break; + } + } + + dba.AdvanceCommand(); + ASSERT_FALSE(dba.Commit().HasError()); + + trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + trigger_context_collector = query::TriggerContextCollector{}; + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_PROPERTIES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_EDGE_PROPERTIES, edge_count, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES, edge_count, dba); + + CheckLabelMap(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_LABELS, vertex_count, dba); + CheckLabelMap(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_LABELS, vertex_count, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_VERTICES, 4 * vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_EDGES, 2 * edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_OBJECTS, + 4 * vertex_count + 2 * edge_count, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_VERTICES, deleted_vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_EDGES, deleted_edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_OBJECTS, + deleted_vertex_count + deleted_edge_count, dba); + } + + // delete a single vertex with its edges, it should reduce number of typed values returned by the trigger context + // for each update event. + // TypedValue of the deleted objects stay the same as they're bound to the transaction which deleted them. + { + query::DbAccessor dba{&StartTransaction()}; + trigger_context.AdaptForAccessor(&dba); + + auto vertices = dba.Vertices(storage::View::OLD); + for (auto vertex : vertices) { + ASSERT_TRUE(dba.DetachRemoveVertex(&vertex).HasValue()); + break; + } + --vertex_count; + --edge_count; + + ASSERT_FALSE(dba.Commit().HasError()); + } + + { + query::DbAccessor dba{&StartTransaction()}; + trigger_context.AdaptForAccessor(&dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_PROPERTIES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_EDGE_PROPERTIES, edge_count, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES, vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES, edge_count, dba); + + CheckLabelMap(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_LABELS, vertex_count, dba); + CheckLabelMap(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_LABELS, vertex_count, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_VERTICES, 4 * vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_EDGES, 2 * edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_OBJECTS, + 4 * vertex_count + 2 * edge_count, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_VERTICES, deleted_vertex_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_EDGES, deleted_edge_count, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_OBJECTS, + deleted_vertex_count + deleted_edge_count, dba); + } +} + +// If the trigger context registered a created object, each future event on the same object will be ignored. +// Binding the trigger context to transaction will mean that creating and updating an object in the same transaction +// will return only the CREATE event. +TEST_F(TriggerContextTest, ReturnCreateOnlyEvent) { + query::TriggerContextCollector trigger_context_collector; + + query::DbAccessor dba{&StartTransaction()}; + + auto create_vertex = [&] { + auto vertex = dba.InsertVertex(); + trigger_context_collector.RegisterCreatedObject(vertex); + trigger_context_collector.RegisterSetObjectProperty(vertex, dba.NameToProperty("PROPERTY1"), + query::TypedValue("Value"), query::TypedValue("ValueNew")); + trigger_context_collector.RegisterRemovedObjectProperty(vertex, dba.NameToProperty("PROPERTY2"), + query::TypedValue("Value")); + trigger_context_collector.RegisterSetVertexLabel(vertex, dba.NameToLabel("LABEL1")); + trigger_context_collector.RegisterRemovedVertexLabel(vertex, dba.NameToLabel("LABEL2")); + return vertex; + }; + + auto v1 = create_vertex(); + auto v2 = create_vertex(); + auto maybe_edge = dba.InsertEdge(&v1, &v2, dba.NameToEdgeType("EDGE")); + ASSERT_FALSE(maybe_edge.HasError()); + trigger_context_collector.RegisterCreatedObject(*maybe_edge); + trigger_context_collector.RegisterSetObjectProperty(*maybe_edge, dba.NameToProperty("PROPERTY1"), + query::TypedValue("Value"), query::TypedValue("ValueNew")); + trigger_context_collector.RegisterRemovedObjectProperty(*maybe_edge, dba.NameToProperty("PROPERTY2"), + query::TypedValue("Value")); + + dba.AdvanceCommand(); + + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, 2, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, 1, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, 3, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_PROPERTIES, 0, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_EDGE_PROPERTIES, 0, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES, 0, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES, 0, dba); + + CheckLabelMap(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_LABELS, 0, dba); + CheckLabelMap(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_LABELS, 0, dba); + + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_VERTICES, 0, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_EDGES, 0, dba); + CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_OBJECTS, 0, dba); +} + +namespace { +void EXPECT_PROP_TRUE(const query::TypedValue &a) { + EXPECT_TRUE(a.type() == query::TypedValue::Type::Bool && a.ValueBool()); +} + +void EXPECT_PROP_EQ(const query::TypedValue &a, const query::TypedValue &b) { EXPECT_PROP_TRUE(a == b); } +} // namespace + +// During a transaction, same property for the same object can change multiple times. TriggerContext should ensure +// that only the change on the global value is returned (value before the transaction + latest value after the +// transaction) everything inbetween should be ignored. +TEST_F(TriggerContextTest, GlobalPropertyChange) { + query::DbAccessor dba{&StartTransaction()}; + + auto v = dba.InsertVertex(); + dba.AdvanceCommand(); + + { + SPDLOG_DEBUG("SET -> SET"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value"), + query::TypedValue("ValueNew")); + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("ValueNew"), query::TypedValue("ValueNewer")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"set_vertex_property"}}, + {"vertex", query::TypedValue{v}}, + {"key", query::TypedValue{"PROPERTY"}}, + {"old", query::TypedValue{"Value"}}, + {"new", query::TypedValue{"ValueNewer"}}}}); + } + + { + SPDLOG_DEBUG("SET -> REMOVE"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value"), + query::TypedValue("ValueNew")); + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("ValueNew")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"removed_vertex_property"}}, + {"vertex", query::TypedValue{v}}, + {"key", query::TypedValue{"PROPERTY"}}, + {"old", query::TypedValue{"Value"}}}}); + } + + { + SPDLOG_DEBUG("REMOVE -> SET"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("Value")); + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(), + query::TypedValue("ValueNew")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"set_vertex_property"}}, + {"vertex", query::TypedValue{v}}, + {"key", query::TypedValue{"PROPERTY"}}, + {"old", query::TypedValue{"Value"}}, + {"new", query::TypedValue{"ValueNew"}}}}); + } + + { + SPDLOG_DEBUG("REMOVE -> REMOVE"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("Value")); + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue()); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"removed_vertex_property"}}, + {"vertex", query::TypedValue{v}}, + {"key", query::TypedValue{"PROPERTY"}}, + {"old", query::TypedValue{"Value"}}}}); + } + + { + SPDLOG_DEBUG("SET -> SET (no change on transaction level)"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value"), + query::TypedValue("ValueNew")); + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("ValueNew"), query::TypedValue("Value")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 0); + } + + { + SPDLOG_DEBUG("SET -> REMOVE (no change on transaction level)"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(), + query::TypedValue("ValueNew")); + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("ValueNew")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 0); + } + + { + SPDLOG_DEBUG("REMOVE -> SET (no change on transaction level)"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("Value")); + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(), + query::TypedValue("Value")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 0); + } + + { + SPDLOG_DEBUG("REMOVE -> REMOVE (no change on transaction level)"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue()); + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue()); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 0); + } + + { + SPDLOG_DEBUG("SET -> REMOVE -> SET -> REMOVE -> SET"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value0"), + query::TypedValue("Value1")); + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("Value1")); + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(), + query::TypedValue("Value2")); + trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), + query::TypedValue("Value2")); + trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(), + query::TypedValue("Value3")); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"set_vertex_property"}}, + {"vertex", query::TypedValue{v}}, + {"key", query::TypedValue{"PROPERTY"}}, + {"old", query::TypedValue{"Value0"}}, + {"new", query::TypedValue{"Value3"}}}}); + } +} + +// Same as above, but for label changes +TEST_F(TriggerContextTest, GlobalLabelChange) { + query::DbAccessor dba{&StartTransaction()}; + + auto v = dba.InsertVertex(); + dba.AdvanceCommand(); + + const auto label_id = dba.NameToLabel("LABEL"); + // You cannot add the same label multiple times and you cannot remove non existing labels + // so REMOVE -> REMOVE and SET -> SET doesn't make sense + { + SPDLOG_DEBUG("SET -> REMOVE"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 0); + } + + { + SPDLOG_DEBUG("REMOVE -> SET"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 0); + } + + { + SPDLOG_DEBUG("SET -> REMOVE -> SET -> REMOVE -> SET"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"set_vertex_label"}}, + {"vertex", query::TypedValue{v}}, + {"label", query::TypedValue{"LABEL"}}}}); + } + + { + SPDLOG_DEBUG("REMOVE -> SET -> REMOVE -> SET -> REMOVE"); + query::TriggerContextCollector trigger_context_collector; + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + trigger_context_collector.RegisterSetVertexLabel(v, label_id); + trigger_context_collector.RegisterRemovedVertexLabel(v, label_id); + const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext(); + auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba); + ASSERT_TRUE(updated_vertices.IsList()); + auto &updated_vertices_list = updated_vertices.ValueList(); + ASSERT_EQ(updated_vertices_list.size(), 1); + auto &update = updated_vertices_list[0]; + ASSERT_TRUE(update.IsMap()); + EXPECT_PROP_EQ(update, query::TypedValue{std::map{ + {"event_type", query::TypedValue{"removed_vertex_label"}}, + {"vertex", query::TypedValue{v}}, + {"label", query::TypedValue{"LABEL"}}}}); + } +}