Added notification warning when failed to commit to all replica instances

This commit is contained in:
Josip Mrden 2023-02-19 15:20:45 +01:00
parent bbce21e78f
commit 3167808062
4 changed files with 45 additions and 20 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -2496,13 +2496,13 @@ void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, Interpret
}
} // namespace
void Interpreter::Commit() {
Interpreter::TransactionCommitResponse Interpreter::Commit() {
// It's possible that some queries did not finish because the user did
// not pull all of the results from the query.
// For now, we will not check if there are some unfinished queries.
// We should document clearly that all results should be pulled to complete
// a query.
if (!db_accessor_) return;
if (!db_accessor_) return Interpreter::TransactionCommitResponse{.success = true};
std::optional<TriggerContext> trigger_context = std::nullopt;
if (trigger_context_collector_) {
@ -2590,8 +2590,15 @@ void Interpreter::Commit() {
SPDLOG_DEBUG("Finished committing the transaction");
if (!commit_confirmed_by_all_sync_repplicas) {
throw ReplicationException("At least one SYNC replica has not confirmed committing last transaction.");
std::vector<Notification> notifications;
notifications.reserve(1);
notifications.emplace_back(SeverityLevel::WARNING, NotificationCode::REPLICA_PORT_WARNING,
"At least one SYNC replica has not confirmed committing last transaction.");
return TransactionCommitResponse{.success = true, .notifications = std::move(notifications)};
}
return Interpreter::TransactionCommitResponse{.success = true};
}
void Interpreter::AdvanceCommand() {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -326,6 +326,11 @@ class Interpreter final {
}
};
struct TransactionCommitResponse {
bool success;
std::vector<Notification> notifications;
};
// Interpreter supports multiple prepared queries at the same time.
// The client can reference a specific query for pull using an arbitrary qid
// which is in our case the index of the query in the vector.
@ -354,7 +359,7 @@ class Interpreter final {
std::optional<storage::IsolationLevel> next_transaction_isolation_level;
PreparedQuery PrepareTransactionQuery(std::string_view query_upper);
void Commit();
TransactionCommitResponse Commit();
void AdvanceCommand();
void AbortCommand(std::unique_ptr<QueryExecution> *query_execution);
std::optional<storage::IsolationLevel> GetIsolationLevelOverride();
@ -402,28 +407,34 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
if (maybe_res) {
// Save its summary
maybe_summary.emplace(std::move(query_execution->summary));
if (!query_execution->notifications.empty()) {
std::vector<TypedValue> notifications;
notifications.reserve(query_execution->notifications.size());
for (const auto &notification : query_execution->notifications) {
notifications.emplace_back(notification.ConvertToMap());
}
maybe_summary->insert_or_assign("notifications", std::move(notifications));
std::vector<TypedValue> notifications;
notifications.reserve(query_execution->notifications.size());
for (const auto &notification : query_execution->notifications) {
notifications.emplace_back(notification.ConvertToMap());
}
if (!in_explicit_transaction_) {
switch (*maybe_res) {
case QueryHandlerResult::COMMIT:
Commit();
case QueryHandlerResult::COMMIT: {
auto commit_response = Commit();
for (const auto &notification : commit_response.notifications) {
notifications.emplace_back(notification.ConvertToMap());
}
break;
case QueryHandlerResult::ABORT:
}
case QueryHandlerResult::ABORT: {
Abort();
break;
case QueryHandlerResult::NOTHING:
}
case QueryHandlerResult::NOTHING: {
// The only cases in which we have nothing to do are those where
// we're either in an explicit transaction or the query is such that
// a transaction wasn't started on a call to `Prepare()`.
MG_ASSERT(in_explicit_transaction_ || !db_accessor_);
break;
}
}
// As the transaction is done we can clear all the executions
// NOTE: we cannot clear query_execution inside the Abort and Commit
@ -435,6 +446,10 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
// in the transaction can be in unfinished state
query_execution.reset(nullptr);
}
if (!notifications.empty()) {
maybe_summary->insert_or_assign("notifications", std::move(notifications));
}
}
} catch (const ExplicitTransactionUsageException &) {
query_execution.reset(nullptr);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -42,6 +42,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
return "CheckStream"sv;
case NotificationCode::CREATE_TRIGGER:
return "CreateTrigger"sv;
case NotificationCode::COMMIT_TO_REPLICAS:
return "CommitToReplicas"sv;
case NotificationCode::DROP_CONSTRAINT:
return "DropConstraint"sv;
case NotificationCode::DROP_REPLICA:

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -24,9 +24,10 @@ namespace memgraph::query {
enum class SeverityLevel : uint8_t { INFO, WARNING };
enum class NotificationCode : uint8_t {
CHECK_STREAM,
COMMIT_TO_REPLICAS,
CREATE_CONSTRAINT,
CREATE_INDEX,
CHECK_STREAM,
CREATE_STREAM,
CREATE_TRIGGER,
DROP_CONSTRAINT,