Trigger restore after query modules load (#183)
This commit is contained in:
parent
15911b64dc
commit
e016c74e4b
@ -1069,6 +1069,15 @@ int main(int argc, char **argv) {
|
||||
query::procedure::gModuleRegistry.SetModulesDirectory(query_modules_directories);
|
||||
query::procedure::gModuleRegistry.UnloadAndLoadModulesFromDirectories();
|
||||
|
||||
{
|
||||
// Triggers can execute query procedures, so we need to reload the modules first and then
|
||||
// the triggers
|
||||
auto storage_accessor = interpreter_context.db->Access();
|
||||
auto dba = query::DbAccessor{&storage_accessor};
|
||||
interpreter_context.trigger_store.RestoreTriggers(&interpreter_context.ast_cache, &dba,
|
||||
&interpreter_context.antlr_lock);
|
||||
}
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
AuthQueryHandler auth_handler(&auth, std::regex(FLAGS_auth_user_or_role_name_regex));
|
||||
#else
|
||||
|
@ -602,11 +602,8 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
|
||||
using RWType = plan::ReadWriteTypeChecker::RWType;
|
||||
} // namespace
|
||||
|
||||
InterpreterContext::InterpreterContext(storage::Storage *db, const std::filesystem::path &data_directory) : db(db) {
|
||||
auto storage_accessor = db->Access();
|
||||
DbAccessor dba{&storage_accessor};
|
||||
trigger_store.emplace(data_directory / "triggers", &ast_cache, &dba, &antlr_lock);
|
||||
}
|
||||
InterpreterContext::InterpreterContext(storage::Storage *db, const std::filesystem::path &data_directory)
|
||||
: db(db), trigger_store(data_directory / "triggers") {}
|
||||
|
||||
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
|
||||
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
|
||||
@ -627,8 +624,8 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
|
||||
std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
|
||||
execution_db_accessor_.emplace(db_accessor_.get());
|
||||
|
||||
if (interpreter_context_->trigger_store->HasTriggers()) {
|
||||
trigger_context_collector_.emplace(interpreter_context_->trigger_store->GetEventTypes());
|
||||
if (interpreter_context_->trigger_store.HasTriggers()) {
|
||||
trigger_context_collector_.emplace(interpreter_context_->trigger_store.GetEventTypes());
|
||||
}
|
||||
};
|
||||
} else if (query_upper == "COMMIT") {
|
||||
@ -1090,7 +1087,7 @@ Callback CreateTrigger(TriggerQuery *trigger_query,
|
||||
[trigger_name = std::move(trigger_query->trigger_name_), trigger_statement = std::move(trigger_query->statement_),
|
||||
event_type = trigger_query->event_type_, before_commit = trigger_query->before_commit_, interpreter_context, dba,
|
||||
user_parameters]() -> std::vector<std::vector<TypedValue>> {
|
||||
interpreter_context->trigger_store->AddTrigger(
|
||||
interpreter_context->trigger_store.AddTrigger(
|
||||
trigger_name, trigger_statement, user_parameters, ToTriggerEventType(event_type),
|
||||
before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache,
|
||||
dba, &interpreter_context->antlr_lock);
|
||||
@ -1102,7 +1099,7 @@ Callback DropTrigger(TriggerQuery *trigger_query, InterpreterContext *interprete
|
||||
return {{},
|
||||
[trigger_name = std::move(trigger_query->trigger_name_),
|
||||
interpreter_context]() -> std::vector<std::vector<TypedValue>> {
|
||||
interpreter_context->trigger_store->DropTrigger(trigger_name);
|
||||
interpreter_context->trigger_store.DropTrigger(trigger_name);
|
||||
return {};
|
||||
}};
|
||||
}
|
||||
@ -1110,7 +1107,7 @@ Callback DropTrigger(TriggerQuery *trigger_query, InterpreterContext *interprete
|
||||
Callback ShowTriggers(InterpreterContext *interpreter_context) {
|
||||
return {{"trigger name", "statement", "event type", "phase"}, [interpreter_context] {
|
||||
std::vector<std::vector<TypedValue>> results;
|
||||
auto trigger_infos = interpreter_context->trigger_store->GetTriggerInfo();
|
||||
auto trigger_infos = interpreter_context->trigger_store.GetTriggerInfo();
|
||||
results.reserve(trigger_infos.size());
|
||||
for (auto &trigger_info : trigger_infos) {
|
||||
std::vector<TypedValue> typed_trigger_info;
|
||||
@ -1503,8 +1500,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
|
||||
execution_db_accessor_.emplace(db_accessor_.get());
|
||||
|
||||
if (utils::Downcast<CypherQuery>(parsed_query.query) && interpreter_context_->trigger_store->HasTriggers()) {
|
||||
trigger_context_collector_.emplace(interpreter_context_->trigger_store->GetEventTypes());
|
||||
if (utils::Downcast<CypherQuery>(parsed_query.query) && interpreter_context_->trigger_store.HasTriggers()) {
|
||||
trigger_context_collector_.emplace(interpreter_context_->trigger_store.GetEventTypes());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1655,7 +1652,7 @@ void Interpreter::Commit() {
|
||||
|
||||
if (trigger_context) {
|
||||
// Run the triggers
|
||||
for (const auto &trigger : interpreter_context_->trigger_store->BeforeCommitTriggers().access()) {
|
||||
for (const auto &trigger : interpreter_context_->trigger_store.BeforeCommitTriggers().access()) {
|
||||
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
AdvanceCommand();
|
||||
try {
|
||||
@ -1707,11 +1704,11 @@ void Interpreter::Commit() {
|
||||
// probably will schedule its after commit triggers, because the other transactions that want to commit are still
|
||||
// waiting for commiting or one of them just started commiting its changes.
|
||||
// This means the ordered execution of after commit triggers are not guaranteed.
|
||||
if (trigger_context && interpreter_context_->trigger_store->AfterCommitTriggers().size() > 0) {
|
||||
if (trigger_context && interpreter_context_->trigger_store.AfterCommitTriggers().size() > 0) {
|
||||
interpreter_context_->after_commit_trigger_pool.AddTask(
|
||||
[trigger_context = std::move(*trigger_context), interpreter_context = this->interpreter_context_,
|
||||
user_transaction = std::shared_ptr(std::move(db_accessor_))]() mutable {
|
||||
RunTriggersIndividually(interpreter_context->trigger_store->AfterCommitTriggers(), interpreter_context,
|
||||
RunTriggersIndividually(interpreter_context->trigger_store.AfterCommitTriggers(), interpreter_context,
|
||||
std::move(trigger_context));
|
||||
user_transaction->FinalizeTransaction();
|
||||
SPDLOG_DEBUG("Finished executing after commit triggers"); // NOLINT(bugprone-lambda-function-name)
|
||||
|
@ -169,7 +169,7 @@ struct InterpreterContext {
|
||||
utils::SkipList<QueryCacheEntry> ast_cache;
|
||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||
|
||||
std::optional<TriggerStore> trigger_store;
|
||||
TriggerStore trigger_store;
|
||||
utils::ThreadPool after_commit_trigger_pool{1};
|
||||
};
|
||||
|
||||
|
@ -230,12 +230,16 @@ void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution
|
||||
}
|
||||
|
||||
namespace {
|
||||
// When the format of the persisted trigger is changed, increase this version
|
||||
constexpr uint64_t kVersion{1};
|
||||
} // namespace
|
||||
|
||||
TriggerStore::TriggerStore(std::filesystem::path directory, utils::SkipList<QueryCacheEntry> *query_cache,
|
||||
DbAccessor *db_accessor, utils::SpinLock *antlr_lock)
|
||||
: storage_{std::move(directory)} {
|
||||
TriggerStore::TriggerStore(std::filesystem::path directory) : storage_{std::move(directory)} {}
|
||||
|
||||
void TriggerStore::RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
|
||||
utils::SpinLock *antlr_lock) {
|
||||
MG_ASSERT(before_commit_triggers_.size() == 0 && after_commit_triggers_.size() == 0,
|
||||
"Cannot restore trigger when some triggers already exist!");
|
||||
spdlog::info("Loading triggers...");
|
||||
|
||||
for (const auto &[trigger_name, trigger_data] : storage_) {
|
||||
@ -324,6 +328,7 @@ void TriggerStore::AddTrigger(const std::string &name, const std::string &query,
|
||||
e.what(), TriggerEventTypeToString(event_type), identifier_names_stream.str());
|
||||
}
|
||||
|
||||
// When the format of the persisted trigger is changed, update the kVersion
|
||||
nlohmann::json data = nlohmann::json::object();
|
||||
data["statement"] = query;
|
||||
data["user_parameters"] = serialization::SerializePropertyValueMap(user_parameters);
|
||||
|
@ -60,8 +60,10 @@ struct Trigger {
|
||||
enum class TriggerPhase : uint8_t { BEFORE_COMMIT, AFTER_COMMIT };
|
||||
|
||||
struct TriggerStore {
|
||||
explicit TriggerStore(std::filesystem::path directory, utils::SkipList<QueryCacheEntry> *query_cache,
|
||||
DbAccessor *db_accessor, utils::SpinLock *antlr_lock);
|
||||
explicit TriggerStore(std::filesystem::path directory);
|
||||
|
||||
void RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
|
||||
utils::SpinLock *antlr_lock);
|
||||
|
||||
void AddTrigger(const std::string &name, const std::string &query,
|
||||
const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type,
|
||||
|
@ -830,10 +830,15 @@ class TriggerStoreTest : public ::testing::Test {
|
||||
std::optional<storage::Storage::Accessor> storage_accessor;
|
||||
};
|
||||
|
||||
TEST_F(TriggerStoreTest, Load) {
|
||||
TEST_F(TriggerStoreTest, Restore) {
|
||||
std::optional<query::TriggerStore> store;
|
||||
|
||||
store.emplace(testing_directory, &ast_cache, &*dba, &antlr_lock);
|
||||
const auto reset_store = [&] {
|
||||
store.emplace(testing_directory);
|
||||
store->RestoreTriggers(&ast_cache, &*dba, &antlr_lock);
|
||||
};
|
||||
|
||||
reset_store();
|
||||
|
||||
const auto check_empty = [&] {
|
||||
ASSERT_EQ(store->GetTriggerInfo().size(), 0);
|
||||
@ -879,7 +884,7 @@ TEST_F(TriggerStoreTest, Load) {
|
||||
check_triggers();
|
||||
|
||||
// recreate trigger store, this should reload everything from the disk
|
||||
store.emplace(testing_directory, &ast_cache, &*dba, &antlr_lock);
|
||||
reset_store();
|
||||
check_triggers();
|
||||
|
||||
ASSERT_NO_THROW(store->DropTrigger(trigger_name_after));
|
||||
@ -887,13 +892,13 @@ TEST_F(TriggerStoreTest, Load) {
|
||||
|
||||
check_empty();
|
||||
|
||||
store.emplace(testing_directory, &ast_cache, &*dba, &antlr_lock);
|
||||
reset_store();
|
||||
|
||||
check_empty();
|
||||
}
|
||||
|
||||
TEST_F(TriggerStoreTest, AddTrigger) {
|
||||
query::TriggerStore store{testing_directory, &ast_cache, &*dba, &antlr_lock};
|
||||
query::TriggerStore store{testing_directory};
|
||||
|
||||
// Invalid query in statements
|
||||
ASSERT_THROW(store.AddTrigger("trigger", "RETUR 1", {}, query::TriggerEventType::VERTEX_CREATE,
|
||||
@ -926,7 +931,7 @@ TEST_F(TriggerStoreTest, AddTrigger) {
|
||||
}
|
||||
|
||||
TEST_F(TriggerStoreTest, DropTrigger) {
|
||||
query::TriggerStore store{testing_directory, &ast_cache, &*dba, &antlr_lock};
|
||||
query::TriggerStore store{testing_directory};
|
||||
|
||||
ASSERT_THROW(store.DropTrigger("Unknown"), utils::BasicException);
|
||||
|
||||
@ -940,7 +945,7 @@ TEST_F(TriggerStoreTest, DropTrigger) {
|
||||
}
|
||||
|
||||
TEST_F(TriggerStoreTest, TriggerInfo) {
|
||||
query::TriggerStore store{testing_directory, &ast_cache, &*dba, &antlr_lock};
|
||||
query::TriggerStore store{testing_directory};
|
||||
|
||||
std::vector<query::TriggerStore::TriggerInfo> expected_info;
|
||||
store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE,
|
||||
@ -985,7 +990,7 @@ TEST_F(TriggerStoreTest, TriggerInfo) {
|
||||
}
|
||||
|
||||
TEST_F(TriggerStoreTest, AnyTriggerAllKeywords) {
|
||||
query::TriggerStore store{testing_directory, &ast_cache, &*dba, &antlr_lock};
|
||||
query::TriggerStore store{testing_directory};
|
||||
|
||||
using namespace std::literals;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user