Trigger event types (#144)
Co-authored-by: Benjamin Antal <benjamin.antal@memgraph.io>
This commit is contained in:
parent
b459639968
commit
883f9c7ed3
@ -57,9 +57,7 @@ struct ExecutionContext {
|
||||
std::chrono::duration<double> profile_execution_time;
|
||||
plan::ProfilingStats stats;
|
||||
plan::ProfilingStats *stats_root{nullptr};
|
||||
|
||||
// trigger context
|
||||
TriggerContext *trigger_context{nullptr};
|
||||
TriggerContextCollector *trigger_context_collector{nullptr};
|
||||
};
|
||||
|
||||
inline bool MustAbort(const ExecutionContext &context) {
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "query/plan/planner.hpp"
|
||||
#include "query/plan/profile.hpp"
|
||||
#include "query/plan/vertex_count_cache.hpp"
|
||||
#include "query/trigger.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
@ -467,7 +468,8 @@ struct PullPlanVector {
|
||||
struct PullPlan {
|
||||
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters ¶meters, bool is_profile_query,
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
|
||||
TriggerContext *trigger_context = nullptr, std::optional<size_t> memory_limit = {});
|
||||
TriggerContextCollector *trigger_context_collector = nullptr,
|
||||
std::optional<size_t> memory_limit = {});
|
||||
std::optional<ExecutionContext> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
@ -495,7 +497,7 @@ struct PullPlan {
|
||||
|
||||
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters ¶meters, const bool is_profile_query,
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
|
||||
TriggerContext *trigger_context, const std::optional<size_t> memory_limit)
|
||||
TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit)
|
||||
: plan_(plan),
|
||||
cursor_(plan->plan().MakeCursor(execution_memory)),
|
||||
frame_(plan->symbol_table().max_position(), execution_memory),
|
||||
@ -512,7 +514,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
ctx_.max_execution_time_sec = interpreter_context->execution_timeout_sec;
|
||||
ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
|
||||
ctx_.is_profile_query = is_profile_query;
|
||||
ctx_.trigger_context = trigger_context;
|
||||
ctx_.trigger_context_collector = trigger_context_collector;
|
||||
}
|
||||
|
||||
std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
@ -605,33 +607,47 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_
|
||||
// auto triggers_acc = interpreter_context_->before_commit_triggers.access();
|
||||
// triggers_acc.insert(Trigger{"BeforeDelete",
|
||||
// "UNWIND deletedVertices as u CREATE(:DELETED_VERTEX {id: id(u) + 10})",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// triggers_acc.insert(Trigger{"BeforeDeleteEdge", "UNWIND deletedEdges as u CREATE(:DELETED_EDGE {id: id(u) +
|
||||
// 10})",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_DELETE});
|
||||
// triggers_acc.insert(Trigger{"BeforeUpdatePropertyi",
|
||||
// "UNWIND assignedVertexProperties as u SET u.vertex.two = u.new",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_UPDATE});
|
||||
// triggers_acc.insert(Trigger{"BeforeDeleteEdge", "UNWIND deletedEdges as u CREATE(:DELETED_EDGE {id: id(u) +10})
|
||||
// ",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::EDGE_DELETE});
|
||||
// // triggers_acc.insert(Trigger{"BeforeDelete2", "UNWIND deletedEdges as u SET u.deleted = 0",
|
||||
// // &interpreter_context_->ast_cache, &dba,
|
||||
// // &interpreter_context_->antlr_lock});
|
||||
// triggers_acc.insert(Trigger{"BeforeDeleteProcedure", "CALL script.procedure(updatedVertices) YIELD * RETURN *",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// triggers_acc.insert(Trigger{"BeforeDeleteProcedure",
|
||||
// "CALL script.procedure('VERTEX_UPDATE', updatedVertices) YIELD * RETURN *",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_UPDATE});
|
||||
// triggers_acc.insert(Trigger{"BeforeCreator", "UNWIND createdVertices as u SET u.before = id(u) + 10",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_CREATE});
|
||||
// triggers_acc.insert(Trigger{"BeforeCreatorEdge", "UNWIND createdEdges as u SET u.before = id(u) + 10",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::EDGE_CREATE});
|
||||
// triggers_acc.insert(Trigger{"BeforeSetLabelProcedure",
|
||||
// "CALL label.procedure(assignedVertexLabels) YIELD * RETURN *",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// "CALL label.procedure('VERTEX_UPDATE', assignedVertexLabels) YIELD * RETURN *",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_UPDATE});
|
||||
// }
|
||||
// {
|
||||
// auto storage_acc = interpreter_context->db->Access();
|
||||
// DbAccessor dba(&storage_acc);
|
||||
// auto triggers_acc = interpreter_context->after_commit_triggers.access();
|
||||
// triggers_acc.insert(Trigger{"AfterDelete", "UNWIND deletedVertices as u CREATE(:DELETED {id: u.id + 100})",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_DELETE});
|
||||
// triggers_acc.insert(Trigger{"AfterCreator", "UNWIND createdVertices as u SET u.after = u.id + 100",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// triggers_acc.insert(Trigger{"AfterUpdateProcedure", "CALL script.procedure(updatedVertices) YIELD * RETURN *",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock});
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock,
|
||||
// TriggerEventType::VERTEX_CREATE});
|
||||
// triggers_acc.insert(Trigger{
|
||||
// "AfterUpdateProcedure", "CALL script.procedure('UPDATE',updatedObjects) YIELD * RETURN *",
|
||||
// &interpreter_context_->ast_cache, &dba, &interpreter_context_->antlr_lock, TriggerEventType::UPDATE});
|
||||
// }
|
||||
// } catch (const utils::BasicException &e) {
|
||||
// spdlog::critical("Failed to create a trigger because: {}", e.what());
|
||||
@ -654,7 +670,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
|
||||
|
||||
if (interpreter_context_->before_commit_triggers.size() > 0 ||
|
||||
interpreter_context_->after_commit_triggers.size() > 0) {
|
||||
trigger_context_.emplace();
|
||||
trigger_context_collector_.emplace();
|
||||
}
|
||||
};
|
||||
} else if (query_upper == "COMMIT") {
|
||||
@ -702,7 +718,8 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
|
||||
|
||||
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
|
||||
InterpreterContext *interpreter_context, DbAccessor *dba,
|
||||
utils::MemoryResource *execution_memory, TriggerContext *trigger_context = nullptr) {
|
||||
utils::MemoryResource *execution_memory,
|
||||
TriggerContextCollector *trigger_context_collector = nullptr) {
|
||||
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
|
||||
|
||||
Frame frame(0);
|
||||
@ -740,7 +757,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
||||
}
|
||||
|
||||
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
|
||||
execution_memory, trigger_context, memory_limit);
|
||||
execution_memory, trigger_context_collector, memory_limit);
|
||||
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
|
||||
[pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
|
||||
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
|
||||
@ -1373,7 +1390,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
if (utils::Downcast<CypherQuery>(parsed_query.query) &&
|
||||
(interpreter_context_->before_commit_triggers.size() > 0 ||
|
||||
interpreter_context_->after_commit_triggers.size() > 0)) {
|
||||
trigger_context_.emplace();
|
||||
trigger_context_collector_.emplace();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1383,7 +1400,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
if (utils::Downcast<CypherQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
|
||||
&*execution_db_accessor_, &query_execution->execution_memory,
|
||||
trigger_context_ ? &*trigger_context_ : nullptr);
|
||||
trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
|
||||
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
|
||||
&*execution_db_accessor_, &query_execution->execution_memory);
|
||||
@ -1451,7 +1468,7 @@ void Interpreter::Abort() {
|
||||
db_accessor_->Abort();
|
||||
execution_db_accessor_.reset();
|
||||
db_accessor_.reset();
|
||||
trigger_context_.reset();
|
||||
trigger_context_collector_.reset();
|
||||
}
|
||||
|
||||
namespace {
|
||||
@ -1459,7 +1476,6 @@ void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, Interpret
|
||||
TriggerContext trigger_context) {
|
||||
// Run the triggers
|
||||
for (const auto &trigger : triggers.access()) {
|
||||
spdlog::debug("Executing trigger '{}'", trigger.name());
|
||||
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
|
||||
// create a new transaction for each trigger
|
||||
@ -1512,16 +1528,20 @@ void Interpreter::Commit() {
|
||||
// a query.
|
||||
if (!db_accessor_) return;
|
||||
|
||||
if (trigger_context_) {
|
||||
std::optional<TriggerContext> trigger_context = std::nullopt;
|
||||
if (trigger_context_collector_) {
|
||||
trigger_context.emplace(std::move(*trigger_context_collector_).TransformToTriggerContext());
|
||||
}
|
||||
|
||||
if (trigger_context) {
|
||||
// Run the triggers
|
||||
for (const auto &trigger : interpreter_context_->before_commit_triggers.access()) {
|
||||
spdlog::debug("Executing trigger '{}'", trigger.name());
|
||||
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
AdvanceCommand();
|
||||
try {
|
||||
trigger.Execute(&*execution_db_accessor_, &execution_memory, *interpreter_context_->tsc_frequency,
|
||||
interpreter_context_->execution_timeout_sec, &interpreter_context_->is_shutting_down,
|
||||
*trigger_context_);
|
||||
*trigger_context);
|
||||
} catch (const utils::BasicException &e) {
|
||||
throw utils::BasicException(
|
||||
fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.name(), e.what()));
|
||||
@ -1540,7 +1560,7 @@ void Interpreter::Commit() {
|
||||
auto property_name = execution_db_accessor_->PropertyToName(*constraint_violation.properties.begin());
|
||||
execution_db_accessor_.reset();
|
||||
db_accessor_.reset();
|
||||
trigger_context_.reset();
|
||||
trigger_context_collector_.reset();
|
||||
throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name,
|
||||
property_name);
|
||||
break;
|
||||
@ -1553,7 +1573,7 @@ void Interpreter::Commit() {
|
||||
[this](auto &stream, const auto &prop) { stream << execution_db_accessor_->PropertyToName(prop); });
|
||||
execution_db_accessor_.reset();
|
||||
db_accessor_.reset();
|
||||
trigger_context_.reset();
|
||||
trigger_context_collector_.reset();
|
||||
throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name,
|
||||
property_names_stream.str());
|
||||
break;
|
||||
@ -1561,8 +1581,8 @@ void Interpreter::Commit() {
|
||||
}
|
||||
}
|
||||
|
||||
if (trigger_context_) {
|
||||
background_thread_.AddTask([trigger_context = std::move(*trigger_context_),
|
||||
if (trigger_context && interpreter_context_->after_commit_triggers.size() > 0) {
|
||||
background_thread_.AddTask([trigger_context = std::move(*trigger_context),
|
||||
interpreter_context = this->interpreter_context_,
|
||||
user_transaction = std::shared_ptr(std::move(db_accessor_))]() mutable {
|
||||
RunTriggersIndividually(interpreter_context->after_commit_triggers, interpreter_context,
|
||||
@ -1574,7 +1594,7 @@ void Interpreter::Commit() {
|
||||
|
||||
execution_db_accessor_.reset();
|
||||
db_accessor_.reset();
|
||||
trigger_context_.reset();
|
||||
trigger_context_collector_.reset();
|
||||
|
||||
SPDLOG_DEBUG("Finished comitting the transaction");
|
||||
}
|
||||
|
@ -303,7 +303,7 @@ class Interpreter final {
|
||||
// move this unique_ptr into a shrared_ptr.
|
||||
std::unique_ptr<storage::Storage::Accessor> db_accessor_;
|
||||
std::optional<DbAccessor> execution_db_accessor_;
|
||||
std::optional<TriggerContext> trigger_context_;
|
||||
std::optional<TriggerContextCollector> trigger_context_collector_;
|
||||
bool in_explicit_transaction_{false};
|
||||
bool expect_rollback_{false};
|
||||
|
||||
|
@ -209,8 +209,8 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame, ExecutionContext &context)
|
||||
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
auto created_vertex = CreateLocalVertex(self_.node_info_, &frame, context);
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterCreatedObject(created_vertex);
|
||||
if (context.trigger_context_collector) {
|
||||
context.trigger_context_collector->RegisterCreatedObject(created_vertex);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -311,8 +311,8 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, ExecutionContext &cont
|
||||
}
|
||||
}();
|
||||
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterCreatedObject(created_edge);
|
||||
if (context.trigger_context_collector) {
|
||||
context.trigger_context_collector->RegisterCreatedObject(created_edge);
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -329,8 +329,8 @@ VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(Frame &frame, Exec
|
||||
return dest_node_value.ValueVertex();
|
||||
} else {
|
||||
auto &created_vertex = CreateLocalVertex(self_.node_info_, &frame, context);
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterCreatedObject(created_vertex);
|
||||
if (context.trigger_context_collector) {
|
||||
context.trigger_context_collector->RegisterCreatedObject(created_vertex);
|
||||
}
|
||||
return created_vertex;
|
||||
}
|
||||
@ -1848,8 +1848,8 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
}
|
||||
}
|
||||
|
||||
if (context.trigger_context && maybe_value.GetValue()) {
|
||||
context.trigger_context->RegisterDeletedObject(*maybe_value.GetValue());
|
||||
if (context.trigger_context_collector && maybe_value.GetValue()) {
|
||||
context.trigger_context_collector->RegisterDeletedObject(*maybe_value.GetValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1873,10 +1873,10 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
throw QueryRuntimeException("Unexpected error when deleting a node.");
|
||||
}
|
||||
}
|
||||
if (context.trigger_context && res.GetValue()) {
|
||||
context.trigger_context->RegisterDeletedObject(res.GetValue()->first);
|
||||
if (context.trigger_context_collector && res.GetValue()) {
|
||||
context.trigger_context_collector->RegisterDeletedObject(res.GetValue()->first);
|
||||
for (const auto &deleted_edge : res.GetValue()->second) {
|
||||
context.trigger_context->RegisterDeletedObject(deleted_edge);
|
||||
context.trigger_context_collector->RegisterDeletedObject(deleted_edge);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -1894,8 +1894,8 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
}
|
||||
}
|
||||
|
||||
if (context.trigger_context && res.GetValue()) {
|
||||
context.trigger_context->RegisterDeletedObject(*res.GetValue());
|
||||
if (context.trigger_context_collector && res.GetValue()) {
|
||||
context.trigger_context_collector->RegisterDeletedObject(*res.GetValue());
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -1953,18 +1953,20 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, ExecutionContext &contex
|
||||
case TypedValue::Type::Vertex: {
|
||||
auto old_value = PropsSetChecked(&lhs.ValueVertex(), self_.property_, rhs);
|
||||
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterSetObjectProperty(lhs.ValueVertex(), self_.property_,
|
||||
TypedValue{std::move(old_value)}, std::move(rhs));
|
||||
if (context.trigger_context_collector) {
|
||||
// rhs cannot be moved because it was created with the allocator that is only valid during current pull
|
||||
context.trigger_context_collector->RegisterSetObjectProperty(lhs.ValueVertex(), self_.property_,
|
||||
TypedValue{std::move(old_value)}, TypedValue{rhs});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TypedValue::Type::Edge: {
|
||||
auto old_value = PropsSetChecked(&lhs.ValueEdge(), self_.property_, rhs);
|
||||
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterSetObjectProperty(lhs.ValueEdge(), self_.property_,
|
||||
TypedValue{std::move(old_value)}, std::move(rhs));
|
||||
if (context.trigger_context_collector) {
|
||||
// rhs cannot be moved because it was created with the allocator that is only valid during current pull
|
||||
context.trigger_context_collector->RegisterSetObjectProperty(lhs.ValueEdge(), self_.property_,
|
||||
TypedValue{std::move(old_value)}, TypedValue{rhs});
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -2039,7 +2041,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
}
|
||||
}
|
||||
|
||||
if (context->trigger_context) {
|
||||
if (context->trigger_context_collector) {
|
||||
old_values.emplace(std::move(*maybe_value));
|
||||
}
|
||||
}
|
||||
@ -2073,7 +2075,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
|
||||
return {};
|
||||
}();
|
||||
context->trigger_context->RegisterSetObjectProperty(*record, key, TypedValue(std::move(old_value)),
|
||||
context->trigger_context_collector->RegisterSetObjectProperty(*record, key, TypedValue(std::move(old_value)),
|
||||
TypedValue(std::move(new_value)));
|
||||
};
|
||||
|
||||
@ -2094,7 +2096,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
}
|
||||
}
|
||||
|
||||
if (context->trigger_context) {
|
||||
if (context->trigger_context_collector) {
|
||||
register_set_property(std::move(*maybe_error), kv.first, std::move(kv.second));
|
||||
}
|
||||
}
|
||||
@ -2111,7 +2113,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
for (const auto &kv : rhs.ValueMap()) {
|
||||
auto key = context->db_accessor->NameToProperty(kv.first);
|
||||
auto old_value = PropsSetChecked(record, key, kv.second);
|
||||
if (context->trigger_context) {
|
||||
if (context->trigger_context_collector) {
|
||||
register_set_property(std::move(old_value), key, kv.second);
|
||||
}
|
||||
}
|
||||
@ -2123,10 +2125,10 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
"map.");
|
||||
}
|
||||
|
||||
if (context->trigger_context && old_values) {
|
||||
if (context->trigger_context_collector && old_values) {
|
||||
// register removed properties
|
||||
for (auto &[property_id, property_value] : *old_values) {
|
||||
context->trigger_context->RegisterRemovedObjectProperty(*record, property_id,
|
||||
context->trigger_context_collector->RegisterRemovedObjectProperty(*record, property_id,
|
||||
TypedValue(std::move(property_value)));
|
||||
}
|
||||
}
|
||||
@ -2196,9 +2198,9 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
||||
auto &vertex = vertex_value.ValueVertex();
|
||||
for (auto label : self_.labels_) {
|
||||
auto maybe_error = vertex.AddLabel(label);
|
||||
if (maybe_error.HasError()) {
|
||||
switch (maybe_error.GetError()) {
|
||||
auto maybe_value = vertex.AddLabel(label);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
throw QueryRuntimeException("Can't serialize due to concurrent operations.");
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
@ -2210,8 +2212,8 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
}
|
||||
}
|
||||
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterSetVertexLabel(vertex, label);
|
||||
if (context.trigger_context_collector && *maybe_value) {
|
||||
context.trigger_context_collector->RegisterSetVertexLabel(vertex, label);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2269,8 +2271,8 @@ bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame, ExecutionContext &
|
||||
}
|
||||
}
|
||||
|
||||
if (context.trigger_context) {
|
||||
context.trigger_context->RegisterRemovedObjectProperty(*record, property,
|
||||
if (context.trigger_context_collector) {
|
||||
context.trigger_context_collector->RegisterRemovedObjectProperty(*record, property,
|
||||
TypedValue(std::move(*maybe_old_value)));
|
||||
}
|
||||
};
|
||||
@ -2339,8 +2341,8 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &cont
|
||||
}
|
||||
}
|
||||
|
||||
if (context.trigger_context && *maybe_value) {
|
||||
context.trigger_context->RegisterRemovedVertexLabel(vertex, label);
|
||||
if (context.trigger_context_collector && *maybe_value) {
|
||||
context.trigger_context_collector->RegisterRemovedVertexLabel(vertex, label);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,23 +12,112 @@
|
||||
namespace query {
|
||||
|
||||
namespace {
|
||||
// clang-format off
|
||||
std::vector<std::pair<Identifier, trigger::IdentifierTag>> GetPredefinedIdentifiers() {
|
||||
return {{{"createdVertices", false}, trigger::IdentifierTag::CREATED_VERTICES },
|
||||
{{"createdEdges", false}, trigger::IdentifierTag::CREATED_EDGES },
|
||||
{{"deletedVertices", false}, trigger::IdentifierTag::DELETED_VERTICES },
|
||||
{{"deletedEdges", false}, trigger::IdentifierTag::DELETED_EDGES },
|
||||
{{"assignedVertexProperties", false}, trigger::IdentifierTag::SET_VERTEX_PROPERTIES },
|
||||
{{"assignedEdgeProperties", false}, trigger::IdentifierTag::SET_EDGE_PROPERTIES },
|
||||
{{"removedVertexProperties", false}, trigger::IdentifierTag::REMOVED_VERTEX_PROPERTIES},
|
||||
{{"removedEdgeProperties", false}, trigger::IdentifierTag::REMOVED_EDGE_PROPERTIES },
|
||||
{{"assignedVertexLabels", false}, trigger::IdentifierTag::SET_VERTEX_LABELS },
|
||||
{{"removedVertexLabels", false}, trigger::IdentifierTag::REMOVED_VERTEX_LABELS },
|
||||
{{"updatedVertices", false}, trigger::IdentifierTag::UPDATED_VERTICES },
|
||||
{{"updatedEdges", false}, trigger::IdentifierTag::UPDATED_EDGES },
|
||||
{{"updatedObjects", false}, trigger::IdentifierTag::UPDATED_OBJECTS }};
|
||||
|
||||
auto IdentifierString(const TriggerIdentifierTag tag) noexcept {
|
||||
switch (tag) {
|
||||
case TriggerIdentifierTag::CREATED_VERTICES:
|
||||
return "createdVertices";
|
||||
|
||||
case TriggerIdentifierTag::CREATED_EDGES:
|
||||
return "createdEdges";
|
||||
|
||||
case TriggerIdentifierTag::CREATED_OBJECTS:
|
||||
return "createdObjects";
|
||||
|
||||
case TriggerIdentifierTag::DELETED_VERTICES:
|
||||
return "deletedVertices";
|
||||
|
||||
case TriggerIdentifierTag::DELETED_EDGES:
|
||||
return "deletedEdges";
|
||||
|
||||
case TriggerIdentifierTag::DELETED_OBJECTS:
|
||||
return "deletedObjects";
|
||||
|
||||
case TriggerIdentifierTag::SET_VERTEX_PROPERTIES:
|
||||
return "assignedVertexProperties";
|
||||
|
||||
case TriggerIdentifierTag::SET_EDGE_PROPERTIES:
|
||||
return "assignedEdgeProperties";
|
||||
|
||||
case TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES:
|
||||
return "removedVertexProperties";
|
||||
|
||||
case TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES:
|
||||
return "removedEdgeProperties";
|
||||
|
||||
case TriggerIdentifierTag::SET_VERTEX_LABELS:
|
||||
return "assignedVertexLabels";
|
||||
|
||||
case TriggerIdentifierTag::REMOVED_VERTEX_LABELS:
|
||||
return "removedVertexLabels";
|
||||
|
||||
case TriggerIdentifierTag::UPDATED_VERTICES:
|
||||
return "updatedVertices";
|
||||
|
||||
case TriggerIdentifierTag::UPDATED_EDGES:
|
||||
return "updatedEdges";
|
||||
|
||||
case TriggerIdentifierTag::UPDATED_OBJECTS:
|
||||
return "updatedObjects";
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
concept SameAsIdentifierTag = std::same_as<T, TriggerIdentifierTag>;
|
||||
|
||||
template <SameAsIdentifierTag... TArgs>
|
||||
std::vector<std::pair<Identifier, TriggerIdentifierTag>> TagsToIdentifiers(const TArgs &...args) {
|
||||
std::vector<std::pair<Identifier, TriggerIdentifierTag>> identifiers;
|
||||
identifiers.reserve(sizeof...(args));
|
||||
|
||||
auto add_identifier = [&identifiers](const auto tag) {
|
||||
identifiers.emplace_back(Identifier{IdentifierString(tag), false}, tag);
|
||||
};
|
||||
|
||||
(add_identifier(args), ...);
|
||||
|
||||
return identifiers;
|
||||
};
|
||||
|
||||
std::vector<std::pair<Identifier, TriggerIdentifierTag>> GetPredefinedIdentifiers(const TriggerEventType event_type) {
|
||||
using IdentifierTag = TriggerIdentifierTag;
|
||||
using EventType = TriggerEventType;
|
||||
|
||||
switch (event_type) {
|
||||
case EventType::ANY:
|
||||
return {};
|
||||
|
||||
case EventType::CREATE:
|
||||
return TagsToIdentifiers(IdentifierTag::CREATED_OBJECTS);
|
||||
|
||||
case EventType::VERTEX_CREATE:
|
||||
return TagsToIdentifiers(IdentifierTag::CREATED_VERTICES);
|
||||
|
||||
case EventType::EDGE_CREATE:
|
||||
return TagsToIdentifiers(IdentifierTag::CREATED_EDGES);
|
||||
|
||||
case EventType::DELETE:
|
||||
return TagsToIdentifiers(IdentifierTag::DELETED_OBJECTS);
|
||||
|
||||
case EventType::VERTEX_DELETE:
|
||||
return TagsToIdentifiers(IdentifierTag::DELETED_VERTICES);
|
||||
|
||||
case EventType::EDGE_DELETE:
|
||||
return TagsToIdentifiers(IdentifierTag::DELETED_EDGES);
|
||||
|
||||
case EventType::UPDATE:
|
||||
return TagsToIdentifiers(IdentifierTag::UPDATED_OBJECTS);
|
||||
|
||||
case EventType::VERTEX_UPDATE:
|
||||
return TagsToIdentifiers(IdentifierTag::SET_VERTEX_PROPERTIES, IdentifierTag::REMOVED_VERTEX_PROPERTIES,
|
||||
IdentifierTag::SET_VERTEX_LABELS, IdentifierTag::REMOVED_VERTEX_LABELS,
|
||||
IdentifierTag::UPDATED_VERTICES);
|
||||
|
||||
case EventType::EDGE_UPDATE:
|
||||
return TagsToIdentifiers(IdentifierTag::SET_EDGE_PROPERTIES, IdentifierTag::REMOVED_EDGE_PROPERTIES,
|
||||
IdentifierTag::UPDATED_EDGES);
|
||||
}
|
||||
}
|
||||
// clang-format on
|
||||
|
||||
template <typename T>
|
||||
concept WithToMap = requires(const T value, DbAccessor *dba) {
|
||||
@ -42,14 +131,12 @@ TypedValue ToTypedValue(const T &value, DbAccessor *dba) {
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
TypedValue ToTypedValue(const TriggerContext::CreatedObject<TAccessor> &created_object,
|
||||
[[maybe_unused]] DbAccessor *dba) {
|
||||
TypedValue ToTypedValue(const detail::CreatedObject<TAccessor> &created_object, [[maybe_unused]] DbAccessor *dba) {
|
||||
return TypedValue{created_object.object};
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
TypedValue ToTypedValue(const TriggerContext::DeletedObject<TAccessor> &deleted_object,
|
||||
[[maybe_unused]] DbAccessor *dba) {
|
||||
TypedValue ToTypedValue(const detail::DeletedObject<TAccessor> &deleted_object, [[maybe_unused]] DbAccessor *dba) {
|
||||
return TypedValue{deleted_object.object};
|
||||
}
|
||||
|
||||
@ -67,7 +154,7 @@ concept ConvertableToTypedValue = requires(T value, DbAccessor *dba) {
|
||||
&&WithIsValid<T>;
|
||||
|
||||
template <typename T>
|
||||
concept LabelUpdateContext = utils::SameAsAnyOf<T, TriggerContext::SetVertexLabel, TriggerContext::RemovedVertexLabel>;
|
||||
concept LabelUpdateContext = utils::SameAsAnyOf<T, detail::SetVertexLabel, detail::RemovedVertexLabel>;
|
||||
|
||||
template <LabelUpdateContext TContext>
|
||||
TypedValue ToTypedValue(const std::vector<TContext> &values, DbAccessor *dba) {
|
||||
@ -79,17 +166,19 @@ TypedValue ToTypedValue(const std::vector<TContext> &values, DbAccessor *dba) {
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, TypedValue> typed_values;
|
||||
TypedValue result{std::map<std::string, TypedValue>{}};
|
||||
auto &typed_values = result.ValueMap();
|
||||
for (auto &[label_id, vertices] : vertices_by_labels) {
|
||||
typed_values.emplace(dba->LabelToName(label_id), TypedValue(std::move(vertices)));
|
||||
}
|
||||
|
||||
return TypedValue(std::move(typed_values));
|
||||
return result;
|
||||
}
|
||||
|
||||
template <ConvertableToTypedValue T>
|
||||
TypedValue ToTypedValue(const std::vector<T> &values, DbAccessor *dba) requires(!LabelUpdateContext<T>) {
|
||||
std::vector<TypedValue> typed_values;
|
||||
TypedValue result{std::vector<TypedValue>{}};
|
||||
auto &typed_values = result.ValueList();
|
||||
typed_values.reserve(values.size());
|
||||
|
||||
for (const auto &value : values) {
|
||||
@ -98,131 +187,97 @@ TypedValue ToTypedValue(const std::vector<T> &values, DbAccessor *dba) requires(
|
||||
}
|
||||
}
|
||||
|
||||
return TypedValue(std::move(typed_values));
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
const char *TypeToString() {
|
||||
if constexpr (std::same_as<T, TriggerContext::SetObjectProperty<VertexAccessor>>) {
|
||||
if constexpr (std::same_as<T, detail::CreatedObject<VertexAccessor>>) {
|
||||
return "created_vertex";
|
||||
} else if constexpr (std::same_as<T, detail::CreatedObject<EdgeAccessor>>) {
|
||||
return "created_edge";
|
||||
} else if constexpr (std::same_as<T, detail::DeletedObject<VertexAccessor>>) {
|
||||
return "deleted_vertex";
|
||||
} else if constexpr (std::same_as<T, detail::DeletedObject<EdgeAccessor>>) {
|
||||
return "deleted_edge";
|
||||
} else if constexpr (std::same_as<T, detail::SetObjectProperty<VertexAccessor>>) {
|
||||
return "set_vertex_property";
|
||||
} else if constexpr (std::same_as<T, TriggerContext::SetObjectProperty<EdgeAccessor>>) {
|
||||
} else if constexpr (std::same_as<T, detail::SetObjectProperty<EdgeAccessor>>) {
|
||||
return "set_edge_property";
|
||||
} else if constexpr (std::same_as<T, TriggerContext::RemovedObjectProperty<VertexAccessor>>) {
|
||||
} else if constexpr (std::same_as<T, detail::RemovedObjectProperty<VertexAccessor>>) {
|
||||
return "removed_vertex_property";
|
||||
} else if constexpr (std::same_as<T, TriggerContext::RemovedObjectProperty<EdgeAccessor>>) {
|
||||
} else if constexpr (std::same_as<T, detail::RemovedObjectProperty<EdgeAccessor>>) {
|
||||
return "removed_edge_property";
|
||||
} else if constexpr (std::same_as<T, TriggerContext::SetVertexLabel>) {
|
||||
} else if constexpr (std::same_as<T, detail::SetVertexLabel>) {
|
||||
return "set_vertex_label";
|
||||
} else if constexpr (std::same_as<T, TriggerContext::RemovedVertexLabel>) {
|
||||
} else if constexpr (std::same_as<T, detail::RemovedVertexLabel>) {
|
||||
return "removed_vertex_label";
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
concept UpdateContext = WithToMap<T> &&WithIsValid<T>;
|
||||
concept ContextInfo = WithToMap<T> &&WithIsValid<T>;
|
||||
|
||||
template <UpdateContext... Args>
|
||||
TypedValue Updated(DbAccessor *dba, const std::vector<Args> &...args) {
|
||||
template <ContextInfo... Args>
|
||||
TypedValue Concatenate(DbAccessor *dba, const std::vector<Args> &...args) {
|
||||
const auto size = (args.size() + ...);
|
||||
std::vector<TypedValue> updated;
|
||||
updated.reserve(size);
|
||||
TypedValue result{std::vector<TypedValue>{}};
|
||||
auto &concatenated = result.ValueList();
|
||||
concatenated.reserve(size);
|
||||
|
||||
const auto add_to_updated = [&]<UpdateContext T>(const std::vector<T> &values) {
|
||||
const auto add_to_concatenated = [&]<ContextInfo T>(const std::vector<T> &values) {
|
||||
for (const auto &value : values) {
|
||||
if (value.IsValid()) {
|
||||
auto map = value.ToMap(dba);
|
||||
map["type"] = TypeToString<T>();
|
||||
updated.emplace_back(std::move(map));
|
||||
map["event_type"] = TypeToString<T>();
|
||||
concatenated.emplace_back(std::move(map));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
(add_to_updated(args), ...);
|
||||
(add_to_concatenated(args), ...);
|
||||
|
||||
return TypedValue(std::move(updated));
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
concept WithEmpty = requires(const T value) {
|
||||
{ value.empty() }
|
||||
->std::same_as<bool>;
|
||||
};
|
||||
|
||||
template <WithEmpty... TContainer>
|
||||
bool AnyContainsValue(const TContainer &...value_containers) {
|
||||
return (!value_containers.empty() || ...);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool TriggerContext::SetVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); }
|
||||
namespace detail {
|
||||
bool SetVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); }
|
||||
|
||||
std::map<std::string, TypedValue> TriggerContext::SetVertexLabel::ToMap(DbAccessor *dba) const {
|
||||
std::map<std::string, TypedValue> SetVertexLabel::ToMap(DbAccessor *dba) const {
|
||||
return {{"vertex", TypedValue{object}}, {"label", TypedValue{dba->LabelToName(label_id)}}};
|
||||
}
|
||||
|
||||
bool TriggerContext::RemovedVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); }
|
||||
bool RemovedVertexLabel::IsValid() const { return object.IsVisible(storage::View::OLD); }
|
||||
|
||||
std::map<std::string, TypedValue> TriggerContext::RemovedVertexLabel::ToMap(DbAccessor *dba) const {
|
||||
std::map<std::string, TypedValue> RemovedVertexLabel::ToMap(DbAccessor *dba) const {
|
||||
return {{"vertex", TypedValue{object}}, {"label", TypedValue{dba->LabelToName(label_id)}}};
|
||||
}
|
||||
|
||||
void TriggerContext::RegisterSetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) {
|
||||
set_vertex_labels_.emplace_back(vertex, label_id);
|
||||
}
|
||||
|
||||
void TriggerContext::RegisterRemovedVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) {
|
||||
removed_vertex_labels_.emplace_back(vertex, label_id);
|
||||
}
|
||||
|
||||
TypedValue TriggerContext::GetTypedValue(const trigger::IdentifierTag tag, DbAccessor *dba) const {
|
||||
const auto &[created_vertices, deleted_vertices, set_vertex_properties, removed_vertex_properties] = vertex_registry_;
|
||||
const auto &[created_edges, deleted_edges, set_edge_properties, removed_edge_properties] = edge_registry_;
|
||||
|
||||
switch (tag) {
|
||||
case trigger::IdentifierTag::CREATED_VERTICES:
|
||||
return ToTypedValue(created_vertices, dba);
|
||||
|
||||
case trigger::IdentifierTag::CREATED_EDGES:
|
||||
return ToTypedValue(created_edges, dba);
|
||||
|
||||
case trigger::IdentifierTag::DELETED_VERTICES:
|
||||
return ToTypedValue(deleted_vertices, dba);
|
||||
|
||||
case trigger::IdentifierTag::DELETED_EDGES:
|
||||
return ToTypedValue(deleted_edges, dba);
|
||||
|
||||
case trigger::IdentifierTag::SET_VERTEX_PROPERTIES:
|
||||
return ToTypedValue(set_vertex_properties, dba);
|
||||
|
||||
case trigger::IdentifierTag::SET_EDGE_PROPERTIES:
|
||||
return ToTypedValue(set_edge_properties, dba);
|
||||
|
||||
case trigger::IdentifierTag::REMOVED_VERTEX_PROPERTIES:
|
||||
return ToTypedValue(removed_vertex_properties, dba);
|
||||
|
||||
case trigger::IdentifierTag::REMOVED_EDGE_PROPERTIES:
|
||||
return ToTypedValue(removed_edge_properties, dba);
|
||||
|
||||
case trigger::IdentifierTag::SET_VERTEX_LABELS:
|
||||
return ToTypedValue(set_vertex_labels_, dba);
|
||||
|
||||
case trigger::IdentifierTag::REMOVED_VERTEX_LABELS:
|
||||
return ToTypedValue(removed_vertex_labels_, dba);
|
||||
|
||||
case trigger::IdentifierTag::UPDATED_VERTICES:
|
||||
return Updated(dba, set_vertex_properties, removed_vertex_properties, set_vertex_labels_, removed_vertex_labels_);
|
||||
|
||||
case trigger::IdentifierTag::UPDATED_EDGES:
|
||||
return Updated(dba, set_edge_properties, removed_edge_properties);
|
||||
|
||||
case trigger::IdentifierTag::UPDATED_OBJECTS:
|
||||
return Updated(dba, set_vertex_properties, set_edge_properties, removed_vertex_properties,
|
||||
removed_edge_properties, set_vertex_labels_, removed_vertex_labels_);
|
||||
}
|
||||
}
|
||||
} // namespace detail
|
||||
|
||||
void TriggerContext::AdaptForAccessor(DbAccessor *accessor) {
|
||||
auto &[created_vertices, deleted_vertices, set_vertex_properties, removed_vertex_properties] = vertex_registry_;
|
||||
// adapt created_vertices_
|
||||
{
|
||||
auto it = created_vertices.begin();
|
||||
for (const auto &created_vertex : created_vertices) {
|
||||
// adapt created_vertices_
|
||||
auto it = created_vertices_.begin();
|
||||
for (auto &created_vertex : created_vertices_) {
|
||||
if (auto maybe_vertex = accessor->FindVertex(created_vertex.object.Gid(), storage::View::OLD); maybe_vertex) {
|
||||
*it = CreatedObject{*maybe_vertex};
|
||||
*it = detail::CreatedObject{*maybe_vertex};
|
||||
++it;
|
||||
}
|
||||
}
|
||||
created_vertices.erase(it, created_vertices.end());
|
||||
created_vertices_.erase(it, created_vertices_.end());
|
||||
}
|
||||
|
||||
// deleted_vertices_ should keep the transaction context of the transaction which deleted it
|
||||
@ -241,30 +296,30 @@ void TriggerContext::AdaptForAccessor(DbAccessor *accessor) {
|
||||
values->erase(it, values->end());
|
||||
};
|
||||
|
||||
adapt_context_with_vertex(&set_vertex_properties);
|
||||
adapt_context_with_vertex(&removed_vertex_properties);
|
||||
adapt_context_with_vertex(&set_vertex_properties_);
|
||||
adapt_context_with_vertex(&removed_vertex_properties_);
|
||||
adapt_context_with_vertex(&set_vertex_labels_);
|
||||
adapt_context_with_vertex(&removed_vertex_labels_);
|
||||
|
||||
auto &[created_edges, deleted_edges, set_edge_properties, removed_edge_properties] = edge_registry_;
|
||||
// adapt created_edges
|
||||
{
|
||||
auto it = created_edges.begin();
|
||||
for (const auto &created_edge : created_edges) {
|
||||
if (auto maybe_vertex = accessor->FindVertex(created_edge.object.From().Gid(), storage::View::OLD);
|
||||
maybe_vertex) {
|
||||
auto maybe_out_edges = maybe_vertex->OutEdges(storage::View::OLD);
|
||||
// adapt created_edges
|
||||
auto it = created_edges_.begin();
|
||||
for (auto &created_edge : created_edges_) {
|
||||
const auto maybe_from_vertex = accessor->FindVertex(created_edge.object.From().Gid(), storage::View::OLD);
|
||||
if (!maybe_from_vertex) {
|
||||
continue;
|
||||
}
|
||||
auto maybe_out_edges = maybe_from_vertex->OutEdges(storage::View::OLD);
|
||||
MG_ASSERT(maybe_out_edges.HasValue());
|
||||
const auto edge_gid = created_edge.object.Gid();
|
||||
for (const auto &edge : *maybe_out_edges) {
|
||||
if (edge.Gid() == created_edge.object.Gid()) {
|
||||
*it = CreatedObject{edge};
|
||||
if (edge.Gid() == edge_gid) {
|
||||
*it = detail::CreatedObject{edge};
|
||||
++it;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
created_edges.erase(it, created_edges.end());
|
||||
created_edges_.erase(it, created_edges_.end());
|
||||
}
|
||||
|
||||
// deleted_edges_ should keep the transaction context of the transaction which deleted it
|
||||
@ -290,13 +345,164 @@ void TriggerContext::AdaptForAccessor(DbAccessor *accessor) {
|
||||
values->erase(it, values->end());
|
||||
};
|
||||
|
||||
adapt_context_with_edge(&set_edge_properties);
|
||||
adapt_context_with_edge(&removed_edge_properties);
|
||||
adapt_context_with_edge(&set_edge_properties_);
|
||||
adapt_context_with_edge(&removed_edge_properties_);
|
||||
}
|
||||
|
||||
TypedValue TriggerContext::GetTypedValue(const TriggerIdentifierTag tag, DbAccessor *dba) const {
|
||||
switch (tag) {
|
||||
case TriggerIdentifierTag::CREATED_VERTICES:
|
||||
return ToTypedValue(created_vertices_, dba);
|
||||
|
||||
case TriggerIdentifierTag::CREATED_EDGES:
|
||||
return ToTypedValue(created_edges_, dba);
|
||||
|
||||
case TriggerIdentifierTag::CREATED_OBJECTS:
|
||||
return Concatenate(dba, created_vertices_, created_edges_);
|
||||
|
||||
case TriggerIdentifierTag::DELETED_VERTICES:
|
||||
return ToTypedValue(deleted_vertices_, dba);
|
||||
|
||||
case TriggerIdentifierTag::DELETED_EDGES:
|
||||
return ToTypedValue(deleted_edges_, dba);
|
||||
|
||||
case TriggerIdentifierTag::DELETED_OBJECTS:
|
||||
return Concatenate(dba, deleted_vertices_, deleted_edges_);
|
||||
|
||||
case TriggerIdentifierTag::SET_VERTEX_PROPERTIES:
|
||||
return ToTypedValue(set_vertex_properties_, dba);
|
||||
|
||||
case TriggerIdentifierTag::SET_EDGE_PROPERTIES:
|
||||
return ToTypedValue(set_edge_properties_, dba);
|
||||
|
||||
case TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES:
|
||||
return ToTypedValue(removed_vertex_properties_, dba);
|
||||
|
||||
case TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES:
|
||||
return ToTypedValue(removed_edge_properties_, dba);
|
||||
|
||||
case TriggerIdentifierTag::SET_VERTEX_LABELS:
|
||||
return ToTypedValue(set_vertex_labels_, dba);
|
||||
|
||||
case TriggerIdentifierTag::REMOVED_VERTEX_LABELS:
|
||||
return ToTypedValue(removed_vertex_labels_, dba);
|
||||
|
||||
case TriggerIdentifierTag::UPDATED_VERTICES:
|
||||
return Concatenate(dba, set_vertex_properties_, removed_vertex_properties_, set_vertex_labels_,
|
||||
removed_vertex_labels_);
|
||||
|
||||
case TriggerIdentifierTag::UPDATED_EDGES:
|
||||
return Concatenate(dba, set_edge_properties_, removed_edge_properties_);
|
||||
|
||||
case TriggerIdentifierTag::UPDATED_OBJECTS:
|
||||
return Concatenate(dba, set_vertex_properties_, set_edge_properties_, removed_vertex_properties_,
|
||||
removed_edge_properties_, set_vertex_labels_, removed_vertex_labels_);
|
||||
}
|
||||
}
|
||||
|
||||
bool TriggerContext::ShouldEventTrigger(const TriggerEventType event_type) const {
|
||||
using EventType = TriggerEventType;
|
||||
switch (event_type) {
|
||||
case EventType::ANY:
|
||||
return true;
|
||||
|
||||
case EventType::CREATE:
|
||||
return AnyContainsValue(created_vertices_, created_edges_);
|
||||
|
||||
case EventType::VERTEX_CREATE:
|
||||
return AnyContainsValue(created_vertices_);
|
||||
|
||||
case EventType::EDGE_CREATE:
|
||||
return AnyContainsValue(created_edges_);
|
||||
|
||||
case EventType::DELETE:
|
||||
return AnyContainsValue(deleted_vertices_, deleted_edges_);
|
||||
|
||||
case EventType::VERTEX_DELETE:
|
||||
return AnyContainsValue(deleted_vertices_);
|
||||
|
||||
case EventType::EDGE_DELETE:
|
||||
return AnyContainsValue(deleted_edges_);
|
||||
|
||||
case EventType::UPDATE:
|
||||
return AnyContainsValue(set_vertex_properties_, set_edge_properties_, removed_vertex_properties_,
|
||||
removed_edge_properties_, set_vertex_labels_, removed_vertex_labels_);
|
||||
|
||||
case EventType::VERTEX_UPDATE:
|
||||
return AnyContainsValue(set_vertex_properties_, removed_vertex_properties_, set_vertex_labels_,
|
||||
removed_vertex_labels_);
|
||||
|
||||
case EventType::EDGE_UPDATE:
|
||||
return AnyContainsValue(set_edge_properties_, removed_edge_properties_);
|
||||
}
|
||||
}
|
||||
|
||||
void TriggerContextCollector::UpdateLabelMap(const VertexAccessor vertex, const storage::LabelId label_id,
|
||||
const LabelChange change) {
|
||||
auto ®istry = GetRegistry<VertexAccessor>();
|
||||
if (registry.created_objects_.count(vertex.Gid())) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (auto it = label_changes_.find({vertex, label_id}); it != label_changes_.end()) {
|
||||
it->second = std::clamp(it->second + LabelChangeToInt(change), -1, 1);
|
||||
return;
|
||||
}
|
||||
|
||||
label_changes_.emplace(std::make_pair(vertex, label_id), LabelChangeToInt(change));
|
||||
}
|
||||
|
||||
void TriggerContextCollector::RegisterSetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id) {
|
||||
UpdateLabelMap(vertex, label_id, LabelChange::ADD);
|
||||
}
|
||||
|
||||
void TriggerContextCollector::RegisterRemovedVertexLabel(const VertexAccessor &vertex,
|
||||
const storage::LabelId label_id) {
|
||||
UpdateLabelMap(vertex, label_id, LabelChange::REMOVE);
|
||||
}
|
||||
|
||||
int8_t TriggerContextCollector::LabelChangeToInt(LabelChange change) {
|
||||
static_assert(std::is_same_v<std::underlying_type_t<LabelChange>, int8_t>,
|
||||
"The underlying type of LabelChange doesn't match the return type!");
|
||||
return static_cast<int8_t>(change);
|
||||
}
|
||||
|
||||
TriggerContext TriggerContextCollector::TransformToTriggerContext() && {
|
||||
auto [created_vertices, deleted_vertices, set_vertex_properties, removed_vertex_properties] =
|
||||
std::move(vertex_registry_).Summarize();
|
||||
auto [set_vertex_labels, removed_vertex_labels] = LabelMapToList(std::move(label_changes_));
|
||||
auto [created_edges, deleted_edges, set_edge_properties, removed_edge_properties] =
|
||||
std::move(edge_registry_).Summarize();
|
||||
|
||||
return {std::move(created_vertices), std::move(deleted_vertices),
|
||||
std::move(set_vertex_properties), std::move(removed_vertex_properties),
|
||||
std::move(set_vertex_labels), std::move(removed_vertex_labels),
|
||||
std::move(created_edges), std::move(deleted_edges),
|
||||
std::move(set_edge_properties), std::move(removed_edge_properties)};
|
||||
}
|
||||
|
||||
TriggerContextCollector::LabelChangesLists TriggerContextCollector::LabelMapToList(LabelChangesMap &&label_changes) {
|
||||
std::vector<detail::SetVertexLabel> set_vertex_labels;
|
||||
std::vector<detail::RemovedVertexLabel> removed_vertex_labels;
|
||||
|
||||
for (const auto &[key, label_state] : label_changes) {
|
||||
if (label_state == LabelChangeToInt(LabelChange::ADD)) {
|
||||
set_vertex_labels.emplace_back(key.first, key.second);
|
||||
} else if (label_state == LabelChangeToInt(LabelChange::REMOVE)) {
|
||||
removed_vertex_labels.emplace_back(key.first, key.second);
|
||||
}
|
||||
}
|
||||
|
||||
label_changes.clear();
|
||||
|
||||
return {std::move(set_vertex_labels), std::move(removed_vertex_labels)};
|
||||
}
|
||||
|
||||
Trigger::Trigger(std::string name, const std::string &query, utils::SkipList<QueryCacheEntry> *query_cache,
|
||||
DbAccessor *db_accessor, utils::SpinLock *antlr_lock)
|
||||
: name_(std::move(name)), parsed_statements_{ParseQuery(query, {}, query_cache, antlr_lock)} {
|
||||
DbAccessor *db_accessor, utils::SpinLock *antlr_lock, const TriggerEventType event_type)
|
||||
: name_(std::move(name)),
|
||||
parsed_statements_{ParseQuery(query, {}, query_cache, antlr_lock)},
|
||||
event_type_{event_type} {
|
||||
// We check immediately if the query is valid by trying to create a plan.
|
||||
GetPlan(db_accessor);
|
||||
}
|
||||
@ -310,7 +516,7 @@ std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor)
|
||||
return trigger_plan_;
|
||||
}
|
||||
|
||||
auto identifiers = GetPredefinedIdentifiers();
|
||||
auto identifiers = GetPredefinedIdentifiers(event_type_);
|
||||
|
||||
AstStorage ast_storage;
|
||||
ast_storage.properties_ = parsed_statements_.ast_storage.properties_;
|
||||
@ -332,6 +538,11 @@ std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor)
|
||||
void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, const double tsc_frequency,
|
||||
const double max_execution_time_sec, std::atomic<bool> *is_shutting_down,
|
||||
const TriggerContext &context) const {
|
||||
if (!context.ShouldEventTrigger(event_type_)) {
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::debug("Executing trigger '{}'", name_);
|
||||
auto trigger_plan = GetPlan(dba);
|
||||
MG_ASSERT(trigger_plan, "Invalid trigger plan received");
|
||||
auto &[plan, identifiers] = *trigger_plan;
|
||||
|
@ -1,33 +1,18 @@
|
||||
#pragma once
|
||||
#include <algorithm>
|
||||
#include <concepts>
|
||||
#include <iterator>
|
||||
#include <tuple>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "query/cypher_query_interpreter.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/concepts.hpp"
|
||||
#include "utils/fnv.hpp"
|
||||
|
||||
namespace query {
|
||||
|
||||
namespace trigger {
|
||||
enum class IdentifierTag : uint8_t {
|
||||
CREATED_VERTICES,
|
||||
CREATED_EDGES,
|
||||
DELETED_VERTICES,
|
||||
DELETED_EDGES,
|
||||
SET_VERTEX_PROPERTIES,
|
||||
SET_EDGE_PROPERTIES,
|
||||
REMOVED_VERTEX_PROPERTIES,
|
||||
REMOVED_EDGE_PROPERTIES,
|
||||
SET_VERTEX_LABELS,
|
||||
REMOVED_VERTEX_LABELS,
|
||||
UPDATED_VERTICES,
|
||||
UPDATED_EDGES,
|
||||
UPDATED_OBJECTS
|
||||
};
|
||||
} // namespace trigger
|
||||
|
||||
namespace detail {
|
||||
template <typename T>
|
||||
concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>;
|
||||
@ -40,82 +25,39 @@ const char *ObjectString() {
|
||||
return "edge";
|
||||
}
|
||||
}
|
||||
} // namespace detail
|
||||
|
||||
struct TriggerContext {
|
||||
static_assert(std::is_trivially_copy_constructible_v<VertexAccessor>,
|
||||
"VertexAccessor is not trivially copy constructible, move it where possible and remove this assert");
|
||||
static_assert(std::is_trivially_copy_constructible_v<EdgeAccessor>,
|
||||
"EdgeAccessor is not trivially copy constructible, move it where possible and remove this asssert");
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterCreatedObject(const TAccessor &created_object) {
|
||||
GetRegistry<TAccessor>().created_objects_.emplace_back(created_object);
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterDeletedObject(const TAccessor &deleted_object) {
|
||||
GetRegistry<TAccessor>().deleted_objects_.emplace_back(deleted_object);
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterSetObjectProperty(const TAccessor &object, const storage::PropertyId key, TypedValue old_value,
|
||||
TypedValue new_value) {
|
||||
if (new_value.IsNull()) {
|
||||
RegisterRemovedObjectProperty(object, key, std::move(old_value));
|
||||
return;
|
||||
}
|
||||
|
||||
GetRegistry<TAccessor>().set_object_properties_.emplace_back(object, key, std::move(old_value),
|
||||
std::move(new_value));
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterRemovedObjectProperty(const TAccessor &object, const storage::PropertyId key, TypedValue old_value) {
|
||||
// property is already removed
|
||||
if (old_value.IsNull()) {
|
||||
return;
|
||||
}
|
||||
|
||||
GetRegistry<TAccessor>().removed_object_properties_.emplace_back(object, key, std::move(old_value));
|
||||
}
|
||||
|
||||
void RegisterSetVertexLabel(const VertexAccessor &vertex, storage::LabelId label_id);
|
||||
void RegisterRemovedVertexLabel(const VertexAccessor &vertex, storage::LabelId label_id);
|
||||
|
||||
// Adapt the TriggerContext object inplace for a different DbAccessor
|
||||
// (each derived accessor, e.g. VertexAccessor, gets adapted
|
||||
// to the sent DbAccessor so they can be used safely)
|
||||
void AdaptForAccessor(DbAccessor *accessor);
|
||||
|
||||
TypedValue GetTypedValue(trigger::IdentifierTag tag, DbAccessor *dba) const;
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
struct CreatedObject {
|
||||
template <ObjectAccessor TAccessor>
|
||||
struct CreatedObject {
|
||||
explicit CreatedObject(const TAccessor &object) : object{object} {}
|
||||
|
||||
bool IsValid() const { return object.IsVisible(storage::View::OLD); }
|
||||
std::map<std::string, TypedValue> ToMap([[maybe_unused]] DbAccessor *dba) const {
|
||||
return {{ObjectString<TAccessor>(), TypedValue{object}}};
|
||||
}
|
||||
|
||||
TAccessor object;
|
||||
};
|
||||
};
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
struct DeletedObject {
|
||||
template <ObjectAccessor TAccessor>
|
||||
struct DeletedObject {
|
||||
explicit DeletedObject(const TAccessor &object) : object{object} {}
|
||||
|
||||
bool IsValid() const { return object.IsVisible(storage::View::OLD); }
|
||||
std::map<std::string, TypedValue> ToMap([[maybe_unused]] DbAccessor *dba) const {
|
||||
return {{ObjectString<TAccessor>(), TypedValue{object}}};
|
||||
}
|
||||
|
||||
TAccessor object;
|
||||
};
|
||||
};
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
struct SetObjectProperty {
|
||||
template <ObjectAccessor TAccessor>
|
||||
struct SetObjectProperty {
|
||||
explicit SetObjectProperty(const TAccessor &object, storage::PropertyId key, TypedValue old_value,
|
||||
TypedValue new_value)
|
||||
: object{object}, key{key}, old_value{std::move(old_value)}, new_value{std::move(new_value)} {}
|
||||
|
||||
std::map<std::string, TypedValue> ToMap(DbAccessor *dba) const {
|
||||
return {{detail::ObjectString<TAccessor>(), TypedValue{object}},
|
||||
return {{ObjectString<TAccessor>(), TypedValue{object}},
|
||||
{"key", TypedValue{dba->PropertyToName(key)}},
|
||||
{"old", old_value},
|
||||
{"new", new_value}};
|
||||
@ -127,15 +69,15 @@ struct TriggerContext {
|
||||
storage::PropertyId key;
|
||||
TypedValue old_value;
|
||||
TypedValue new_value;
|
||||
};
|
||||
};
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
struct RemovedObjectProperty {
|
||||
template <ObjectAccessor TAccessor>
|
||||
struct RemovedObjectProperty {
|
||||
explicit RemovedObjectProperty(const TAccessor &object, storage::PropertyId key, TypedValue old_value)
|
||||
: object{object}, key{key}, old_value{std::move(old_value)} {}
|
||||
|
||||
std::map<std::string, TypedValue> ToMap(DbAccessor *dba) const {
|
||||
return {{detail::ObjectString<TAccessor>(), TypedValue{object}},
|
||||
return {{ObjectString<TAccessor>(), TypedValue{object}},
|
||||
{"key", TypedValue{dba->PropertyToName(key)}},
|
||||
{"old", old_value}};
|
||||
}
|
||||
@ -145,9 +87,9 @@ struct TriggerContext {
|
||||
TAccessor object;
|
||||
storage::PropertyId key;
|
||||
TypedValue old_value;
|
||||
};
|
||||
};
|
||||
|
||||
struct SetVertexLabel {
|
||||
struct SetVertexLabel {
|
||||
explicit SetVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id)
|
||||
: object{vertex}, label_id{label_id} {}
|
||||
|
||||
@ -156,9 +98,9 @@ struct TriggerContext {
|
||||
|
||||
VertexAccessor object;
|
||||
storage::LabelId label_id;
|
||||
};
|
||||
};
|
||||
|
||||
struct RemovedVertexLabel {
|
||||
struct RemovedVertexLabel {
|
||||
explicit RemovedVertexLabel(const VertexAccessor &vertex, const storage::LabelId label_id)
|
||||
: object{vertex}, label_id{label_id} {}
|
||||
|
||||
@ -167,19 +109,223 @@ struct TriggerContext {
|
||||
|
||||
VertexAccessor object;
|
||||
storage::LabelId label_id;
|
||||
};
|
||||
};
|
||||
} // namespace detail
|
||||
|
||||
enum class TriggerIdentifierTag : uint8_t {
|
||||
CREATED_VERTICES,
|
||||
CREATED_EDGES,
|
||||
CREATED_OBJECTS,
|
||||
DELETED_VERTICES,
|
||||
DELETED_EDGES,
|
||||
DELETED_OBJECTS,
|
||||
SET_VERTEX_PROPERTIES,
|
||||
SET_EDGE_PROPERTIES,
|
||||
REMOVED_VERTEX_PROPERTIES,
|
||||
REMOVED_EDGE_PROPERTIES,
|
||||
SET_VERTEX_LABELS,
|
||||
REMOVED_VERTEX_LABELS,
|
||||
UPDATED_VERTICES,
|
||||
UPDATED_EDGES,
|
||||
UPDATED_OBJECTS
|
||||
};
|
||||
|
||||
enum class TriggerEventType : uint8_t {
|
||||
ANY, // Triggers always
|
||||
VERTEX_CREATE,
|
||||
EDGE_CREATE,
|
||||
CREATE,
|
||||
VERTEX_DELETE,
|
||||
EDGE_DELETE,
|
||||
DELETE,
|
||||
VERTEX_UPDATE,
|
||||
EDGE_UPDATE,
|
||||
UPDATE
|
||||
};
|
||||
|
||||
static_assert(std::is_trivially_copy_constructible_v<VertexAccessor>,
|
||||
"VertexAccessor is not trivially copy constructible, move it where possible and remove this assert");
|
||||
static_assert(std::is_trivially_copy_constructible_v<EdgeAccessor>,
|
||||
"EdgeAccessor is not trivially copy constructible, move it where possible and remove this asssert");
|
||||
|
||||
// Holds the information necessary for triggers
|
||||
class TriggerContext {
|
||||
public:
|
||||
TriggerContext() = default;
|
||||
TriggerContext(std::vector<detail::CreatedObject<VertexAccessor>> created_vertices,
|
||||
std::vector<detail::DeletedObject<VertexAccessor>> deleted_vertices,
|
||||
std::vector<detail::SetObjectProperty<VertexAccessor>> set_vertex_properties,
|
||||
std::vector<detail::RemovedObjectProperty<VertexAccessor>> removed_vertex_properties,
|
||||
std::vector<detail::SetVertexLabel> set_vertex_labels,
|
||||
std::vector<detail::RemovedVertexLabel> removed_vertex_labels,
|
||||
std::vector<detail::CreatedObject<EdgeAccessor>> created_edges,
|
||||
std::vector<detail::DeletedObject<EdgeAccessor>> deleted_edges,
|
||||
std::vector<detail::SetObjectProperty<EdgeAccessor>> set_edge_properties,
|
||||
std::vector<detail::RemovedObjectProperty<EdgeAccessor>> removed_edge_properties)
|
||||
: created_vertices_{std::move(created_vertices)},
|
||||
deleted_vertices_{std::move(deleted_vertices)},
|
||||
set_vertex_properties_{std::move(set_vertex_properties)},
|
||||
removed_vertex_properties_{std::move(removed_vertex_properties)},
|
||||
set_vertex_labels_{std::move(set_vertex_labels)},
|
||||
removed_vertex_labels_{std::move(removed_vertex_labels)},
|
||||
created_edges_{std::move(created_edges)},
|
||||
deleted_edges_{std::move(deleted_edges)},
|
||||
set_edge_properties_{std::move(set_edge_properties)},
|
||||
removed_edge_properties_{std::move(removed_edge_properties)} {}
|
||||
TriggerContext(const TriggerContext &) = default;
|
||||
TriggerContext(TriggerContext &&) = default;
|
||||
TriggerContext &operator=(const TriggerContext &) = default;
|
||||
TriggerContext &operator=(TriggerContext &&) = default;
|
||||
|
||||
// Adapt the TriggerContext object inplace for a different DbAccessor
|
||||
// (each derived accessor, e.g. VertexAccessor, gets adapted
|
||||
// to the sent DbAccessor so they can be used safely)
|
||||
void AdaptForAccessor(DbAccessor *accessor);
|
||||
|
||||
// Get TypedValue for the identifier defined with tag
|
||||
TypedValue GetTypedValue(TriggerIdentifierTag tag, DbAccessor *dba) const;
|
||||
bool ShouldEventTrigger(TriggerEventType) const;
|
||||
|
||||
private:
|
||||
std::vector<detail::CreatedObject<VertexAccessor>> created_vertices_;
|
||||
std::vector<detail::DeletedObject<VertexAccessor>> deleted_vertices_;
|
||||
std::vector<detail::SetObjectProperty<VertexAccessor>> set_vertex_properties_;
|
||||
std::vector<detail::RemovedObjectProperty<VertexAccessor>> removed_vertex_properties_;
|
||||
std::vector<detail::SetVertexLabel> set_vertex_labels_;
|
||||
std::vector<detail::RemovedVertexLabel> removed_vertex_labels_;
|
||||
|
||||
std::vector<detail::CreatedObject<EdgeAccessor>> created_edges_;
|
||||
std::vector<detail::DeletedObject<EdgeAccessor>> deleted_edges_;
|
||||
std::vector<detail::SetObjectProperty<EdgeAccessor>> set_edge_properties_;
|
||||
std::vector<detail::RemovedObjectProperty<EdgeAccessor>> removed_edge_properties_;
|
||||
};
|
||||
|
||||
// Collects the information necessary for triggers during a single transaction run.
|
||||
class TriggerContextCollector {
|
||||
public:
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
struct Registry {
|
||||
std::vector<CreatedObject<TAccessor>> created_objects_;
|
||||
std::vector<DeletedObject<TAccessor>> deleted_objects_;
|
||||
std::vector<SetObjectProperty<TAccessor>> set_object_properties_;
|
||||
std::vector<RemovedObjectProperty<TAccessor>> removed_object_properties_;
|
||||
void RegisterCreatedObject(const TAccessor &created_object) {
|
||||
GetRegistry<TAccessor>().created_objects_.emplace(created_object.Gid(), detail::CreatedObject{created_object});
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterDeletedObject(const TAccessor &deleted_object) {
|
||||
auto ®istry = GetRegistry<TAccessor>();
|
||||
if (registry.created_objects_.count(deleted_object.Gid())) {
|
||||
return;
|
||||
}
|
||||
|
||||
registry.deleted_objects_.emplace_back(deleted_object);
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterSetObjectProperty(const TAccessor &object, const storage::PropertyId key, TypedValue old_value,
|
||||
TypedValue new_value) {
|
||||
auto ®istry = GetRegistry<TAccessor>();
|
||||
if (registry.created_objects_.count(object.Gid())) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (auto it = registry.property_changes_.find({object, key}); it != registry.property_changes_.end()) {
|
||||
it->second.new_value = std::move(new_value);
|
||||
return;
|
||||
}
|
||||
|
||||
registry.property_changes_.emplace(std::make_pair(object, key),
|
||||
PropertyChangeInfo{std::move(old_value), std::move(new_value)});
|
||||
}
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
void RegisterRemovedObjectProperty(const TAccessor &object, const storage::PropertyId key, TypedValue old_value) {
|
||||
// property is already removed
|
||||
if (old_value.IsNull()) {
|
||||
return;
|
||||
}
|
||||
|
||||
RegisterSetObjectProperty(object, key, std::move(old_value), TypedValue());
|
||||
}
|
||||
|
||||
void RegisterSetVertexLabel(const VertexAccessor &vertex, storage::LabelId label_id);
|
||||
void RegisterRemovedVertexLabel(const VertexAccessor &vertex, storage::LabelId label_id);
|
||||
[[nodiscard]] TriggerContext TransformToTriggerContext() &&;
|
||||
|
||||
private:
|
||||
struct HashPair {
|
||||
template <detail::ObjectAccessor TAccessor, typename T2>
|
||||
size_t operator()(const std::pair<TAccessor, T2> &pair) const {
|
||||
using GidType = decltype(std::declval<TAccessor>().Gid());
|
||||
return utils::HashCombine<GidType, T2>{}(pair.first.Gid(), pair.second);
|
||||
}
|
||||
};
|
||||
|
||||
Registry<VertexAccessor> vertex_registry_;
|
||||
Registry<EdgeAccessor> edge_registry_;
|
||||
struct PropertyChangeInfo {
|
||||
TypedValue old_value;
|
||||
TypedValue new_value;
|
||||
};
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
using PropertyChangesMap =
|
||||
std::unordered_map<std::pair<TAccessor, storage::PropertyId>, PropertyChangeInfo, HashPair>;
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
using PropertyChangesLists = std::pair<std::vector<detail::SetObjectProperty<TAccessor>>,
|
||||
std::vector<detail::RemovedObjectProperty<TAccessor>>>;
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
struct Registry {
|
||||
using ChangesSummary =
|
||||
std::tuple<std::vector<detail::CreatedObject<TAccessor>>, std::vector<detail::DeletedObject<TAccessor>>,
|
||||
std::vector<detail::SetObjectProperty<TAccessor>>,
|
||||
std::vector<detail::RemovedObjectProperty<TAccessor>>>;
|
||||
|
||||
[[nodiscard]] static PropertyChangesLists<TAccessor> PropertyMapToList(PropertyChangesMap<TAccessor> &&map) {
|
||||
std::vector<detail::SetObjectProperty<TAccessor>> set_object_properties;
|
||||
std::vector<detail::RemovedObjectProperty<TAccessor>> removed_object_properties;
|
||||
|
||||
for (auto it = map.begin(); it != map.end(); it = map.erase(it)) {
|
||||
const auto &[key, property_change_info] = *it;
|
||||
if (property_change_info.old_value.IsNull() && property_change_info.new_value.IsNull()) {
|
||||
// no change happened on the transaction level
|
||||
continue;
|
||||
}
|
||||
|
||||
if (const auto is_equal = property_change_info.old_value == property_change_info.new_value;
|
||||
is_equal.IsBool() && is_equal.ValueBool()) {
|
||||
// no change happened on the transaction level
|
||||
continue;
|
||||
}
|
||||
|
||||
if (property_change_info.new_value.IsNull()) {
|
||||
removed_object_properties.emplace_back(key.first, key.second /* property_id */,
|
||||
std::move(property_change_info.old_value));
|
||||
} else {
|
||||
set_object_properties.emplace_back(key.first, key.second, std::move(property_change_info.old_value),
|
||||
std::move(property_change_info.new_value));
|
||||
}
|
||||
}
|
||||
|
||||
return PropertyChangesLists<TAccessor>{std::move(set_object_properties), std::move(removed_object_properties)};
|
||||
}
|
||||
|
||||
[[nodiscard]] ChangesSummary Summarize() && {
|
||||
auto [set_object_properties, removed_object_properties] = PropertyMapToList(std::move(property_changes_));
|
||||
std::vector<detail::CreatedObject<TAccessor>> created_objects_vec;
|
||||
created_objects_vec.reserve(created_objects_.size());
|
||||
std::transform(created_objects_.begin(), created_objects_.end(), std::back_inserter(created_objects_vec),
|
||||
[](const auto &gid_and_created_object) { return gid_and_created_object.second; });
|
||||
created_objects_.clear();
|
||||
|
||||
return {std::move(created_objects_vec), std::move(deleted_objects_), std::move(set_object_properties),
|
||||
std::move(removed_object_properties)};
|
||||
}
|
||||
|
||||
std::unordered_map<storage::Gid, detail::CreatedObject<TAccessor>> created_objects_;
|
||||
std::vector<detail::DeletedObject<TAccessor>> deleted_objects_;
|
||||
// During the transaction, a single property on a single object could be changed multiple times.
|
||||
// We want to register only the global change, at the end of the transaction. The change consists of
|
||||
// the value before the transaction start, and the latest value assigned throughout the transaction.
|
||||
PropertyChangesMap<TAccessor> property_changes_;
|
||||
};
|
||||
|
||||
template <detail::ObjectAccessor TAccessor>
|
||||
Registry<TAccessor> &GetRegistry() {
|
||||
@ -190,13 +336,29 @@ struct TriggerContext {
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<SetVertexLabel> set_vertex_labels_;
|
||||
std::vector<RemovedVertexLabel> removed_vertex_labels_;
|
||||
using LabelChangesMap = std::unordered_map<std::pair<VertexAccessor, storage::LabelId>, int8_t, HashPair>;
|
||||
using LabelChangesLists = std::pair<std::vector<detail::SetVertexLabel>, std::vector<detail::RemovedVertexLabel>>;
|
||||
|
||||
enum class LabelChange : int8_t { REMOVE = -1, ADD = 1 };
|
||||
|
||||
static int8_t LabelChangeToInt(LabelChange change);
|
||||
|
||||
[[nodiscard]] static LabelChangesLists LabelMapToList(LabelChangesMap &&label_changes);
|
||||
|
||||
void UpdateLabelMap(VertexAccessor vertex, storage::LabelId label_id, LabelChange change);
|
||||
|
||||
Registry<VertexAccessor> vertex_registry_;
|
||||
Registry<EdgeAccessor> edge_registry_;
|
||||
// During the transaction, a single label on a single vertex could be added and removed multiple times.
|
||||
// We want to register only the global change, at the end of the transaction. The change consists of
|
||||
// the state of the label before the transaction start, and the latest state assigned throughout the transaction.
|
||||
LabelChangesMap label_changes_;
|
||||
};
|
||||
|
||||
struct Trigger {
|
||||
explicit Trigger(std::string name, const std::string &query, utils::SkipList<QueryCacheEntry> *query_cache,
|
||||
DbAccessor *db_accessor, utils::SpinLock *antlr_lock);
|
||||
DbAccessor *db_accessor, utils::SpinLock *antlr_lock,
|
||||
TriggerEventType event_type = TriggerEventType::ANY);
|
||||
|
||||
void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double tsc_frequency,
|
||||
double max_execution_time_sec, std::atomic<bool> *is_shutting_down, const TriggerContext &context) const;
|
||||
@ -212,7 +374,7 @@ struct Trigger {
|
||||
|
||||
private:
|
||||
struct TriggerPlan {
|
||||
using IdentifierInfo = std::pair<Identifier, trigger::IdentifierTag>;
|
||||
using IdentifierInfo = std::pair<Identifier, TriggerIdentifierTag>;
|
||||
|
||||
explicit TriggerPlan(std::unique_ptr<LogicalPlan> logical_plan, std::vector<IdentifierInfo> identifiers);
|
||||
|
||||
@ -224,6 +386,8 @@ struct Trigger {
|
||||
std::string name_;
|
||||
ParsedQuery parsed_statements_;
|
||||
|
||||
TriggerEventType event_type_;
|
||||
|
||||
mutable utils::SpinLock plan_lock_;
|
||||
mutable std::shared_ptr<TriggerPlan> trigger_plan_;
|
||||
};
|
||||
|
@ -95,6 +95,9 @@ target_link_libraries(${test_prefix}query_plan_v2_create_set_remove_delete mg-qu
|
||||
add_unit_test(query_pretty_print.cpp)
|
||||
target_link_libraries(${test_prefix}query_pretty_print mg-query)
|
||||
|
||||
add_unit_test(query_trigger.cpp)
|
||||
target_link_libraries(${test_prefix}query_trigger mg-query)
|
||||
|
||||
# Test query/procedure
|
||||
add_unit_test(query_procedure_mgp_type.cpp)
|
||||
target_link_libraries(${test_prefix}query_procedure_mgp_type mg-query)
|
||||
|
553
tests/unit/query_trigger.cpp
Normal file
553
tests/unit/query_trigger.cpp
Normal file
@ -0,0 +1,553 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <filesystem>
|
||||
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/trigger.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
|
||||
class TriggerContextTest : public ::testing::Test {
|
||||
public:
|
||||
void SetUp() override { db.emplace(); }
|
||||
|
||||
void TearDown() override {
|
||||
accessors.clear();
|
||||
db.reset();
|
||||
}
|
||||
|
||||
storage::Storage::Accessor &StartTransaction() {
|
||||
accessors.push_back(db->Access());
|
||||
return accessors.back();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::optional<storage::Storage> db;
|
||||
std::list<storage::Storage::Accessor> accessors;
|
||||
};
|
||||
|
||||
namespace {
|
||||
void CheckTypedValueSize(const query::TriggerContext &trigger_context, const query::TriggerIdentifierTag tag,
|
||||
const size_t expected_size, query::DbAccessor &dba) {
|
||||
auto typed_values = trigger_context.GetTypedValue(tag, &dba);
|
||||
ASSERT_TRUE(typed_values.IsList());
|
||||
ASSERT_EQ(typed_values.ValueList().size(), expected_size);
|
||||
};
|
||||
|
||||
void CheckLabelMap(const query::TriggerContext &trigger_context, const query::TriggerIdentifierTag tag,
|
||||
const size_t expected, query::DbAccessor &dba) {
|
||||
auto typed_values = trigger_context.GetTypedValue(tag, &dba);
|
||||
ASSERT_TRUE(typed_values.IsMap());
|
||||
auto &typed_values_map = typed_values.ValueMap();
|
||||
size_t value_count = 0;
|
||||
for (const auto &[label, values] : typed_values_map) {
|
||||
ASSERT_TRUE(values.IsList());
|
||||
value_count += values.ValueList().size();
|
||||
}
|
||||
ASSERT_EQ(value_count, expected);
|
||||
};
|
||||
} // namespace
|
||||
|
||||
// Ensure that TriggerContext returns only valid objects.
|
||||
// Returned TypedValue should always contain only objects
|
||||
// that exist (unless its explicitly created for the deleted object)
|
||||
TEST_F(TriggerContextTest, ValidObjectsTest) {
|
||||
query::TriggerContext trigger_context;
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
|
||||
size_t vertex_count = 0;
|
||||
size_t edge_count = 0;
|
||||
{
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
|
||||
auto create_vertex = [&] {
|
||||
auto created_vertex = dba.InsertVertex();
|
||||
trigger_context_collector.RegisterCreatedObject(created_vertex);
|
||||
++vertex_count;
|
||||
return created_vertex;
|
||||
};
|
||||
|
||||
// Create vertices and add them to the trigger context as created
|
||||
std::vector<query::VertexAccessor> vertices;
|
||||
for (size_t i = 0; i < 4; ++i) {
|
||||
vertices.push_back(create_vertex());
|
||||
}
|
||||
|
||||
auto create_edge = [&](auto &from, auto &to) {
|
||||
auto maybe_edge = dba.InsertEdge(&from, &to, dba.NameToEdgeType("EDGE"));
|
||||
ASSERT_FALSE(maybe_edge.HasError());
|
||||
trigger_context_collector.RegisterCreatedObject(*maybe_edge);
|
||||
++edge_count;
|
||||
};
|
||||
|
||||
// Create edges and add them to the trigger context as created
|
||||
create_edge(vertices[0], vertices[1]);
|
||||
create_edge(vertices[1], vertices[2]);
|
||||
create_edge(vertices[2], vertices[3]);
|
||||
|
||||
dba.AdvanceCommand();
|
||||
trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
trigger_context_collector = query::TriggerContextCollector{};
|
||||
|
||||
// Should have all the created objects
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, vertex_count + edge_count, dba);
|
||||
|
||||
// we delete one of the vertices and edges in the same transaction
|
||||
ASSERT_TRUE(dba.DetachRemoveVertex(&vertices[0]).HasValue());
|
||||
--vertex_count;
|
||||
--edge_count;
|
||||
|
||||
dba.AdvanceCommand();
|
||||
|
||||
// Should have one less created object for vertex and edge
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, vertex_count + edge_count, dba);
|
||||
|
||||
ASSERT_FALSE(dba.Commit().HasError());
|
||||
}
|
||||
|
||||
{
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
trigger_context.AdaptForAccessor(&dba);
|
||||
|
||||
// Should have one less created object for vertex and edge
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, vertex_count + edge_count, dba);
|
||||
}
|
||||
|
||||
size_t deleted_vertex_count = 0;
|
||||
size_t deleted_edge_count = 0;
|
||||
{
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
|
||||
// register each type of change for each object
|
||||
{
|
||||
auto vertices = dba.Vertices(storage::View::OLD);
|
||||
for (auto vertex : vertices) {
|
||||
trigger_context_collector.RegisterSetObjectProperty(vertex, dba.NameToProperty("PROPERTY1"),
|
||||
query::TypedValue("Value"), query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(vertex, dba.NameToProperty("PROPERTY2"),
|
||||
query::TypedValue("Value"));
|
||||
trigger_context_collector.RegisterSetVertexLabel(vertex, dba.NameToLabel("LABEL1"));
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(vertex, dba.NameToLabel("LABEL2"));
|
||||
|
||||
auto out_edges = vertex.OutEdges(storage::View::OLD);
|
||||
ASSERT_TRUE(out_edges.HasValue());
|
||||
|
||||
for (auto edge : *out_edges) {
|
||||
trigger_context_collector.RegisterSetObjectProperty(
|
||||
edge, dba.NameToProperty("PROPERTY1"), query::TypedValue("Value"), query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(edge, dba.NameToProperty("PROPERTY2"),
|
||||
query::TypedValue("Value"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the first vertex with its edge and register the deleted object
|
||||
{
|
||||
auto vertices = dba.Vertices(storage::View::OLD);
|
||||
for (auto vertex : vertices) {
|
||||
const auto maybe_values = dba.DetachRemoveVertex(&vertex);
|
||||
ASSERT_TRUE(maybe_values.HasValue());
|
||||
ASSERT_TRUE(maybe_values.GetValue());
|
||||
const auto &[deleted_vertex, deleted_edges] = *maybe_values.GetValue();
|
||||
|
||||
trigger_context_collector.RegisterDeletedObject(deleted_vertex);
|
||||
++deleted_vertex_count;
|
||||
--vertex_count;
|
||||
for (const auto &edge : deleted_edges) {
|
||||
trigger_context_collector.RegisterDeletedObject(edge);
|
||||
++deleted_edge_count;
|
||||
--edge_count;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
dba.AdvanceCommand();
|
||||
ASSERT_FALSE(dba.Commit().HasError());
|
||||
|
||||
trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
trigger_context_collector = query::TriggerContextCollector{};
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_PROPERTIES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_EDGE_PROPERTIES, edge_count, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES, edge_count, dba);
|
||||
|
||||
CheckLabelMap(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_LABELS, vertex_count, dba);
|
||||
CheckLabelMap(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_LABELS, vertex_count, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_VERTICES, 4 * vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_EDGES, 2 * edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_OBJECTS,
|
||||
4 * vertex_count + 2 * edge_count, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_VERTICES, deleted_vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_EDGES, deleted_edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_OBJECTS,
|
||||
deleted_vertex_count + deleted_edge_count, dba);
|
||||
}
|
||||
|
||||
// delete a single vertex with its edges, it should reduce number of typed values returned by the trigger context
|
||||
// for each update event.
|
||||
// TypedValue of the deleted objects stay the same as they're bound to the transaction which deleted them.
|
||||
{
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
trigger_context.AdaptForAccessor(&dba);
|
||||
|
||||
auto vertices = dba.Vertices(storage::View::OLD);
|
||||
for (auto vertex : vertices) {
|
||||
ASSERT_TRUE(dba.DetachRemoveVertex(&vertex).HasValue());
|
||||
break;
|
||||
}
|
||||
--vertex_count;
|
||||
--edge_count;
|
||||
|
||||
ASSERT_FALSE(dba.Commit().HasError());
|
||||
}
|
||||
|
||||
{
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
trigger_context.AdaptForAccessor(&dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_PROPERTIES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_EDGE_PROPERTIES, edge_count, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES, vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES, edge_count, dba);
|
||||
|
||||
CheckLabelMap(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_LABELS, vertex_count, dba);
|
||||
CheckLabelMap(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_LABELS, vertex_count, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_VERTICES, 4 * vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_EDGES, 2 * edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_OBJECTS,
|
||||
4 * vertex_count + 2 * edge_count, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_VERTICES, deleted_vertex_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_EDGES, deleted_edge_count, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::DELETED_OBJECTS,
|
||||
deleted_vertex_count + deleted_edge_count, dba);
|
||||
}
|
||||
}
|
||||
|
||||
// If the trigger context registered a created object, each future event on the same object will be ignored.
|
||||
// Binding the trigger context to transaction will mean that creating and updating an object in the same transaction
|
||||
// will return only the CREATE event.
|
||||
TEST_F(TriggerContextTest, ReturnCreateOnlyEvent) {
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
|
||||
auto create_vertex = [&] {
|
||||
auto vertex = dba.InsertVertex();
|
||||
trigger_context_collector.RegisterCreatedObject(vertex);
|
||||
trigger_context_collector.RegisterSetObjectProperty(vertex, dba.NameToProperty("PROPERTY1"),
|
||||
query::TypedValue("Value"), query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(vertex, dba.NameToProperty("PROPERTY2"),
|
||||
query::TypedValue("Value"));
|
||||
trigger_context_collector.RegisterSetVertexLabel(vertex, dba.NameToLabel("LABEL1"));
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(vertex, dba.NameToLabel("LABEL2"));
|
||||
return vertex;
|
||||
};
|
||||
|
||||
auto v1 = create_vertex();
|
||||
auto v2 = create_vertex();
|
||||
auto maybe_edge = dba.InsertEdge(&v1, &v2, dba.NameToEdgeType("EDGE"));
|
||||
ASSERT_FALSE(maybe_edge.HasError());
|
||||
trigger_context_collector.RegisterCreatedObject(*maybe_edge);
|
||||
trigger_context_collector.RegisterSetObjectProperty(*maybe_edge, dba.NameToProperty("PROPERTY1"),
|
||||
query::TypedValue("Value"), query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(*maybe_edge, dba.NameToProperty("PROPERTY2"),
|
||||
query::TypedValue("Value"));
|
||||
|
||||
dba.AdvanceCommand();
|
||||
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_VERTICES, 2, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_EDGES, 1, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::CREATED_OBJECTS, 3, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_PROPERTIES, 0, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::SET_EDGE_PROPERTIES, 0, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_PROPERTIES, 0, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::REMOVED_EDGE_PROPERTIES, 0, dba);
|
||||
|
||||
CheckLabelMap(trigger_context, query::TriggerIdentifierTag::SET_VERTEX_LABELS, 0, dba);
|
||||
CheckLabelMap(trigger_context, query::TriggerIdentifierTag::REMOVED_VERTEX_LABELS, 0, dba);
|
||||
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_VERTICES, 0, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_EDGES, 0, dba);
|
||||
CheckTypedValueSize(trigger_context, query::TriggerIdentifierTag::UPDATED_OBJECTS, 0, dba);
|
||||
}
|
||||
|
||||
namespace {
|
||||
void EXPECT_PROP_TRUE(const query::TypedValue &a) {
|
||||
EXPECT_TRUE(a.type() == query::TypedValue::Type::Bool && a.ValueBool());
|
||||
}
|
||||
|
||||
void EXPECT_PROP_EQ(const query::TypedValue &a, const query::TypedValue &b) { EXPECT_PROP_TRUE(a == b); }
|
||||
} // namespace
|
||||
|
||||
// During a transaction, same property for the same object can change multiple times. TriggerContext should ensure
|
||||
// that only the change on the global value is returned (value before the transaction + latest value after the
|
||||
// transaction) everything inbetween should be ignored.
|
||||
TEST_F(TriggerContextTest, GlobalPropertyChange) {
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
|
||||
auto v = dba.InsertVertex();
|
||||
dba.AdvanceCommand();
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> SET");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value"),
|
||||
query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("ValueNew"), query::TypedValue("ValueNewer"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"set_vertex_property"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"key", query::TypedValue{"PROPERTY"}},
|
||||
{"old", query::TypedValue{"Value"}},
|
||||
{"new", query::TypedValue{"ValueNewer"}}}});
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> REMOVE");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value"),
|
||||
query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("ValueNew"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"removed_vertex_property"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"key", query::TypedValue{"PROPERTY"}},
|
||||
{"old", query::TypedValue{"Value"}}}});
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("REMOVE -> SET");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("Value"));
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(),
|
||||
query::TypedValue("ValueNew"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"set_vertex_property"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"key", query::TypedValue{"PROPERTY"}},
|
||||
{"old", query::TypedValue{"Value"}},
|
||||
{"new", query::TypedValue{"ValueNew"}}}});
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("REMOVE -> REMOVE");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("Value"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue());
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"removed_vertex_property"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"key", query::TypedValue{"PROPERTY"}},
|
||||
{"old", query::TypedValue{"Value"}}}});
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> SET (no change on transaction level)");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value"),
|
||||
query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("ValueNew"), query::TypedValue("Value"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> REMOVE (no change on transaction level)");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(),
|
||||
query::TypedValue("ValueNew"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("ValueNew"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("REMOVE -> SET (no change on transaction level)");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("Value"));
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(),
|
||||
query::TypedValue("Value"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("REMOVE -> REMOVE (no change on transaction level)");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue());
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue());
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> REMOVE -> SET -> REMOVE -> SET");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue("Value0"),
|
||||
query::TypedValue("Value1"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("Value1"));
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(),
|
||||
query::TypedValue("Value2"));
|
||||
trigger_context_collector.RegisterRemovedObjectProperty(v, dba.NameToProperty("PROPERTY"),
|
||||
query::TypedValue("Value2"));
|
||||
trigger_context_collector.RegisterSetObjectProperty(v, dba.NameToProperty("PROPERTY"), query::TypedValue(),
|
||||
query::TypedValue("Value3"));
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"set_vertex_property"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"key", query::TypedValue{"PROPERTY"}},
|
||||
{"old", query::TypedValue{"Value0"}},
|
||||
{"new", query::TypedValue{"Value3"}}}});
|
||||
}
|
||||
}
|
||||
|
||||
// Same as above, but for label changes
|
||||
TEST_F(TriggerContextTest, GlobalLabelChange) {
|
||||
query::DbAccessor dba{&StartTransaction()};
|
||||
|
||||
auto v = dba.InsertVertex();
|
||||
dba.AdvanceCommand();
|
||||
|
||||
const auto label_id = dba.NameToLabel("LABEL");
|
||||
// You cannot add the same label multiple times and you cannot remove non existing labels
|
||||
// so REMOVE -> REMOVE and SET -> SET doesn't make sense
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> REMOVE");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("REMOVE -> SET");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("SET -> REMOVE -> SET -> REMOVE -> SET");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"set_vertex_label"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"label", query::TypedValue{"LABEL"}}}});
|
||||
}
|
||||
|
||||
{
|
||||
SPDLOG_DEBUG("REMOVE -> SET -> REMOVE -> SET -> REMOVE");
|
||||
query::TriggerContextCollector trigger_context_collector;
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterSetVertexLabel(v, label_id);
|
||||
trigger_context_collector.RegisterRemovedVertexLabel(v, label_id);
|
||||
const auto trigger_context = std::move(trigger_context_collector).TransformToTriggerContext();
|
||||
auto updated_vertices = trigger_context.GetTypedValue(query::TriggerIdentifierTag::UPDATED_VERTICES, &dba);
|
||||
ASSERT_TRUE(updated_vertices.IsList());
|
||||
auto &updated_vertices_list = updated_vertices.ValueList();
|
||||
ASSERT_EQ(updated_vertices_list.size(), 1);
|
||||
auto &update = updated_vertices_list[0];
|
||||
ASSERT_TRUE(update.IsMap());
|
||||
EXPECT_PROP_EQ(update, query::TypedValue{std::map<std::string, query::TypedValue>{
|
||||
{"event_type", query::TypedValue{"removed_vertex_label"}},
|
||||
{"vertex", query::TypedValue{v}},
|
||||
{"label", query::TypedValue{"LABEL"}}}});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user