Fix use after move errors in case of transaction retry

This commit is contained in:
János Benjamin Antal 2022-01-31 16:10:55 +01:00
parent 6c2763b492
commit 59cc1d5f2b

View File

@ -54,7 +54,7 @@ auto GetStream(auto &map, const std::string &stream_name) {
}
std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformationResult(
utils::pmr::map<utils::pmr::string, TypedValue> &&values, const std::string_view transformation_name,
const utils::pmr::map<utils::pmr::string, TypedValue> &values, const std::string_view transformation_name,
const std::string_view stream_name) {
if (values.size() != kExpectedTransformationResultSize) {
throw StreamsException(
@ -62,7 +62,7 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
transformation_name, stream_name);
}
auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & {
auto get_value = [&](const utils::pmr::string &field_name) mutable -> const TypedValue & {
auto it = values.find(field_name);
if (it == values.end()) {
throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.",
@ -71,11 +71,11 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
return it->second;
};
auto &query_value = get_value(query_param_name);
const auto &query_value = get_value(query_param_name);
MG_ASSERT(query_value.IsString());
auto &params_value = get_value(params_param_name);
const auto &params_value = get_value(params_param_name);
MG_ASSERT(params_value.IsNull() || params_value.IsMap());
return {std::move(query_value), std::move(params_value)};
return {query_value, params_value};
}
template <typename TMessage>
@ -462,8 +462,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
interpreter->BeginTransaction();
for (auto &row : result.rows) {
spdlog::trace("Processing row in stream '{}'", stream_name);
auto [query_value, params_value] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
auto [query_value, params_value] = ExtractTransformationResult(row.values, transformation_name, stream_name);
storage::PropertyValue params_prop{params_value};
std::string query{query_value.ValueString()};
@ -681,8 +680,7 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
for (auto &row : result.rows) {
auto [query, parameters] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name);
std::vector<TypedValue> result_row;
result_row.reserve(kExpectedTransformationResultSize);
result_row.push_back(std::move(query));