Small trigger fixes (#158)

* Fix warning message

* Update version

* Run query callbacks only on pull

* Use warn level for failure of loading a trigger
This commit is contained in:
antonio2368 2021-05-26 19:57:08 +02:00 committed by GitHub
parent a3ecc52429
commit 560eb04f67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 75 deletions

View File

@ -1,6 +1,6 @@
# Change Log
## Future
## v1.5.0
### Major Feature and Improvements

View File

@ -26,6 +26,7 @@
#include "utils/event_counter.hpp"
#include "utils/exceptions.hpp"
#include "utils/flag_validation.hpp"
#include "utils/likely.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
@ -340,8 +341,8 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
}
}
Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler *handler, const Parameters &parameters,
DbAccessor *db_accessor) {
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters,
InterpreterContext *interpreter_context, DbAccessor *db_accessor) {
Frame frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
@ -361,16 +362,17 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler *
if (port.IsInt()) {
maybe_port = port.ValueInt();
}
callback.fn = [handler, role = repl_query->role_, maybe_port] {
handler->SetReplicationRole(role, maybe_port);
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, role = repl_query->role_,
maybe_port]() mutable {
handler.SetReplicationRole(role, maybe_port);
return std::vector<std::vector<TypedValue>>();
};
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: {
callback.header = {"replication mode"};
callback.fn = [handler] {
auto mode = handler->ShowReplicationRole();
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}] {
auto mode = handler.ShowReplicationRole();
switch (mode) {
case ReplicationQuery::ReplicationRole::MAIN: {
return std::vector<std::vector<TypedValue>>{{TypedValue("main")}};
@ -393,24 +395,25 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler *
} else if (timeout.IsInt()) {
maybe_timeout = static_cast<double>(timeout.ValueInt());
}
callback.fn = [handler, name, socket_address, sync_mode, maybe_timeout] {
handler->RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout);
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
maybe_timeout]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout);
return std::vector<std::vector<TypedValue>>();
};
return callback;
}
case ReplicationQuery::Action::DROP_REPLICA: {
const auto &name = repl_query->replica_name_;
callback.fn = [handler, name] {
handler->DropReplica(name);
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name]() mutable {
handler.DropReplica(name);
return std::vector<std::vector<TypedValue>>();
};
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
callback.header = {"name", "socket_address", "sync_mode", "timeout"};
callback.fn = [handler, replica_nfields = callback.header.size()] {
const auto &replicas = handler->ShowReplicas();
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] {
const auto &replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
typed_replicas.reserve(replicas.size());
for (const auto &replica : replicas) {
@ -978,12 +981,15 @@ PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_ex
}
auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
ReplQueryHandler handler{interpreter_context->db};
auto callback = HandleReplicationQuery(replication_query, &handler, parsed_query.parameters, dba);
auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba);
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
[pull_plan = std::make_shared<PullPlanVector>(callback.fn())](
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
if (UNLIKELY(!pull_plan)) {
pull_plan = std::make_shared<PullPlanVector>(callback_fn());
}
if (pull_plan->Pull(stream, n)) {
return QueryHandlerResult::COMMIT;
}
@ -1002,31 +1008,22 @@ PreparedQuery PrepareLockPathQuery(ParsedQuery parsed_query, const bool in_expli
auto *lock_path_query = utils::Downcast<LockPathQuery>(parsed_query.query);
Frame frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
evaluation_context.timestamp =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
evaluation_context.parameters = parsed_query.parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::View::OLD);
Callback callback;
switch (lock_path_query->action_) {
case LockPathQuery::Action::LOCK_PATH:
if (!interpreter_context->db->LockPath()) {
throw QueryRuntimeException("Failed to lock the data directory");
}
break;
case LockPathQuery::Action::UNLOCK_PATH:
if (!interpreter_context->db->UnlockPath()) {
throw QueryRuntimeException("Failed to unlock the data directory");
}
break;
}
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
[](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[interpreter_context, action = lock_path_query->action_](
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
switch (action) {
case LockPathQuery::Action::LOCK_PATH:
if (!interpreter_context->db->LockPath()) {
throw QueryRuntimeException("Failed to lock the data directory");
}
break;
case LockPathQuery::Action::UNLOCK_PATH:
if (!interpreter_context->db->UnlockPath()) {
throw QueryRuntimeException("Failed to unlock the data directory");
}
break;
}
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
@ -1038,14 +1035,14 @@ PreparedQuery PrepareFreeMemoryQuery(ParsedQuery parsed_query, const bool in_exp
throw FreeMemoryModificationInMulticommandTxException();
}
interpreter_context->db->FreeMemory();
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
return PreparedQuery{
{},
std::move(parsed_query.required_privileges),
[interpreter_context](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
interpreter_context->db->FreeMemory();
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
}
TriggerEventType ToTriggerEventType(const TriggerQuery::EventType event_type) {
@ -1085,19 +1082,24 @@ TriggerEventType ToTriggerEventType(const TriggerQuery::EventType event_type) {
Callback CreateTrigger(TriggerQuery *trigger_query,
const std::map<std::string, storage::PropertyValue> &user_parameters,
InterpreterContext *interpreter_context, DbAccessor *dba) {
return {{}, [trigger_query, interpreter_context, dba, &user_parameters]() -> std::vector<std::vector<TypedValue>> {
interpreter_context->trigger_store->AddTrigger(
trigger_query->trigger_name_, trigger_query->statement_, user_parameters,
ToTriggerEventType(trigger_query->event_type_),
trigger_query->before_commit_ ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT,
&interpreter_context->ast_cache, dba, &interpreter_context->antlr_lock);
return {};
}};
return {
{},
[trigger_name = std::move(trigger_query->trigger_name_), trigger_statement = std::move(trigger_query->statement_),
event_type = trigger_query->event_type_, before_commit = trigger_query->before_commit_, interpreter_context, dba,
user_parameters]() -> std::vector<std::vector<TypedValue>> {
interpreter_context->trigger_store->AddTrigger(
trigger_name, trigger_statement, user_parameters, ToTriggerEventType(event_type),
before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache,
dba, &interpreter_context->antlr_lock);
return {};
}};
}
Callback DropTrigger(TriggerQuery *trigger_query, InterpreterContext *interpreter_context) {
return {{}, [trigger_query, interpreter_context]() -> std::vector<std::vector<TypedValue>> {
interpreter_context->trigger_store->DropTrigger(trigger_query->trigger_name_);
return {{},
[trigger_name = std::move(trigger_query->trigger_name_),
interpreter_context]() -> std::vector<std::vector<TypedValue>> {
interpreter_context->trigger_store->DropTrigger(trigger_name);
return {};
}};
}
@ -1143,11 +1145,13 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explic
}
}();
auto results = callback.fn();
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[pull_plan = std::make_shared<PullPlanVector>(std::move(results))](
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
if (UNLIKELY(!pull_plan)) {
pull_plan = std::make_shared<PullPlanVector>(callback_fn());
}
if (pull_plan->Pull(stream, n)) {
return QueryHandlerResult::COMMIT;
}

View File

@ -99,11 +99,11 @@ class ReplicationQueryHandler {
ReplicationQueryHandler() = default;
virtual ~ReplicationQueryHandler() = default;
ReplicationQueryHandler(const ReplicationQueryHandler &) = delete;
ReplicationQueryHandler &operator=(const ReplicationQueryHandler &) = delete;
ReplicationQueryHandler(const ReplicationQueryHandler &) = default;
ReplicationQueryHandler &operator=(const ReplicationQueryHandler &) = default;
ReplicationQueryHandler(ReplicationQueryHandler &&) = delete;
ReplicationQueryHandler &operator=(ReplicationQueryHandler &&) = delete;
ReplicationQueryHandler(ReplicationQueryHandler &&) = default;
ReplicationQueryHandler &operator=(ReplicationQueryHandler &&) = default;
struct Replica {
std::string name;

View File

@ -240,38 +240,45 @@ TriggerStore::TriggerStore(std::filesystem::path directory, utils::SkipList<Quer
spdlog::info("Loading triggers...");
for (const auto &[trigger_name, trigger_data] : storage_) {
// structured binding cannot be captured in lambda
const auto get_failed_message = [](const std::string_view trigger_name, const std::string_view message) {
return fmt::format("Failed to load trigger '{}'. {}", trigger_name, message);
};
const auto invalid_state_message = get_failed_message(trigger_name, "Invalid state of the trigger data.");
spdlog::debug("Loading trigger '{}'", trigger_name);
auto json_trigger_data = nlohmann::json::parse(trigger_data);
if (!json_trigger_data["version"].is_number_unsigned()) {
spdlog::debug("Invalid state of the trigger data.");
spdlog::warn(invalid_state_message);
continue;
}
if (json_trigger_data["version"] != kVersion) {
spdlog::debug("Invalid version of the trigger data. Got {}");
spdlog::warn(get_failed_message(trigger_name, "Invalid version of the trigger data."));
continue;
}
if (!json_trigger_data["statement"].is_string()) {
spdlog::debug("Invalid state of the trigger data");
spdlog::warn(invalid_state_message);
continue;
}
auto statement = json_trigger_data["statement"].get<std::string>();
if (!json_trigger_data["phase"].is_number_integer()) {
spdlog::debug("Invalid state of the trigger data");
spdlog::warn(invalid_state_message);
continue;
}
const auto phase = json_trigger_data["phase"].get<TriggerPhase>();
if (!json_trigger_data["event_type"].is_number_integer()) {
spdlog::debug("Invalid state of the trigger data");
spdlog::warn(invalid_state_message);
continue;
}
const auto event_type = json_trigger_data["event_type"].get<TriggerEventType>();
if (!json_trigger_data["user_parameters"].is_object()) {
spdlog::debug("Invalid state of the trigger data");
spdlog::warn(invalid_state_message);
continue;
}
const auto user_parameters = serialization::DeserializePropertyValueMap(json_trigger_data["user_parameters"]);
@ -280,7 +287,7 @@ TriggerStore::TriggerStore(std::filesystem::path directory, utils::SkipList<Quer
try {
trigger.emplace(trigger_name, statement, user_parameters, event_type, query_cache, db_accessor, antlr_lock);
} catch (const utils::BasicException &e) {
spdlog::debug("Failed to create a trigger '{}' because: {}", trigger_name, e.what());
spdlog::warn("Failed to create trigger '{}' because: {}", trigger_name, e.what());
continue;
}