Fix nested transaction error (#339)

This commit is contained in:
János Benjamin Antal 2022-02-03 09:41:23 +01:00 committed by GitHub
parent 7348ad6800
commit 661e5185d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -56,7 +56,7 @@ auto GetStream(auto &map, const std::string &stream_name) {
}
std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformationResult(
utils::pmr::map<utils::pmr::string, TypedValue> &&values, const std::string_view transformation_name,
const utils::pmr::map<utils::pmr::string, TypedValue> &values, const std::string_view transformation_name,
const std::string_view stream_name) {
if (values.size() != kExpectedTransformationResultSize) {
throw StreamsException(
@ -64,7 +64,7 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
transformation_name, stream_name);
}
auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & {
auto get_value = [&](const utils::pmr::string &field_name) mutable -> const TypedValue & {
auto it = values.find(field_name);
if (it == values.end()) {
throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.",
@ -73,11 +73,11 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
return it->second;
};
auto &query_value = get_value(query_param_name);
const auto &query_value = get_value(query_param_name);
MG_ASSERT(query_value.IsString());
auto &params_value = get_value(params_param_name);
const auto &params_value = get_value(params_param_name);
MG_ASSERT(params_value.IsNull() || params_value.IsMap());
return {std::move(query_value), std::move(params_value)};
return {query_value, params_value};
}
template <typename TMessage>
@ -530,8 +530,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
interpreter->BeginTransaction();
for (auto &row : result.rows) {
spdlog::trace("Processing row in stream '{}'", stream_name);
auto [query_value, params_value] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
auto [query_value, params_value] = ExtractTransformationResult(row.values, transformation_name, stream_name);
storage::PropertyValue params_prop{params_value};
std::string query{query_value.ValueString()};
@ -552,6 +551,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
result.rows.clear();
break;
} catch (const query::TransactionSerializationException &e) {
interpreter->Abort();
if (i == total_retries) {
throw;
}
@ -748,8 +748,7 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
for (auto &row : result.rows) {
auto [query, parameters] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name);
std::vector<TypedValue> result_row;
result_row.reserve(kExpectedTransformationResultSize);
result_row.push_back(std::move(query));