From 6eb52581eb0b0567b1657a20aa35672b1239455e Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Tue, 16 Nov 2021 15:29:45 +0200 Subject: [PATCH] Retry transaction on consumer (#294) --- src/memgraph.cpp | 24 ++++++++++++----- src/query/common.hpp | 6 +---- src/query/config.hpp | 3 +++ src/query/exceptions.hpp | 12 +++++++++ src/query/plan/operator.cpp | 20 +++++++------- src/query/stream/streams.cpp | 52 ++++++++++++++++++++++-------------- 6 files changed, 76 insertions(+), 41 deletions(-) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index ed1414752..19a73403a 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -186,6 +186,15 @@ DEFINE_bool(telemetry_enabled, false, "the database runtime (vertex and edge counts and resource usage) " "to allow for easier improvement of the product."); +// Streams flags +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32( + stream_transaction_conflict_retries, 30, + "Number of times to retry when a stream transformation fails to commit because of conflicting transactions"); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32( + stream_transaction_retry_interval, 500, + "Retry interval in milliseconds when a stream transformation fails to commit because of conflicting transactions"); // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_string(kafka_bootstrap_servers, "", "List of default Kafka brokers as a comma separated list of broker host or host:port."); @@ -1122,12 +1131,15 @@ int main(int argc, char **argv) { } storage::Storage db(db_config); - query::InterpreterContext interpreter_context{&db, - {.query = {.allow_load_csv = FLAGS_allow_load_csv}, - .execution_timeout_sec = FLAGS_query_execution_timeout_sec, - .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers, - .default_pulsar_service_url = FLAGS_pulsar_service_url}, - FLAGS_data_directory}; + query::InterpreterContext interpreter_context{ + &db, + {.query = {.allow_load_csv = FLAGS_allow_load_csv}, + .execution_timeout_sec = FLAGS_query_execution_timeout_sec, + .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers, + .default_pulsar_service_url = FLAGS_pulsar_service_url, + .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries, + .stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)}, + FLAGS_data_directory}; #ifdef MG_ENTERPRISE SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; #else diff --git a/src/query/common.hpp b/src/query/common.hpp index 9e3dffce2..c855d3885 100644 --- a/src/query/common.hpp +++ b/src/query/common.hpp @@ -33,10 +33,6 @@ namespace impl { bool TypedValueCompare(const TypedValue &a, const TypedValue &b); } // namespace impl -constexpr inline std::string_view kSerializationErrorMessage{ - "Cannot resolve conflicting transactions. You can retry this transaction when the conflicting transaction is " - "finished."}; - /// Custom Comparator type for comparing vectors of TypedValues. /// /// Does lexicographical ordering of elements based on the above @@ -95,7 +91,7 @@ storage::PropertyValue PropsSetChecked(T *record, const storage::PropertyId &key if (maybe_old_value.HasError()) { switch (maybe_old_value.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to set properties on a deleted object."); case storage::Error::PROPERTIES_DISABLED: diff --git a/src/query/config.hpp b/src/query/config.hpp index 4d18cf19c..aaee9326d 100644 --- a/src/query/config.hpp +++ b/src/query/config.hpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #pragma once +#include #include namespace query { @@ -23,5 +24,7 @@ struct InterpreterConfig { std::string default_kafka_bootstrap_servers; std::string default_pulsar_service_url; + uint32_t stream_transaction_conflict_retries; + std::chrono::milliseconds stream_transaction_retry_interval; }; } // namespace query diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 9b530aa3e..f0881de94 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -130,6 +130,18 @@ class ExplicitTransactionUsageException : public QueryRuntimeException { using QueryRuntimeException::QueryRuntimeException; }; +/** + * An exception for serialization error + */ +class TransactionSerializationException : public QueryException { + public: + using QueryException::QueryException; + TransactionSerializationException() + : QueryException( + "Cannot resolve conflicting transactions. You can retry this transaction when the conflicting transaction " + "is finished") {} +}; + class ReconstructionException : public QueryException { public: ReconstructionException() diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index b8d2a1fff..b8652d3a3 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -181,7 +181,7 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *fram if (maybe_error.HasError()) { switch (maybe_error.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to set a label on a deleted node."); case storage::Error::VERTEX_HAS_EDGES: @@ -298,7 +298,7 @@ EdgeAccessor CreateEdge(const EdgeCreationInfo &edge_info, DbAccessor *dba, Vert } else { switch (maybe_edge.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to create an edge on a deleted node."); case storage::Error::VERTEX_HAS_EDGES: @@ -1921,7 +1921,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { if (maybe_value.HasError()) { switch (maybe_value.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: case storage::Error::VERTEX_HAS_EDGES: case storage::Error::PROPERTIES_DISABLED: @@ -1947,7 +1947,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { if (res.HasError()) { switch (res.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: case storage::Error::VERTEX_HAS_EDGES: case storage::Error::PROPERTIES_DISABLED: @@ -1978,7 +1978,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { if (res.HasError()) { switch (res.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::VERTEX_HAS_EDGES: throw RemoveAttachedVertexException(); case storage::Error::DELETED_OBJECT: @@ -2128,7 +2128,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to set properties on a deleted graph element."); case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::PROPERTIES_DISABLED: throw QueryRuntimeException("Can't set property because properties on edges are disabled."); case storage::Error::VERTEX_HAS_EDGES: @@ -2184,7 +2184,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to set properties on a deleted graph element."); case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::PROPERTIES_DISABLED: throw QueryRuntimeException("Can't set property because properties on edges are disabled."); case storage::Error::VERTEX_HAS_EDGES: @@ -2299,7 +2299,7 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) { if (maybe_value.HasError()) { switch (maybe_value.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to set a label on a deleted node."); case storage::Error::VERTEX_HAS_EDGES: @@ -2357,7 +2357,7 @@ bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame, ExecutionContext & case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to remove a property on a deleted graph element."); case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::PROPERTIES_DISABLED: throw QueryRuntimeException( "Can't remove property because properties on edges are " @@ -2428,7 +2428,7 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &cont if (maybe_value.HasError()) { switch (maybe_value.GetError()) { case storage::Error::SERIALIZATION_ERROR: - throw QueryRuntimeException(kSerializationErrorMessage); + throw TransactionSerializationException(); case storage::Error::DELETED_OBJECT: throw QueryRuntimeException("Trying to remove labels from a deleted node."); case storage::Error::VERTEX_HAS_EDGES: diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index 220a50152..a4b769b0d 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -232,26 +232,30 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std auto *memory_resource = utils::NewDeleteResource(); - auto consumer_function = - [interpreter_context = interpreter_context_, memory_resource, stream_name, - transformation_name = stream_info.common_info.transformation_name, owner = owner, - interpreter = std::make_shared(interpreter_context_), - result = mgp_result{nullptr, memory_resource}](const std::vector &messages) mutable { - auto accessor = interpreter_context->db->Access(); - EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size()); - CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); + auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, stream_name, + transformation_name = stream_info.common_info.transformation_name, owner = owner, + interpreter = std::make_shared(interpreter_context_), + result = mgp_result{nullptr, memory_resource}, + total_retries = interpreter_context_->config.stream_transaction_conflict_retries, + retry_interval = interpreter_context_->config.stream_transaction_retry_interval]( + const std::vector &messages) mutable { + auto accessor = interpreter_context->db->Access(); + EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size()); + CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); - DiscardValueResultStream stream; + DiscardValueResultStream stream; - spdlog::trace("Start transaction in stream '{}'", stream_name); - utils::OnScopeExit cleanup{[&interpreter, &result]() { - result.rows.clear(); - interpreter->Abort(); - }}; + spdlog::trace("Start transaction in stream '{}'", stream_name); + utils::OnScopeExit cleanup{[&interpreter, &result]() { + result.rows.clear(); + interpreter->Abort(); + }}; + + const static std::map empty_parameters{}; + uint32_t i = 0; + while (true) { + try { interpreter->BeginTransaction(); - - const static std::map empty_parameters{}; - for (auto &row : result.rows) { spdlog::trace("Processing row in stream '{}'", stream_name); auto [query_value, params_value] = @@ -264,7 +268,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std interpreter->Prepare(query, params_prop.IsNull() ? empty_parameters : params_prop.ValueMap(), nullptr); if (!interpreter_context->auth_checker->IsUserAuthorized(owner, prepare_result.privileges)) { throw StreamsException{ - "Couldn't execute query '{}' for stream '{}' becuase the owner is not authorized to execute the " + "Couldn't execute query '{}' for stream '{}' because the owner is not authorized to execute the " "query!", query, stream_name}; } @@ -274,8 +278,16 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std spdlog::trace("Commit transaction in stream '{}'", stream_name); interpreter->CommitTransaction(); result.rows.clear(); - }; - + break; + } catch (const query::TransactionSerializationException &e) { + if (i == total_retries) { + throw; + } + ++i; + std::this_thread::sleep_for(retry_interval); + } + } + }; auto insert_result = map.try_emplace( stream_name, StreamData{std::move(stream_info.common_info.transformation_name), std::move(owner), std::make_unique>(