diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index be77882ac..0ec8f0e94 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -118,10 +118,10 @@ Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, Cons : info_{std::move(info)}, consumer_function_(std::move(consumer_function)), cb_(info_.consumer_name) { MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer"); // NOLINTNEXTLINE (modernize-use-nullptr) - if (info.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) { + if (info_.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) { throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch interval has to be positive!"); } - if (info.batch_size.value_or(kMinimumSize) < kMinimumSize) { + if (info_.batch_size.value_or(kMinimumSize) < kMinimumSize) { throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch size has to be positive!"); } diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 03700d477..3e2e2eade 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -33,7 +33,9 @@ set(mg_query_sources procedure/module.cpp procedure/py_module.cpp serialization/property_value.cpp - streams.cpp + stream/streams.cpp + stream/sources.cpp + stream/common.cpp trigger.cpp trigger_context.cpp typed_value.cpp) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index d5578cdef..1d493ce4b 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -40,7 +40,6 @@ #include "query/plan/planner.hpp" #include "query/plan/profile.hpp" #include "query/plan/vertex_count_cache.hpp" -#include "query/streams.hpp" #include "query/trigger.hpp" #include "query/typed_value.hpp" #include "storage/v2/property_value.hpp" @@ -562,14 +561,15 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable { std::string bootstrap = bootstrap_servers ? std::move(*bootstrap_servers) : ""; - interpreter_context->streams.Create(stream_name, - query::StreamInfo{.topics = std::move(topic_names), - .consumer_group = std::move(consumer_group), - .batch_interval = batch_interval, - .batch_size = batch_size, - .transformation_name = std::move(transformation_name), - .owner = std::move(owner), - .bootstrap_servers = std::move(bootstrap)}); + interpreter_context->streams.Create( + stream_name, + {.common_info = {.batch_interval = batch_interval, + .batch_size = batch_size, + .transformation_name = std::move(transformation_name)}, + .topics = std::move(topic_names), + .consumer_group = std::move(consumer_group), + .bootstrap_servers = std::move(bootstrap)}, + std::move(owner)); return std::vector>{}; }; notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM, @@ -620,28 +620,12 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete return callback; } case StreamQuery::Action::SHOW_STREAMS: { - callback.header = {"name", "topics", - "consumer_group", "batch_interval", - "batch_size", "transformation_name", - "owner", "bootstrap_servers", - "is running"}; + callback.header = {"name", "batch_interval", "batch_size", "transformation_name", "owner", "is running"}; callback.fn = [interpreter_context]() { auto streams_status = interpreter_context->streams.GetStreamInfo(); std::vector> results; results.reserve(streams_status.size()); - auto topics_as_typed_topics = [](const auto &topics) { - std::vector typed_topics; - typed_topics.reserve(topics.size()); - for (const auto &elem : topics) { - typed_topics.emplace_back(elem); - } - return typed_topics; - }; - - auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics, interpreter_context]( - auto &typed_status, const auto &stream_info) { - typed_status.emplace_back(topics_as_typed_topics(stream_info.topics)); - typed_status.emplace_back(stream_info.consumer_group); + auto stream_info_as_typed_stream_info_emplace_in = [](auto &typed_status, const auto &stream_info) { if (stream_info.batch_interval.has_value()) { typed_status.emplace_back(stream_info.batch_interval->count()); } else { @@ -653,16 +637,6 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete typed_status.emplace_back(); } typed_status.emplace_back(stream_info.transformation_name); - if (stream_info.owner.has_value()) { - typed_status.emplace_back(*stream_info.owner); - } else { - typed_status.emplace_back(); - } - if (stream_info.bootstrap_servers.empty()) { - typed_status.emplace_back(interpreter_context->streams.BootstrapServers()); - } else { - typed_status.emplace_back(stream_info.bootstrap_servers); - } }; for (const auto &status : streams_status) { @@ -670,6 +644,11 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete typed_status.reserve(8); typed_status.emplace_back(status.name); stream_info_as_typed_stream_info_emplace_in(typed_status, status.info); + if (status.owner.has_value()) { + typed_status.emplace_back(*status.owner); + } else { + typed_status.emplace_back(); + } typed_status.emplace_back(status.is_running); results.push_back(std::move(typed_status)); } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 251a7aafe..8c3310ccd 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -27,7 +27,7 @@ #include "query/plan/operator.hpp" #include "query/plan/read_write_type_checker.hpp" #include "query/stream.hpp" -#include "query/streams.hpp" +#include "query/stream/streams.hpp" #include "query/trigger.hpp" #include "query/typed_value.hpp" #include "storage/v2/isolation_level.hpp" diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 30b635847..c603a38c0 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -2493,31 +2493,122 @@ bool IsValidIdentifierName(const char *name) { } // namespace query::procedure mgp_error mgp_message_payload(mgp_message *message, const char **result) { - return WrapExceptions([message] { return message->msg->Payload().data(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> const char * { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->Payload().data(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_message_payload_size(mgp_message *message, size_t *result) { - return WrapExceptions([message] { return message->msg->Payload().size(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> size_t { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->Payload().size(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_message_topic_name(mgp_message *message, const char **result) { - return WrapExceptions([message] { return message->msg->TopicName().data(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> const char * { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->TopicName().data(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_message_key(mgp_message *message, const char **result) { - return WrapExceptions([message] { return message->msg->Key().data(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> const char * { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->Key().data(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_message_key_size(mgp_message *message, size_t *result) { - return WrapExceptions([message] { return message->msg->Key().size(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> size_t { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->Key().size(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_message_timestamp(mgp_message *message, int64_t *result) { - return WrapExceptions([message] { return message->msg->Timestamp(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> int64_t { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->Timestamp(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_message_offset(struct mgp_message *message, int64_t *result) { - return WrapExceptions([message] { return message->msg->Offset(); }, result); + return WrapExceptions( + [message] { + return std::visit( + [](T &&msg) -> int64_t { + using MessageType = std::decay_t; + if constexpr (std::same_as) { + return msg->Offset(); + } else { + throw std::invalid_argument("Invalid source type"); + } + }, + message->msg); + }, + result); } mgp_error mgp_messages_size(mgp_messages *messages, size_t *result) { diff --git a/src/query/procedure/mg_procedure_impl.hpp b/src/query/procedure/mg_procedure_impl.hpp index 54d8f0640..c32578c7f 100644 --- a/src/query/procedure/mg_procedure_impl.hpp +++ b/src/query/procedure/mg_procedure_impl.hpp @@ -801,7 +801,10 @@ bool IsValidIdentifierName(const char *name); } // namespace query::procedure struct mgp_message { - const integrations::kafka::Message *msg; + explicit mgp_message(const integrations::kafka::Message &message) : msg{&message} {} + + using KafkaMessage = const integrations::kafka::Message *; + std::variant msg; }; struct mgp_messages { diff --git a/src/query/stream/common.cpp b/src/query/stream/common.cpp new file mode 100644 index 000000000..03bf8e0e0 --- /dev/null +++ b/src/query/stream/common.cpp @@ -0,0 +1,55 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "query/stream/common.hpp" + +#include + +namespace query { +namespace { +const std::string kBatchIntervalKey{"batch_interval"}; +const std::string kBatchSizeKey{"batch_size"}; +const std::string kTransformationName{"transformation_name"}; +} // namespace + +void to_json(nlohmann::json &data, CommonStreamInfo &&common_info) { + if (common_info.batch_interval) { + data[kBatchIntervalKey] = common_info.batch_interval->count(); + } else { + data[kBatchIntervalKey] = nullptr; + } + + if (common_info.batch_size) { + data[kBatchSizeKey] = *common_info.batch_size; + } else { + data[kBatchSizeKey] = nullptr; + } + + data[kTransformationName] = common_info.transformation_name; +} + +void from_json(const nlohmann::json &data, CommonStreamInfo &common_info) { + if (const auto batch_interval = data.at(kBatchIntervalKey); !batch_interval.is_null()) { + using BatchInterval = typename decltype(common_info.batch_interval)::value_type; + common_info.batch_interval = BatchInterval{batch_interval.get()}; + } else { + common_info.batch_interval = {}; + } + + if (const auto batch_size = data.at(kBatchSizeKey); !batch_size.is_null()) { + common_info.batch_size = batch_size.get(); + } else { + common_info.batch_size = {}; + } + + data.at(kTransformationName).get_to(common_info.transformation_name); +} +} // namespace query diff --git a/src/query/stream/common.hpp b/src/query/stream/common.hpp new file mode 100644 index 000000000..339f4f1f3 --- /dev/null +++ b/src/query/stream/common.hpp @@ -0,0 +1,73 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "query/procedure/mg_procedure_impl.hpp" + +namespace query { + +template +using ConsumerFunction = std::function &)>; + +struct CommonStreamInfo { + std::optional batch_interval; + std::optional batch_size; + std::string transformation_name; +}; + +template +concept ConvertableToJson = requires(T value, nlohmann::json data) { + { to_json(data, std::move(value)) } -> std::same_as; + { from_json(data, value) } -> std::same_as; +}; + +template +concept ConvertableToMgpMessage = requires(T value) { + mgp_message{value}; +}; + +template +concept Stream = requires(TStream stream) { + typename TStream::StreamInfo; + typename TStream::Message; + TStream{std::string{""}, typename TStream::StreamInfo{}, ConsumerFunction{}}; + { stream.Start() } -> std::same_as; + { stream.Stop() } -> std::same_as; + { stream.IsRunning() } -> std::same_as; + { + stream.Check(std::optional{}, std::optional{}, + ConsumerFunction{}) + } -> std::same_as; + { typename TStream::StreamInfo{}.common_info } -> std::same_as; + + requires ConvertableToMgpMessage; + requires ConvertableToJson; +}; + +enum class StreamSourceType : uint8_t { KAFKA }; + +template +StreamSourceType StreamType(const T & /*stream*/); + +const std::string kCommonInfoKey = "common_info"; + +void to_json(nlohmann::json &data, CommonStreamInfo &&info); +void from_json(const nlohmann::json &data, CommonStreamInfo &common_info); +} // namespace query diff --git a/src/query/stream/sources.cpp b/src/query/stream/sources.cpp new file mode 100644 index 000000000..50ca02996 --- /dev/null +++ b/src/query/stream/sources.cpp @@ -0,0 +1,70 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "query/stream/sources.hpp" + +#include + +namespace query { +KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info, + ConsumerFunction consumer_function) { + integrations::kafka::ConsumerInfo consumer_info{ + .consumer_name = std::move(stream_name), + .topics = std::move(stream_info.topics), + .consumer_group = std::move(stream_info.consumer_group), + .batch_interval = stream_info.common_info.batch_interval, + .batch_size = stream_info.common_info.batch_size, + }; + consumer_.emplace(std::move(stream_info.bootstrap_servers), std::move(consumer_info), std::move(consumer_function)); +}; + +KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const { + const auto &info = consumer_->Info(); + return {{.batch_interval = info.batch_interval, + .batch_size = info.batch_size, + .transformation_name = std::move(transformation_name)}, + .topics = info.topics, + .consumer_group = info.consumer_group}; +} + +void KafkaStream::Start() { consumer_->Start(); } +void KafkaStream::Stop() { consumer_->Stop(); } +bool KafkaStream::IsRunning() const { return consumer_->IsRunning(); } + +void KafkaStream::Check(std::optional timeout, std::optional batch_limit, + const ConsumerFunction &consumer_function) const { + consumer_->Check(timeout, batch_limit, consumer_function); +} + +utils::BasicResult KafkaStream::SetStreamOffset(const int64_t offset) { + return consumer_->SetConsumerOffsets(offset); +} + +namespace { +const std::string kTopicsKey{"topics"}; +const std::string kConsumerGroupKey{"consumer_group"}; +const std::string kBoostrapServers{"bootstrap_servers"}; +} // namespace + +void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) { + data[kCommonInfoKey] = std::move(info.common_info); + data[kTopicsKey] = std::move(info.topics); + data[kConsumerGroupKey] = info.consumer_group; + data[kBoostrapServers] = std::move(info.bootstrap_servers); +} + +void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) { + data.at(kCommonInfoKey).get_to(info.common_info); + data.at(kTopicsKey).get_to(info.topics); + data.at(kConsumerGroupKey).get_to(info.consumer_group); + data.at(kBoostrapServers).get_to(info.bootstrap_servers); +} +} // namespace query diff --git a/src/query/stream/sources.hpp b/src/query/stream/sources.hpp new file mode 100644 index 000000000..1f7954d49 --- /dev/null +++ b/src/query/stream/sources.hpp @@ -0,0 +1,57 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "query/stream/common.hpp" + +#include "integrations/kafka/consumer.hpp" + +namespace query { + +struct KafkaStream { + struct StreamInfo { + CommonStreamInfo common_info; + std::vector topics; + std::string consumer_group; + std::string bootstrap_servers; + }; + + using Message = integrations::kafka::Message; + + KafkaStream(std::string stream_name, StreamInfo stream_info, + ConsumerFunction consumer_function); + + StreamInfo Info(std::string transformation_name) const; + + void Start(); + void Stop(); + bool IsRunning() const; + + void Check(std::optional timeout, std::optional batch_limit, + const ConsumerFunction &consumer_function) const; + + utils::BasicResult SetStreamOffset(int64_t offset); + + private: + using Consumer = integrations::kafka::Consumer; + std::optional consumer_; +}; + +void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info); +void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info); + +template <> +inline StreamSourceType StreamType(const KafkaStream & /*stream*/) { + return StreamSourceType::KAFKA; +} + +} // namespace query diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp new file mode 100644 index 000000000..dc72773c1 --- /dev/null +++ b/src/query/stream/streams.cpp @@ -0,0 +1,479 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "query/stream/streams.hpp" + +#include +#include +#include + +#include +#include + +#include "query/db_accessor.hpp" +#include "query/discard_value_stream.hpp" +#include "query/exceptions.hpp" +#include "query/interpreter.hpp" +#include "query/procedure/mg_procedure_helpers.hpp" +#include "query/procedure/mg_procedure_impl.hpp" +#include "query/procedure/module.hpp" +#include "query/stream/sources.hpp" +#include "query/typed_value.hpp" +#include "utils/event_counter.hpp" +#include "utils/memory.hpp" +#include "utils/on_scope_exit.hpp" +#include "utils/pmr/string.hpp" + +namespace EventCounter { +extern const Event MessagesConsumed; +} // namespace EventCounter + +namespace query { +namespace { +constexpr auto kExpectedTransformationResultSize = 2; +const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()}; +const utils::pmr::string params_param_name{"parameters", utils::NewDeleteResource()}; + +const std::map empty_parameters{}; + +auto GetStream(auto &map, const std::string &stream_name) { + if (auto it = map.find(stream_name); it != map.end()) { + return it; + } + throw StreamsException("Couldn't find stream '{}'", stream_name); +} + +std::pair ExtractTransformationResult( + utils::pmr::map &&values, const std::string_view transformation_name, + const std::string_view stream_name) { + if (values.size() != kExpectedTransformationResultSize) { + throw StreamsException( + "Transformation '{}' in stream '{}' did not yield all fields (query, parameters) as required.", + transformation_name, stream_name); + } + + auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & { + auto it = values.find(field_name); + if (it == values.end()) { + throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.", + transformation_name, stream_name, field_name}; + }; + return it->second; + }; + + auto &query_value = get_value(query_param_name); + MG_ASSERT(query_value.IsString()); + auto ¶ms_value = get_value(params_param_name); + MG_ASSERT(params_value.IsNull() || params_value.IsMap()); + return {std::move(query_value), std::move(params_value)}; +} + +template +void CallCustomTransformation(const std::string &transformation_name, const std::vector &messages, + mgp_result &result, storage::Storage::Accessor &storage_accessor, + utils::MemoryResource &memory_resource, const std::string &stream_name) { + DbAccessor db_accessor{&storage_accessor}; + { + auto maybe_transformation = + procedure::FindTransformation(procedure::gModuleRegistry, transformation_name, utils::NewDeleteResource()); + + if (!maybe_transformation) { + throw StreamsException("Couldn't find transformation {} for stream '{}'", transformation_name, stream_name); + }; + const auto &trans = *maybe_transformation->second; + mgp_messages mgp_messages{mgp_messages::storage_type{&memory_resource}}; + std::transform(messages.begin(), messages.end(), std::back_inserter(mgp_messages.messages), + [](const TMessage &message) { return mgp_message{message}; }); + mgp_graph graph{&db_accessor, storage::View::OLD, nullptr}; + mgp_memory memory{&memory_resource}; + result.rows.clear(); + result.error_msg.reset(); + result.signature = &trans.results; + + MG_ASSERT(result.signature->size() == kExpectedTransformationResultSize); + MG_ASSERT(result.signature->contains(query_param_name)); + MG_ASSERT(result.signature->contains(params_param_name)); + + spdlog::trace("Calling transformation in stream '{}'", stream_name); + trans.cb(&mgp_messages, &graph, &result, &memory); + } + if (result.error_msg.has_value()) { + throw StreamsException(result.error_msg->c_str()); + } +} + +template +StreamStatus CreateStatus(std::string stream_name, std::string transformation_name, + std::optional owner, const TStream &stream) { + return {.name = std::move(stream_name), + .type = StreamType(stream), + .is_running = stream.IsRunning(), + .info = stream.Info(std::move(transformation_name)), + .owner = std::move(owner)}; +} + +// nlohmann::json doesn't support string_view access yet +const std::string kStreamName{"name"}; +const std::string kIsRunningKey{"is_running"}; +const std::string kOwner{"owner"}; +const std::string kType{"type"}; +} // namespace + +template +void to_json(nlohmann::json &data, StreamStatus &&status) { + data[kStreamName] = std::move(status.name); + data[kType] = status.type; + data[kIsRunningKey] = status.is_running; + + if (status.owner.has_value()) { + data[kOwner] = std::move(*status.owner); + } else { + data[kOwner] = nullptr; + } + + to_json(data, std::move(status.info)); +} + +template +void from_json(const nlohmann::json &data, StreamStatus &status) { + data.at(kStreamName).get_to(status.name); + data.at(kIsRunningKey).get_to(status.is_running); + + if (const auto &owner = data.at(kOwner); !owner.is_null()) { + status.owner = owner.get(); + } else { + status.owner = {}; + } + + from_json(data, status.info); +} + +namespace { +template +struct Overloaded : Ts... { + using Ts::operator()...; +}; +template +Overloaded(Ts...) -> Overloaded; +} // namespace +Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers, + std::filesystem::path directory) + : interpreter_context_(interpreter_context), + bootstrap_servers_(std::move(bootstrap_servers)), + storage_(std::move(directory)) { + constexpr std::string_view proc_name = "kafka_set_stream_offset"; + auto set_stream_offset = [this, proc_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, + mgp_memory * /*memory*/) { + auto *arg_stream_name = procedure::Call(mgp_list_at, args, 0); + const auto *stream_name = procedure::Call(mgp_value_get_string, arg_stream_name); + auto *arg_offset = procedure::Call(mgp_list_at, args, 1); + const auto offset = procedure::Call(mgp_value_get_int, arg_offset); + auto lock_ptr = streams_.Lock(); + auto it = GetStream(*lock_ptr, std::string(stream_name)); + std::visit(Overloaded{[&](StreamData &kafka_stream) { + auto stream_source_ptr = kafka_stream.stream_source->Lock(); + const auto error = stream_source_ptr->SetStreamOffset(offset); + if (error.HasError()) { + MG_ASSERT( + mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR, + "Unable to set procedure error message of procedure: {}", proc_name); + } + }, + [proc_name](auto && /*other*/) { + throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name); + }}, + it->second); + }; + + mgp_proc proc(proc_name, set_stream_offset, utils::NewDeleteResource(), false); + MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call(mgp_type_string)) == MGP_ERROR_NO_ERROR); + MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call(mgp_type_int)) == MGP_ERROR_NO_ERROR); + + procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc)); +} + +template +void Streams::Create(const std::string &stream_name, typename TStream::StreamInfo info, + std::optional owner) { + auto locked_streams = streams_.Lock(); + auto it = CreateConsumer(*locked_streams, stream_name, std::move(info), std::move(owner)); + + try { + std::visit( + [&](auto &&stream_data) { + const auto stream_source_ptr = stream_data.stream_source->ReadLock(); + Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr)); + }, + it->second); + } catch (...) { + locked_streams->erase(it); + throw; + } +} + +template void Streams::Create(const std::string &stream_name, KafkaStream::StreamInfo info, + std::optional owner); + +template +Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std::string &stream_name, + typename TStream::StreamInfo stream_info, + std::optional owner) { + if (map.contains(stream_name)) { + throw StreamsException{"Stream already exists with name '{}'", stream_name}; + } + + auto *memory_resource = utils::NewDeleteResource(); + + auto consumer_function = + [interpreter_context = interpreter_context_, memory_resource, stream_name, + transformation_name = stream_info.common_info.transformation_name, owner = owner, + interpreter = std::make_shared(interpreter_context_), + result = mgp_result{nullptr, memory_resource}](const std::vector &messages) mutable { + auto accessor = interpreter_context->db->Access(); + EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size()); + CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); + + DiscardValueResultStream stream; + + spdlog::trace("Start transaction in stream '{}'", stream_name); + utils::OnScopeExit cleanup{[&interpreter, &result]() { + result.rows.clear(); + interpreter->Abort(); + }}; + interpreter->BeginTransaction(); + + const static std::map empty_parameters{}; + + for (auto &row : result.rows) { + spdlog::trace("Processing row in stream '{}'", stream_name); + auto [query_value, params_value] = + ExtractTransformationResult(std::move(row.values), transformation_name, stream_name); + storage::PropertyValue params_prop{params_value}; + + std::string query{query_value.ValueString()}; + spdlog::trace("Executing query '{}' in stream '{}'", query, stream_name); + auto prepare_result = + interpreter->Prepare(query, params_prop.IsNull() ? empty_parameters : params_prop.ValueMap(), nullptr); + if (!interpreter_context->auth_checker->IsUserAuthorized(owner, prepare_result.privileges)) { + throw StreamsException{ + "Couldn't execute query '{}' for stream '{}' becuase the owner is not authorized to execute the " + "query!", + query, stream_name}; + } + interpreter->PullAll(&stream); + } + + spdlog::trace("Commit transaction in stream '{}'", stream_name); + interpreter->CommitTransaction(); + result.rows.clear(); + }; + + if (stream_info.bootstrap_servers.empty()) { + stream_info.bootstrap_servers = bootstrap_servers_; + } + + auto insert_result = map.try_emplace( + stream_name, StreamData{std::move(stream_info.common_info.transformation_name), std::move(owner), + std::make_unique>( + stream_name, std::move(stream_info), std::move(consumer_function))}); + MG_ASSERT(insert_result.second, "Unexpected error during storing consumer '{}'", stream_name); + return insert_result.first; +} + +void Streams::RestoreStreams() { + spdlog::info("Loading streams..."); + auto locked_streams_map = streams_.Lock(); + MG_ASSERT(locked_streams_map->empty(), "Cannot restore streams when some streams already exist!"); + + for (const auto &[stream_name, stream_data] : storage_) { + const auto get_failed_message = [&stream_name = stream_name](const std::string_view message, + const std::string_view nested_message) { + return fmt::format("Failed to load stream '{}', because: {} caused by {}", stream_name, message, nested_message); + }; + + const auto create_consumer = [&, &stream_name = stream_name, this](StreamStatus status, + auto &&stream_json_data) { + try { + stream_json_data.get_to(status); + } catch (const nlohmann::json::type_error &exception) { + spdlog::warn(get_failed_message("invalid type conversion", exception.what())); + return; + } catch (const nlohmann::json::out_of_range &exception) { + spdlog::warn(get_failed_message("non existing field", exception.what())); + return; + } + MG_ASSERT(status.name == stream_name, "Expected stream name is '{}', but got '{}'", status.name, stream_name); + + try { + auto it = CreateConsumer(*locked_streams_map, stream_name, std::move(status.info), {}); + if (status.is_running) { + std::visit( + [&](auto &&stream_data) { + auto stream_source_ptr = stream_data.stream_source->Lock(); + stream_source_ptr->Start(); + }, + it->second); + } + spdlog::info("Stream '{}' is loaded", stream_name); + } catch (const utils::BasicException &exception) { + spdlog::warn(get_failed_message("unexpected error", exception.what())); + } + }; + + auto stream_json_data = nlohmann::json::parse(stream_data); + const auto stream_type = static_cast(stream_json_data.at("type")); + + switch (stream_type) { + case StreamSourceType::KAFKA: + create_consumer(StreamStatus{}, std::move(stream_json_data)); + break; + } + } +} + +void Streams::Drop(const std::string &stream_name) { + auto locked_streams = streams_.Lock(); + + auto it = GetStream(*locked_streams, stream_name); + + // streams_ is write locked, which means there is no access to it outside of this function, thus only the Test + // function can be executing with the consumer, nothing else. + // By acquiring the write lock here for the consumer, we make sure there is + // no running Test function for this consumer, therefore it can be erased. + std::visit([&](auto &&stream_data) { stream_data.stream_source->Lock(); }, it->second); + + locked_streams->erase(it); + if (!storage_.Delete(stream_name)) { + throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name); + } + + // TODO(antaljanosbenjamin) Release the transformation +} + +void Streams::Start(const std::string &stream_name) { + auto locked_streams = streams_.Lock(); + auto it = GetStream(*locked_streams, stream_name); + + std::visit( + [&, this](auto &&stream_data) { + auto stream_source_ptr = stream_data.stream_source->Lock(); + stream_source_ptr->Start(); + Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr)); + }, + it->second); +} + +void Streams::Stop(const std::string &stream_name) { + auto locked_streams = streams_.Lock(); + auto it = GetStream(*locked_streams, stream_name); + + std::visit( + [&, this](auto &&stream_data) { + auto stream_source_ptr = stream_data.stream_source->Lock(); + stream_source_ptr->Stop(); + + Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr)); + }, + it->second); +} + +void Streams::StartAll() { + for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) { + std::visit( + [&stream_name = stream_name, this](auto &&stream_data) { + auto locked_stream_source = stream_data.stream_source->Lock(); + if (!locked_stream_source->IsRunning()) { + locked_stream_source->Start(); + Persist( + CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_stream_source)); + } + }, + stream_data); + } +} + +void Streams::StopAll() { + for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) { + std::visit( + [&stream_name = stream_name, this](auto &&stream_data) { + auto locked_stream_source = stream_data.stream_source->Lock(); + if (locked_stream_source->IsRunning()) { + locked_stream_source->Stop(); + Persist( + CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_stream_source)); + } + }, + stream_data); + } +} + +std::vector> Streams::GetStreamInfo() const { + std::vector> result; + { + for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) { + std::visit( + [&, &stream_name = stream_name](auto &&stream_data) { + auto locked_stream_source = stream_data.stream_source->ReadLock(); + auto info = locked_stream_source->Info(stream_data.transformation_name); + result.emplace_back(StreamStatus<>{stream_name, StreamType(*locked_stream_source), + locked_stream_source->IsRunning(), std::move(info.common_info), + stream_data.owner}); + }, + stream_data); + } + } + return result; +} + +TransformationResult Streams::Check(const std::string &stream_name, std::optional timeout, + std::optional batch_limit) const { + std::optional locked_streams{streams_.ReadLock()}; + auto it = GetStream(**locked_streams, stream_name); + + return std::visit( + [&](auto &&stream_data) { + // This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after + // that + const auto locked_stream_source = stream_data.stream_source->ReadLock(); + const auto transformation_name = stream_data.transformation_name; + locked_streams.reset(); + + auto *memory_resource = utils::NewDeleteResource(); + mgp_result result{nullptr, memory_resource}; + TransformationResult test_result; + + auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, &stream_name, + &transformation_name = transformation_name, &result, + &test_result](const std::vector &messages) mutable { + auto accessor = interpreter_context->db->Access(); + CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); + + for (auto &row : result.rows) { + auto [query, parameters] = + ExtractTransformationResult(std::move(row.values), transformation_name, stream_name); + std::vector result_row; + result_row.reserve(kExpectedTransformationResultSize); + result_row.push_back(std::move(query)); + result_row.push_back(std::move(parameters)); + + test_result.push_back(std::move(result_row)); + } + }; + + locked_stream_source->Check(timeout, batch_limit, consumer_function); + return test_result; + }, + it->second); +} + +std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; } + +} // namespace query diff --git a/src/query/streams.hpp b/src/query/stream/streams.hpp similarity index 75% rename from src/query/streams.hpp rename to src/query/stream/streams.hpp index 03b2afc3c..f46bfd927 100644 --- a/src/query/streams.hpp +++ b/src/query/stream/streams.hpp @@ -11,14 +11,22 @@ #pragma once +#include #include #include #include +#include #include +#include + #include "integrations/kafka/consumer.hpp" #include "kvstore/kvstore.hpp" +#include "query/stream/common.hpp" +#include "query/stream/sources.hpp" #include "query/typed_value.hpp" +#include "storage/v2/property_value.hpp" +#include "utils/event_counter.hpp" #include "utils/exceptions.hpp" #include "utils/rw_lock.hpp" #include "utils/synchronized.hpp" @@ -30,33 +38,33 @@ class StreamsException : public utils::BasicException { using BasicException::BasicException; }; -using TransformationResult = std::vector>; -using TransformFunction = std::function &)>; +template +struct StreamInfo; -struct StreamInfo { - std::vector topics; - std::string consumer_group; - std::optional batch_interval; - std::optional batch_size; - std::string transformation_name; - std::optional owner; - std::string bootstrap_servers; +template <> +struct StreamInfo { + using Type = CommonStreamInfo; }; +template +struct StreamInfo { + using Type = typename TStream::StreamInfo; +}; + +template +using StreamInfoType = typename StreamInfo::Type; + +template struct StreamStatus { std::string name; - StreamInfo info; + StreamSourceType type; bool is_running; -}; - -using SynchronizedConsumer = utils::Synchronized; - -struct StreamData { - std::string transformation_name; + StreamInfoType info; std::optional owner; - std::unique_ptr consumer; }; +using TransformationResult = std::vector>; + struct InterpreterContext; /// Manages Kafka consumers. @@ -85,7 +93,8 @@ class Streams final { /// @param stream_info the necessary informations needed to create the Kafka consumer and transform the messages /// /// @throws StreamsException if the stream with the same name exists or if the creation of Kafka consumer fails - void Create(const std::string &stream_name, StreamInfo stream_info); + template + void Create(const std::string &stream_name, typename TStream::StreamInfo info, std::optional owner); /// Deletes an existing stream and all the data that was persisted. /// @@ -123,7 +132,7 @@ class Streams final { /// Return current status for all streams. /// It might happend that the is_running field is out of date if the one of the streams stops during the invocation of /// this function because of an error. - std::vector GetStreamInfo() const; + std::vector> GetStreamInfo() const; /// Do a dry-run consume from a stream. /// @@ -143,23 +152,32 @@ class Streams final { /// Return the configuration value passed to memgraph. std::string_view BootstrapServers() const; - /// Sets the stream's consumer offset. - /// - /// @param stream_name we want to set the offset. - /// @param offset to set. - [[nodiscard]] utils::BasicResult SetStreamOffset(std::string_view stream_name, int64_t offset); - private: - using StreamsMap = std::unordered_map; + template + using SynchronizedStreamSource = utils::Synchronized; + + template + struct StreamData { + std::string transformation_name; + std::optional owner; + std::unique_ptr> stream_source; + }; + + using StreamDataVariant = std::variant>; + using StreamsMap = std::unordered_map; using SynchronizedStreamsMap = utils::Synchronized; - static StreamStatus CreateStatus(const std::string &name, const std::string &transformation_name, - const std::optional &owner, - const integrations::kafka::Consumer &consumer); + template + StreamsMap::iterator CreateConsumer(StreamsMap &map, const std::string &stream_name, + typename TStream::StreamInfo stream_info, std::optional owner); - StreamsMap::iterator CreateConsumer(StreamsMap &map, const std::string &stream_name, StreamInfo stream_info); - - void Persist(StreamStatus &&status); + template + void Persist(StreamStatus &&status) { + const std::string stream_name = status.name; + if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) { + throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name}; + } + } InterpreterContext *interpreter_context_; std::string bootstrap_servers_; diff --git a/tests/e2e/streams/common.py b/tests/e2e/streams/common.py index 626a39876..cefe62592 100644 --- a/tests/e2e/streams/common.py +++ b/tests/e2e/streams/common.py @@ -15,14 +15,11 @@ import time # These are the indices of the different values in the result of SHOW STREAM # query NAME = 0 -TOPICS = 1 -CONSUMER_GROUP = 2 -BATCH_INTERVAL = 3 -BATCH_SIZE = 4 -TRANSFORM = 5 -OWNER = 6 -BOOTSTRAP_SERVERS = 7 -IS_RUNNING = 8 +BATCH_INTERVAL = 1 +BATCH_SIZE = 2 +TRANSFORM = 3 +OWNER = 4 +IS_RUNNING = 5 def execute_and_fetch_all(cursor, query): @@ -74,17 +71,19 @@ def check_one_result_row(cursor, query): def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes): - assert check_one_result_row(cursor, - "MATCH (n: MESSAGE {" - f"payload: '{payload_bytes.decode('utf-8')}'," - f"topic: '{topic}'" - "}) RETURN n") + assert check_one_result_row( + cursor, + "MATCH (n: MESSAGE {" + f"payload: '{payload_bytes.decode('utf-8')}'," + f"topic: '{topic}'" + "}) RETURN n", + ) def get_stream_info(cursor, stream_name): stream_infos = execute_and_fetch_all(cursor, "SHOW STREAMS") for stream_info in stream_infos: - if (stream_info[NAME] == stream_name): + if stream_info[NAME] == stream_name: return stream_info return None diff --git a/tests/e2e/streams/streams_owner_tests.py b/tests/e2e/streams/streams_owner_tests.py index 6087033f7..d1fb62f79 100644 --- a/tests/e2e/streams/streams_owner_tests.py +++ b/tests/e2e/streams/streams_owner_tests.py @@ -77,9 +77,8 @@ def test_owner_is_shown(topics, connection): f"TOPICS {topics[0]} " f"TRANSFORM transform.simple") - common.check_stream_info(userless_cursor, "test", ("test", [ - topics[0]], "mg_consumer", None, None, - "transform.simple", stream_user, "localhost:9092", False)) + common.check_stream_info(userless_cursor, "test", ("test", None, None, + "transform.simple", stream_user, False)) def test_insufficient_privileges(producer, topics, connection): diff --git a/tests/e2e/streams/streams_tests.py b/tests/e2e/streams/streams_tests.py index 67494dc3e..9e2ac241b 100755 --- a/tests/e2e/streams/streams_tests.py +++ b/tests/e2e/streams/streams_tests.py @@ -226,17 +226,7 @@ def test_show_streams(producer, topics, connection): common.check_stream_info( cursor, "default_values", - ( - "default_values", - [topics[0]], - "mg_consumer", - None, - None, - "transform.simple", - None, - "localhost:9092", - False, - ), + ("default_values", None, None, "transform.simple", None, False), ) common.check_stream_info( @@ -244,13 +234,10 @@ def test_show_streams(producer, topics, connection): "complex_values", ( "complex_values", - topics, - consumer_group, batch_interval, batch_size, "transform.with_parameters", None, - "localhost:9092", False, ), ) diff --git a/tests/unit/mgp_kafka_c_api.cpp b/tests/unit/mgp_kafka_c_api.cpp index 8085f8f5a..074efdc9c 100644 --- a/tests/unit/mgp_kafka_c_api.cpp +++ b/tests/unit/mgp_kafka_c_api.cpp @@ -138,7 +138,7 @@ class MgpApiTest : public ::testing::Test { auto v = utils::pmr::vector(utils::NewDeleteResource()); v.reserve(expected.size()); std::transform(msgs_storage_.begin(), msgs_storage_.end(), std::back_inserter(v), - [](auto &msgs) { return mgp_message{&msgs}; }); + [](auto &msgs) { return mgp_message{msgs}; }); return v; } diff --git a/tests/unit/query_streams.cpp b/tests/unit/query_streams.cpp index 7185e7c4d..0fe2fbbb9 100644 --- a/tests/unit/query_streams.cpp +++ b/tests/unit/query_streams.cpp @@ -19,12 +19,12 @@ #include "kafka_mock.hpp" #include "query/config.hpp" #include "query/interpreter.hpp" -#include "query/streams.hpp" +#include "query/stream/streams.hpp" #include "storage/v2/storage.hpp" using Streams = query::Streams; -using StreamInfo = query::StreamInfo; -using StreamStatus = query::StreamStatus; +using StreamInfo = query::KafkaStream::StreamInfo; +using StreamStatus = query::StreamStatus; namespace { const static std::string kTopicName{"TrialTopic"}; @@ -32,6 +32,7 @@ struct StreamCheckData { std::string name; StreamInfo info; bool is_running; + std::optional owner; }; std::string GetDefaultStreamName() { @@ -39,17 +40,19 @@ std::string GetDefaultStreamName() { } StreamInfo CreateDefaultStreamInfo() { - return StreamInfo{ - .topics = {kTopicName}, - .consumer_group = "ConsumerGroup " + GetDefaultStreamName(), - .batch_interval = std::nullopt, - .batch_size = std::nullopt, - .transformation_name = "not used in the tests", - .owner = std::nullopt, - }; + return StreamInfo{.common_info{ + .batch_interval = std::nullopt, + .batch_size = std::nullopt, + .transformation_name = "not used in the tests", + }, + .topics = {kTopicName}, + .consumer_group = "ConsumerGroup " + GetDefaultStreamName(), + .bootstrap_servers = ""}; } -StreamCheckData CreateDefaultStreamCheckData() { return {GetDefaultStreamName(), CreateDefaultStreamInfo(), false}; } +StreamCheckData CreateDefaultStreamCheckData() { + return {GetDefaultStreamName(), CreateDefaultStreamInfo(), false, std::nullopt}; +} std::filesystem::path GetCleanDataDirectory() { const auto path = std::filesystem::temp_directory_path() / "query-streams"; @@ -87,13 +90,9 @@ class StreamsTest : public ::testing::Test { [&check_data](const auto &stream_status) { return stream_status.name == check_data.name; }); ASSERT_NE(it, stream_statuses.end()); const auto &status = *it; - // the order don't have to be strictly the same, but based on the implementation it shouldn't change - EXPECT_TRUE(std::equal(check_data.info.topics.begin(), check_data.info.topics.end(), status.info.topics.begin(), - status.info.topics.end())); - EXPECT_EQ(check_data.info.consumer_group, status.info.consumer_group); - EXPECT_EQ(check_data.info.batch_interval, status.info.batch_interval); - EXPECT_EQ(check_data.info.batch_size, status.info.batch_size); - EXPECT_EQ(check_data.info.transformation_name, status.info.transformation_name); + EXPECT_EQ(check_data.info.common_info.batch_interval, status.info.batch_interval); + EXPECT_EQ(check_data.info.common_info.batch_size, status.info.batch_size); + EXPECT_EQ(check_data.info.common_info.transformation_name, status.info.transformation_name); EXPECT_EQ(check_data.is_running, status.is_running); } @@ -115,7 +114,7 @@ class StreamsTest : public ::testing::Test { TEST_F(StreamsTest, SimpleStreamManagement) { auto check_data = CreateDefaultStreamCheckData(); - streams_->Create(check_data.name, check_data.info); + streams_->Create(check_data.name, check_data.info, check_data.owner); EXPECT_NO_FATAL_FAILURE(CheckStreamStatus(check_data)); EXPECT_NO_THROW(streams_->Start(check_data.name)); @@ -141,10 +140,10 @@ TEST_F(StreamsTest, SimpleStreamManagement) { TEST_F(StreamsTest, CreateAlreadyExisting) { auto stream_info = CreateDefaultStreamInfo(); auto stream_name = GetDefaultStreamName(); - streams_->Create(stream_name, stream_info); + streams_->Create(stream_name, stream_info, std::nullopt); try { - streams_->Create(stream_name, stream_info); + streams_->Create(stream_name, stream_info, std::nullopt); FAIL() << "Creating already existing stream should throw\n"; } catch (query::StreamsException &exception) { EXPECT_EQ(exception.what(), fmt::format("Stream already exists with name '{}'", stream_name)); @@ -155,7 +154,7 @@ TEST_F(StreamsTest, DropNotExistingStream) { const auto stream_info = CreateDefaultStreamInfo(); const auto stream_name = GetDefaultStreamName(); const std::string not_existing_stream_name{"ThisDoesn'tExists"}; - streams_->Create(stream_name, stream_info); + streams_->Create(stream_name, stream_info, std::nullopt); try { streams_->Drop(not_existing_stream_name); @@ -182,18 +181,18 @@ TEST_F(StreamsTest, RestoreStreams) { stream_check_data.name += iteration_postfix; stream_info.topics[0] += iteration_postfix; stream_info.consumer_group += iteration_postfix; - stream_info.transformation_name += iteration_postfix; + stream_info.common_info.transformation_name += iteration_postfix; if (i > 0) { - stream_info.batch_interval = std::chrono::milliseconds((i + 1) * 10); - stream_info.batch_size = 1000 + i; - stream_info.owner = std::string{"owner"} + iteration_postfix; + stream_info.common_info.batch_interval = std::chrono::milliseconds((i + 1) * 10); + stream_info.common_info.batch_size = 1000 + i; + stream_check_data.owner = std::string{"owner"} + iteration_postfix; } mock_cluster_.CreateTopic(stream_info.topics[0]); } - stream_check_datas[1].info.batch_interval = {}; - stream_check_datas[2].info.batch_size = {}; - stream_check_datas[3].info.owner = {}; + stream_check_datas[1].info.common_info.batch_interval = {}; + stream_check_datas[2].info.common_info.batch_size = {}; + stream_check_datas[3].owner = {}; const auto check_restore_logic = [&stream_check_datas, this]() { // Reset the Streams object to trigger reloading @@ -210,7 +209,7 @@ TEST_F(StreamsTest, RestoreStreams) { EXPECT_TRUE(streams_->GetStreamInfo().empty()); for (auto &check_data : stream_check_datas) { - streams_->Create(check_data.name, check_data.info); + streams_->Create(check_data.name, check_data.info, check_data.owner); } { SCOPED_TRACE("After streams are created"); @@ -246,7 +245,7 @@ TEST_F(StreamsTest, RestoreStreams) { TEST_F(StreamsTest, CheckWithTimeout) { const auto stream_info = CreateDefaultStreamInfo(); const auto stream_name = GetDefaultStreamName(); - streams_->Create(stream_name, stream_info); + streams_->Create(stream_name, stream_info, std::nullopt); std::chrono::milliseconds timeout{3000};