Add support for after commit triggers (#136)
This commit is contained in:
parent
7e44434cdf
commit
7bf40eb5d2
@ -1401,6 +1401,54 @@ void Interpreter::Abort() {
|
|||||||
db_accessor_ = std::nullopt;
|
db_accessor_ = std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, InterpreterContext *interpreter_context) {
|
||||||
|
// Run the triggers
|
||||||
|
for (const auto &trigger : triggers.access()) {
|
||||||
|
spdlog::debug("Executing trigger '{}'", trigger.name());
|
||||||
|
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||||
|
|
||||||
|
// create a new transaction for each trigger
|
||||||
|
auto storage_acc = interpreter_context->db->Access();
|
||||||
|
DbAccessor db_accessor{&storage_acc};
|
||||||
|
|
||||||
|
try {
|
||||||
|
trigger.Execute(&interpreter_context->plan_cache, &db_accessor, &execution_memory,
|
||||||
|
*interpreter_context->tsc_frequency, interpreter_context->execution_timeout_sec,
|
||||||
|
&interpreter_context->is_shutting_down);
|
||||||
|
} catch (const utils::BasicException &exception) {
|
||||||
|
spdlog::warn("Trigger {} failed with exception:\n{}", trigger.name(), exception.what());
|
||||||
|
db_accessor.Abort();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto maybe_constraint_violation = db_accessor.Commit();
|
||||||
|
if (maybe_constraint_violation.HasError()) {
|
||||||
|
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
||||||
|
switch (constraint_violation.type) {
|
||||||
|
case storage::ConstraintViolation::Type::EXISTENCE: {
|
||||||
|
const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
||||||
|
MG_ASSERT(constraint_violation.properties.size() == 1U);
|
||||||
|
const auto &property_name = db_accessor.PropertyToName(*constraint_violation.properties.begin());
|
||||||
|
spdlog::warn("Trigger '{}' failed to commit due to existence constraint violation on :{}({})", trigger.name(),
|
||||||
|
label_name, property_name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case storage::ConstraintViolation::Type::UNIQUE: {
|
||||||
|
const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
||||||
|
std::stringstream property_names_stream;
|
||||||
|
utils::PrintIterable(property_names_stream, constraint_violation.properties, ", ",
|
||||||
|
[&](auto &stream, const auto &prop) { stream << db_accessor.PropertyToName(prop); });
|
||||||
|
spdlog::warn("Trigger '{}' failed to commit due to unique constraint violation on :{}({})", trigger.name(),
|
||||||
|
label_name, property_names_stream.str());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
void Interpreter::Commit() {
|
void Interpreter::Commit() {
|
||||||
// It's possible that some queries did not finish because the user did
|
// It's possible that some queries did not finish because the user did
|
||||||
// not pull all of the results from the query.
|
// not pull all of the results from the query.
|
||||||
@ -1409,6 +1457,16 @@ void Interpreter::Commit() {
|
|||||||
// a query.
|
// a query.
|
||||||
if (!db_accessor_) return;
|
if (!db_accessor_) return;
|
||||||
|
|
||||||
|
// Run the triggers
|
||||||
|
for (const auto &trigger : interpreter_context_->before_commit_triggers.access()) {
|
||||||
|
spdlog::debug("Executing trigger '{}'", trigger.name());
|
||||||
|
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||||
|
trigger.Execute(&interpreter_context_->plan_cache, &*execution_db_accessor_, &execution_memory,
|
||||||
|
*interpreter_context_->tsc_frequency, interpreter_context_->execution_timeout_sec,
|
||||||
|
&interpreter_context_->is_shutting_down);
|
||||||
|
}
|
||||||
|
SPDLOG_DEBUG("Finished executing before commit triggers");
|
||||||
|
|
||||||
auto maybe_constraint_violation = db_accessor_->Commit();
|
auto maybe_constraint_violation = db_accessor_->Commit();
|
||||||
if (maybe_constraint_violation.HasError()) {
|
if (maybe_constraint_violation.HasError()) {
|
||||||
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
||||||
@ -1438,16 +1496,15 @@ void Interpreter::Commit() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the triggers
|
|
||||||
for (const auto &trigger : interpreter_context_->triggers.access()) {
|
|
||||||
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
|
||||||
trigger.Execute(&interpreter_context_->plan_cache, &*execution_db_accessor_, &execution_memory,
|
|
||||||
*interpreter_context_->tsc_frequency, interpreter_context_->execution_timeout_sec,
|
|
||||||
&interpreter_context_->is_shutting_down);
|
|
||||||
}
|
|
||||||
|
|
||||||
execution_db_accessor_ = std::nullopt;
|
execution_db_accessor_ = std::nullopt;
|
||||||
db_accessor_ = std::nullopt;
|
db_accessor_ = std::nullopt;
|
||||||
|
|
||||||
|
background_thread_.AddTask([interpreter_context = this->interpreter_context_] {
|
||||||
|
RunTriggersIndividually(interpreter_context->after_commit_triggers, interpreter_context);
|
||||||
|
SPDLOG_DEBUG("Finished executing after commit triggers"); // NOLINT(bugprone-lambda-function-name)
|
||||||
|
});
|
||||||
|
|
||||||
|
SPDLOG_DEBUG("Finished comitting the transaction");
|
||||||
}
|
}
|
||||||
|
|
||||||
void Interpreter::AdvanceCommand() {
|
void Interpreter::AdvanceCommand() {
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
#include "utils/memory.hpp"
|
#include "utils/memory.hpp"
|
||||||
#include "utils/skip_list.hpp"
|
#include "utils/skip_list.hpp"
|
||||||
#include "utils/spin_lock.hpp"
|
#include "utils/spin_lock.hpp"
|
||||||
|
#include "utils/thread_pool.hpp"
|
||||||
#include "utils/timer.hpp"
|
#include "utils/timer.hpp"
|
||||||
#include "utils/tsc.hpp"
|
#include "utils/tsc.hpp"
|
||||||
|
|
||||||
@ -147,8 +148,14 @@ struct PreparedQuery {
|
|||||||
*/
|
*/
|
||||||
struct InterpreterContext {
|
struct InterpreterContext {
|
||||||
explicit InterpreterContext(storage::Storage *db) : db(db) {
|
explicit InterpreterContext(storage::Storage *db) : db(db) {
|
||||||
// auto triggers_acc = triggers.access();
|
// {
|
||||||
// triggers_acc.insert(Trigger{"Creator", "CREATE (:CREATED)", &ast_cache, &antlr_lock});
|
// auto triggers_acc = before_commit_triggers.access();
|
||||||
|
// triggers_acc.insert(Trigger{"BeforeCreator", "CREATE (:BEFORE)", &ast_cache, &antlr_lock});
|
||||||
|
// }
|
||||||
|
// {
|
||||||
|
// auto triggers_acc = after_commit_triggers.access();
|
||||||
|
// triggers_acc.insert(Trigger{"AfterCreator", "CREATE (:AFTER)", &ast_cache, &antlr_lock});
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
storage::Storage *db;
|
storage::Storage *db;
|
||||||
@ -171,7 +178,8 @@ struct InterpreterContext {
|
|||||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||||
|
|
||||||
// use a thread safe container
|
// use a thread safe container
|
||||||
utils::SkipList<Trigger> triggers;
|
utils::SkipList<Trigger> before_commit_triggers;
|
||||||
|
utils::SkipList<Trigger> after_commit_triggers;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Function that is used to tell all active interpreters that they should stop
|
/// Function that is used to tell all active interpreters that they should stop
|
||||||
@ -304,6 +312,8 @@ class Interpreter final {
|
|||||||
bool in_explicit_transaction_{false};
|
bool in_explicit_transaction_{false};
|
||||||
bool expect_rollback_{false};
|
bool expect_rollback_{false};
|
||||||
|
|
||||||
|
utils::ThreadPool background_thread_{1};
|
||||||
|
|
||||||
PreparedQuery PrepareTransactionQuery(std::string_view query_upper);
|
PreparedQuery PrepareTransactionQuery(std::string_view query_upper);
|
||||||
void Commit();
|
void Commit();
|
||||||
void AdvanceCommand();
|
void AdvanceCommand();
|
||||||
|
@ -19,6 +19,8 @@ struct Trigger {
|
|||||||
// NOLINTNEXTLINE (modernize-use-nullptr)
|
// NOLINTNEXTLINE (modernize-use-nullptr)
|
||||||
bool operator<(const std::string &other) const { return name_ < other; }
|
bool operator<(const std::string &other) const { return name_ < other; }
|
||||||
|
|
||||||
|
const auto &name() const { return name_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string name_;
|
std::string name_;
|
||||||
ParsedQuery parsed_statements_;
|
ParsedQuery parsed_statements_;
|
||||||
|
Loading…
Reference in New Issue
Block a user