From 560eb04f67d8fd9f430cc102d217d17693235c97 Mon Sep 17 00:00:00 2001 From: antonio2368 Date: Wed, 26 May 2021 19:57:08 +0200 Subject: [PATCH] Small trigger fixes (#158) * Fix warning message * Update version * Run query callbacks only on pull * Use warn level for failure of loading a trigger --- CHANGELOG.md | 2 +- src/query/interpreter.cpp | 130 ++++++++++++++++++++------------------ src/query/interpreter.hpp | 8 +-- src/query/trigger.cpp | 21 ++++-- 4 files changed, 86 insertions(+), 75 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e281fe50d..ef4fd5692 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Change Log -## Future +## v1.5.0 ### Major Feature and Improvements diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 67fa4b279..39432f716 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -26,6 +26,7 @@ #include "utils/event_counter.hpp" #include "utils/exceptions.hpp" #include "utils/flag_validation.hpp" +#include "utils/likely.hpp" #include "utils/logging.hpp" #include "utils/memory.hpp" #include "utils/memory_tracker.hpp" @@ -340,8 +341,8 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa } } -Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler *handler, const Parameters ¶meters, - DbAccessor *db_accessor) { +Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, + InterpreterContext *interpreter_context, DbAccessor *db_accessor) { Frame frame(0); SymbolTable symbol_table; EvaluationContext evaluation_context; @@ -361,16 +362,17 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler * if (port.IsInt()) { maybe_port = port.ValueInt(); } - callback.fn = [handler, role = repl_query->role_, maybe_port] { - handler->SetReplicationRole(role, maybe_port); + callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, role = repl_query->role_, + maybe_port]() mutable { + handler.SetReplicationRole(role, maybe_port); return std::vector>(); }; return callback; } case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: { callback.header = {"replication mode"}; - callback.fn = [handler] { - auto mode = handler->ShowReplicationRole(); + callback.fn = [handler = ReplQueryHandler{interpreter_context->db}] { + auto mode = handler.ShowReplicationRole(); switch (mode) { case ReplicationQuery::ReplicationRole::MAIN: { return std::vector>{{TypedValue("main")}}; @@ -393,24 +395,25 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler * } else if (timeout.IsInt()) { maybe_timeout = static_cast(timeout.ValueInt()); } - callback.fn = [handler, name, socket_address, sync_mode, maybe_timeout] { - handler->RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout); + callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode, + maybe_timeout]() mutable { + handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout); return std::vector>(); }; return callback; } case ReplicationQuery::Action::DROP_REPLICA: { const auto &name = repl_query->replica_name_; - callback.fn = [handler, name] { - handler->DropReplica(name); + callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name]() mutable { + handler.DropReplica(name); return std::vector>(); }; return callback; } case ReplicationQuery::Action::SHOW_REPLICAS: { callback.header = {"name", "socket_address", "sync_mode", "timeout"}; - callback.fn = [handler, replica_nfields = callback.header.size()] { - const auto &replicas = handler->ShowReplicas(); + callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] { + const auto &replicas = handler.ShowReplicas(); auto typed_replicas = std::vector>{}; typed_replicas.reserve(replicas.size()); for (const auto &replica : replicas) { @@ -978,12 +981,15 @@ PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_ex } auto *replication_query = utils::Downcast(parsed_query.query); - ReplQueryHandler handler{interpreter_context->db}; - auto callback = HandleReplicationQuery(replication_query, &handler, parsed_query.parameters, dba); + auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba); return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), - [pull_plan = std::make_shared(callback.fn())]( - AnyStream *stream, std::optional n) -> std::optional { + [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( + AnyStream *stream, std::optional n) mutable -> std::optional { + if (UNLIKELY(!pull_plan)) { + pull_plan = std::make_shared(callback_fn()); + } + if (pull_plan->Pull(stream, n)) { return QueryHandlerResult::COMMIT; } @@ -1002,31 +1008,22 @@ PreparedQuery PrepareLockPathQuery(ParsedQuery parsed_query, const bool in_expli auto *lock_path_query = utils::Downcast(parsed_query.query); - Frame frame(0); - SymbolTable symbol_table; - EvaluationContext evaluation_context; - evaluation_context.timestamp = - std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) - .count(); - evaluation_context.parameters = parsed_query.parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::View::OLD); - - Callback callback; - switch (lock_path_query->action_) { - case LockPathQuery::Action::LOCK_PATH: - if (!interpreter_context->db->LockPath()) { - throw QueryRuntimeException("Failed to lock the data directory"); - } - break; - case LockPathQuery::Action::UNLOCK_PATH: - if (!interpreter_context->db->UnlockPath()) { - throw QueryRuntimeException("Failed to unlock the data directory"); - } - break; - } - - return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), - [](AnyStream *stream, std::optional n) -> std::optional { + return PreparedQuery{{}, + std::move(parsed_query.required_privileges), + [interpreter_context, action = lock_path_query->action_]( + AnyStream *stream, std::optional n) -> std::optional { + switch (action) { + case LockPathQuery::Action::LOCK_PATH: + if (!interpreter_context->db->LockPath()) { + throw QueryRuntimeException("Failed to lock the data directory"); + } + break; + case LockPathQuery::Action::UNLOCK_PATH: + if (!interpreter_context->db->UnlockPath()) { + throw QueryRuntimeException("Failed to unlock the data directory"); + } + break; + } return QueryHandlerResult::COMMIT; }, RWType::NONE}; @@ -1038,14 +1035,14 @@ PreparedQuery PrepareFreeMemoryQuery(ParsedQuery parsed_query, const bool in_exp throw FreeMemoryModificationInMulticommandTxException(); } - interpreter_context->db->FreeMemory(); - - return PreparedQuery{{}, - std::move(parsed_query.required_privileges), - [](AnyStream *stream, std::optional n) -> std::optional { - return QueryHandlerResult::COMMIT; - }, - RWType::NONE}; + return PreparedQuery{ + {}, + std::move(parsed_query.required_privileges), + [interpreter_context](AnyStream *stream, std::optional n) -> std::optional { + interpreter_context->db->FreeMemory(); + return QueryHandlerResult::COMMIT; + }, + RWType::NONE}; } TriggerEventType ToTriggerEventType(const TriggerQuery::EventType event_type) { @@ -1085,19 +1082,24 @@ TriggerEventType ToTriggerEventType(const TriggerQuery::EventType event_type) { Callback CreateTrigger(TriggerQuery *trigger_query, const std::map &user_parameters, InterpreterContext *interpreter_context, DbAccessor *dba) { - return {{}, [trigger_query, interpreter_context, dba, &user_parameters]() -> std::vector> { - interpreter_context->trigger_store->AddTrigger( - trigger_query->trigger_name_, trigger_query->statement_, user_parameters, - ToTriggerEventType(trigger_query->event_type_), - trigger_query->before_commit_ ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, - &interpreter_context->ast_cache, dba, &interpreter_context->antlr_lock); - return {}; - }}; + return { + {}, + [trigger_name = std::move(trigger_query->trigger_name_), trigger_statement = std::move(trigger_query->statement_), + event_type = trigger_query->event_type_, before_commit = trigger_query->before_commit_, interpreter_context, dba, + user_parameters]() -> std::vector> { + interpreter_context->trigger_store->AddTrigger( + trigger_name, trigger_statement, user_parameters, ToTriggerEventType(event_type), + before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache, + dba, &interpreter_context->antlr_lock); + return {}; + }}; } Callback DropTrigger(TriggerQuery *trigger_query, InterpreterContext *interpreter_context) { - return {{}, [trigger_query, interpreter_context]() -> std::vector> { - interpreter_context->trigger_store->DropTrigger(trigger_query->trigger_name_); + return {{}, + [trigger_name = std::move(trigger_query->trigger_name_), + interpreter_context]() -> std::vector> { + interpreter_context->trigger_store->DropTrigger(trigger_name); return {}; }}; } @@ -1143,11 +1145,13 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explic } }(); - auto results = callback.fn(); - return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges), - [pull_plan = std::make_shared(std::move(results))]( - AnyStream *stream, std::optional n) -> std::optional { + [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( + AnyStream *stream, std::optional n) mutable -> std::optional { + if (UNLIKELY(!pull_plan)) { + pull_plan = std::make_shared(callback_fn()); + } + if (pull_plan->Pull(stream, n)) { return QueryHandlerResult::COMMIT; } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index f443caf89..87bec040d 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -99,11 +99,11 @@ class ReplicationQueryHandler { ReplicationQueryHandler() = default; virtual ~ReplicationQueryHandler() = default; - ReplicationQueryHandler(const ReplicationQueryHandler &) = delete; - ReplicationQueryHandler &operator=(const ReplicationQueryHandler &) = delete; + ReplicationQueryHandler(const ReplicationQueryHandler &) = default; + ReplicationQueryHandler &operator=(const ReplicationQueryHandler &) = default; - ReplicationQueryHandler(ReplicationQueryHandler &&) = delete; - ReplicationQueryHandler &operator=(ReplicationQueryHandler &&) = delete; + ReplicationQueryHandler(ReplicationQueryHandler &&) = default; + ReplicationQueryHandler &operator=(ReplicationQueryHandler &&) = default; struct Replica { std::string name; diff --git a/src/query/trigger.cpp b/src/query/trigger.cpp index 401161c98..401e43d50 100644 --- a/src/query/trigger.cpp +++ b/src/query/trigger.cpp @@ -240,38 +240,45 @@ TriggerStore::TriggerStore(std::filesystem::path directory, utils::SkipList(); if (!json_trigger_data["phase"].is_number_integer()) { - spdlog::debug("Invalid state of the trigger data"); + spdlog::warn(invalid_state_message); continue; } const auto phase = json_trigger_data["phase"].get(); if (!json_trigger_data["event_type"].is_number_integer()) { - spdlog::debug("Invalid state of the trigger data"); + spdlog::warn(invalid_state_message); continue; } const auto event_type = json_trigger_data["event_type"].get(); if (!json_trigger_data["user_parameters"].is_object()) { - spdlog::debug("Invalid state of the trigger data"); + spdlog::warn(invalid_state_message); continue; } const auto user_parameters = serialization::DeserializePropertyValueMap(json_trigger_data["user_parameters"]); @@ -280,7 +287,7 @@ TriggerStore::TriggerStore(std::filesystem::path directory, utils::SkipList