Support for different consumers (#280)

This commit is contained in:
Antonio Andelic 2021-10-27 09:06:02 +02:00 committed by Antonio Andelic
parent 6c971b856e
commit 0ebd52aac3
17 changed files with 957 additions and 146 deletions

View File

@@ -118,10 +118,10 @@ Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, Cons
     : info_{std::move(info)}, consumer_function_(std::move(consumer_function)), cb_(info_.consumer_name) {
   MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer");
   // NOLINTNEXTLINE (modernize-use-nullptr)
-  if (info.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) {
+  if (info_.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) {
     throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch interval has to be positive!");
   }
-  if (info.batch_size.value_or(kMinimumSize) < kMinimumSize) {
+  if (info_.batch_size.value_or(kMinimumSize) < kMinimumSize) {
     throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch size has to be positive!");
   }

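The one-character change above fixes a use-after-move: `info` is consumed by `info_{std::move(info)}` in the member-initializer list, so by the time the constructor body runs, the parameter is in a valid but unspecified state and must not be read. A minimal sketch of the pitfall, with hypothetical Config/Widget names:

#include <optional>
#include <stdexcept>
#include <utility>

struct Config {
  std::optional<int> batch_size;
};

struct Widget {
  explicit Widget(Config config) : config_{std::move(config)} {
    // Wrong: `config` was moved into `config_` before the body runs, so any
    // check against it relies on accidental behavior of a moved-from object.
    // if (config.batch_size.value_or(1) < 1) { ... }

    // Right: validate the member that now owns the data.
    if (config_.batch_size.value_or(1) < 1) {
      throw std::invalid_argument("Batch size has to be positive!");
    }
  }

  Config config_;
};

int main() { Widget widget{Config{.batch_size = 10}}; }
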
View File

@@ -33,7 +33,9 @@ set(mg_query_sources
     procedure/module.cpp
     procedure/py_module.cpp
     serialization/property_value.cpp
-    streams.cpp
+    stream/streams.cpp
+    stream/sources.cpp
+    stream/common.cpp
     trigger.cpp
     trigger_context.cpp
     typed_value.cpp)

View File

@@ -40,7 +40,6 @@
 #include "query/plan/planner.hpp"
 #include "query/plan/profile.hpp"
 #include "query/plan/vertex_count_cache.hpp"
-#include "query/streams.hpp"
 #include "query/trigger.hpp"
 #include "query/typed_value.hpp"
 #include "storage/v2/property_value.hpp"
@@ -562,14 +561,15 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
                      transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap),
                      owner = StringPointerToOptional(username)]() mutable {
      std::string bootstrap = bootstrap_servers ? std::move(*bootstrap_servers) : "";
-      interpreter_context->streams.Create(stream_name,
-                                          query::StreamInfo{.topics = std::move(topic_names),
-                                                            .consumer_group = std::move(consumer_group),
-                                                            .batch_interval = batch_interval,
-                                                            .batch_size = batch_size,
-                                                            .transformation_name = std::move(transformation_name),
-                                                            .owner = std::move(owner),
-                                                            .bootstrap_servers = std::move(bootstrap)});
+      interpreter_context->streams.Create<query::KafkaStream>(
+          stream_name,
+          {.common_info = {.batch_interval = batch_interval,
+                           .batch_size = batch_size,
+                           .transformation_name = std::move(transformation_name)},
+           .topics = std::move(topic_names),
+           .consumer_group = std::move(consumer_group),
+           .bootstrap_servers = std::move(bootstrap)},
+          std::move(owner));
       return std::vector<std::vector<TypedValue>>{};
     };
     notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM,
@@ -620,28 +620,12 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
       return callback;
     }
     case StreamQuery::Action::SHOW_STREAMS: {
-      callback.header = {"name",       "topics",
-                         "consumer_group", "batch_interval",
-                         "batch_size", "transformation_name",
-                         "owner",      "bootstrap_servers",
-                         "is running"};
+      callback.header = {"name", "batch_interval", "batch_size", "transformation_name", "owner", "is running"};
       callback.fn = [interpreter_context]() {
         auto streams_status = interpreter_context->streams.GetStreamInfo();
         std::vector<std::vector<TypedValue>> results;
         results.reserve(streams_status.size());
-        auto topics_as_typed_topics = [](const auto &topics) {
-          std::vector<TypedValue> typed_topics;
-          typed_topics.reserve(topics.size());
-          for (const auto &elem : topics) {
-            typed_topics.emplace_back(elem);
-          }
-          return typed_topics;
-        };
-
-        auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics, interpreter_context](
-                                                               auto &typed_status, const auto &stream_info) {
-          typed_status.emplace_back(topics_as_typed_topics(stream_info.topics));
-          typed_status.emplace_back(stream_info.consumer_group);
+        auto stream_info_as_typed_stream_info_emplace_in = [](auto &typed_status, const auto &stream_info) {
           if (stream_info.batch_interval.has_value()) {
             typed_status.emplace_back(stream_info.batch_interval->count());
           } else {
@@ -653,16 +637,6 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
             typed_status.emplace_back();
           }
           typed_status.emplace_back(stream_info.transformation_name);
-          if (stream_info.owner.has_value()) {
-            typed_status.emplace_back(*stream_info.owner);
-          } else {
-            typed_status.emplace_back();
-          }
-          if (stream_info.bootstrap_servers.empty()) {
-            typed_status.emplace_back(interpreter_context->streams.BootstrapServers());
-          } else {
-            typed_status.emplace_back(stream_info.bootstrap_servers);
-          }
         };
 
         for (const auto &status : streams_status) {
@@ -670,6 +644,11 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
           typed_status.reserve(8);
          typed_status.emplace_back(status.name);
           stream_info_as_typed_stream_info_emplace_in(typed_status, status.info);
+          if (status.owner.has_value()) {
+            typed_status.emplace_back(*status.owner);
+          } else {
+            typed_status.emplace_back();
+          }
           typed_status.emplace_back(status.is_running);
           results.push_back(std::move(typed_status));
         }

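The SHOW STREAMS hunks above drop the topics, consumer_group, and bootstrap_servers columns, which are now source-specific details carried by the stream type rather than by the generic status row, and they read the owner from StreamStatus instead of StreamInfo. The resulting header is:

name | batch_interval | batch_size | transformation_name | owner | is running
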
View File

@@ -27,7 +27,7 @@
 #include "query/plan/operator.hpp"
 #include "query/plan/read_write_type_checker.hpp"
 #include "query/stream.hpp"
-#include "query/streams.hpp"
+#include "query/stream/streams.hpp"
 #include "query/trigger.hpp"
 #include "query/typed_value.hpp"
 #include "storage/v2/isolation_level.hpp"

View File

@@ -2493,31 +2493,122 @@ bool IsValidIdentifierName(const char *name) {
 } // namespace query::procedure
 
 mgp_error mgp_message_payload(mgp_message *message, const char **result) {
-  return WrapExceptions([message] { return message->msg->Payload().data(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> const char * {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->Payload().data();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_message_payload_size(mgp_message *message, size_t *result) {
-  return WrapExceptions([message] { return message->msg->Payload().size(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> size_t {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->Payload().size();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_message_topic_name(mgp_message *message, const char **result) {
-  return WrapExceptions([message] { return message->msg->TopicName().data(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> const char * {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->TopicName().data();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_message_key(mgp_message *message, const char **result) {
-  return WrapExceptions([message] { return message->msg->Key().data(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> const char * {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->Key().data();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_message_key_size(mgp_message *message, size_t *result) {
-  return WrapExceptions([message] { return message->msg->Key().size(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> size_t {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->Key().size();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_message_timestamp(mgp_message *message, int64_t *result) {
-  return WrapExceptions([message] { return message->msg->Timestamp(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> int64_t {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->Timestamp();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_message_offset(struct mgp_message *message, int64_t *result) {
-  return WrapExceptions([message] { return message->msg->Offset(); }, result);
+  return WrapExceptions(
+      [message] {
+        return std::visit(
+            []<typename T>(T &&msg) -> int64_t {
+              using MessageType = std::decay_t<T>;
+              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
+                return msg->Offset();
+              } else {
+                throw std::invalid_argument("Invalid source type");
+              }
+            },
+            message->msg);
+      },
+      result);
 }
 
 mgp_error mgp_messages_size(mgp_messages *messages, size_t *result) {

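Every accessor above now has the same shape: visit the message variant with a generic lambda and branch on the alternative's type with if constexpr. A self-contained sketch of the pattern, with KafkaLike standing in for integrations::kafka::Message; while the variant has a single alternative the else branch is never instantiated, but it keeps the visitor exhaustive once more source types are added:

#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <variant>

struct KafkaLike {
  std::string payload;
};

using MessageVariant = std::variant<const KafkaLike *>;

std::size_t PayloadSize(const MessageVariant &msg) {
  return std::visit(
      []<typename T>(T &&m) -> std::size_t {
        using MessageType = std::decay_t<T>;
        if constexpr (std::is_same_v<MessageType, const KafkaLike *>) {
          return m->payload.size();
        } else {
          // Discarded while Kafka is the only alternative.
          throw std::invalid_argument("Invalid source type");
        }
      },
      msg);
}

int main() {
  const KafkaLike message{.payload = "hello"};
  assert(PayloadSize(MessageVariant{&message}) == 5);
}
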
View File

@@ -801,7 +801,10 @@ bool IsValidIdentifierName(const char *name);
 } // namespace query::procedure
 
 struct mgp_message {
-  const integrations::kafka::Message *msg;
+  explicit mgp_message(const integrations::kafka::Message &message) : msg{&message} {}
+  using KafkaMessage = const integrations::kafka::Message *;
+  std::variant<KafkaMessage> msg;
 };
 
 struct mgp_messages {

View File

@@ -0,0 +1,55 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/stream/common.hpp"
#include <json/json.hpp>
namespace query {
namespace {
const std::string kBatchIntervalKey{"batch_interval"};
const std::string kBatchSizeKey{"batch_size"};
const std::string kTransformationName{"transformation_name"};
} // namespace
void to_json(nlohmann::json &data, CommonStreamInfo &&common_info) {
if (common_info.batch_interval) {
data[kBatchIntervalKey] = common_info.batch_interval->count();
} else {
data[kBatchIntervalKey] = nullptr;
}
if (common_info.batch_size) {
data[kBatchSizeKey] = *common_info.batch_size;
} else {
data[kBatchSizeKey] = nullptr;
}
data[kTransformationName] = common_info.transformation_name;
}
void from_json(const nlohmann::json &data, CommonStreamInfo &common_info) {
if (const auto batch_interval = data.at(kBatchIntervalKey); !batch_interval.is_null()) {
using BatchInterval = typename decltype(common_info.batch_interval)::value_type;
common_info.batch_interval = BatchInterval{batch_interval.get<typename BatchInterval::rep>()};
} else {
common_info.batch_interval = {};
}
if (const auto batch_size = data.at(kBatchSizeKey); !batch_size.is_null()) {
common_info.batch_size = batch_size.get<typename decltype(common_info.batch_size)::value_type>();
} else {
common_info.batch_size = {};
}
data.at(kTransformationName).get_to(common_info.transformation_name);
}
} // namespace query

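A round-trip sketch for the serializers above (assuming this commit's query/stream/common.hpp include path): empty optionals map to JSON null on the way out and back to empty optionals on the way in:

#include <cassert>
#include <chrono>
#include <optional>

#include <json/json.hpp>

#include "query/stream/common.hpp"

int main() {
  const query::CommonStreamInfo original{.batch_interval = std::chrono::milliseconds{100},
                                         .batch_size = std::nullopt,
                                         .transformation_name = "transform.simple"};

  nlohmann::json data;
  to_json(data, query::CommonStreamInfo{original});  // found via ADL in namespace query
  assert(data["batch_size"].is_null());

  query::CommonStreamInfo restored;
  from_json(data, restored);
  assert(restored.batch_interval == original.batch_interval);
  assert(!restored.batch_size.has_value());
  assert(restored.transformation_name == original.transformation_name);
}
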
View File

@@ -0,0 +1,73 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <json/json.hpp>
#include "query/procedure/mg_procedure_impl.hpp"
namespace query {
template <typename TMessage>
using ConsumerFunction = std::function<void(const std::vector<TMessage> &)>;
struct CommonStreamInfo {
std::optional<std::chrono::milliseconds> batch_interval;
std::optional<int64_t> batch_size;
std::string transformation_name;
};
template <typename T>
concept ConvertableToJson = requires(T value, nlohmann::json data) {
{ to_json(data, std::move(value)) } -> std::same_as<void>;
{ from_json(data, value) } -> std::same_as<void>;
};
template <typename T>
concept ConvertableToMgpMessage = requires(T value) {
mgp_message{value};
};
template <typename TStream>
concept Stream = requires(TStream stream) {
typename TStream::StreamInfo;
typename TStream::Message;
TStream{std::string{""}, typename TStream::StreamInfo{}, ConsumerFunction<typename TStream::Message>{}};
{ stream.Start() } -> std::same_as<void>;
{ stream.Stop() } -> std::same_as<void>;
{ stream.IsRunning() } -> std::same_as<bool>;
{
stream.Check(std::optional<std::chrono::milliseconds>{}, std::optional<int64_t>{},
ConsumerFunction<typename TStream::Message>{})
} -> std::same_as<void>;
{ typename TStream::StreamInfo{}.common_info } -> std::same_as<CommonStreamInfo>;
requires ConvertableToMgpMessage<typename TStream::Message>;
requires ConvertableToJson<typename TStream::StreamInfo>;
};
enum class StreamSourceType : uint8_t { KAFKA };
template <Stream T>
StreamSourceType StreamType(const T & /*stream*/);
const std::string kCommonInfoKey = "common_info";
void to_json(nlohmann::json &data, CommonStreamInfo &&info);
void from_json(const nlohmann::json &data, CommonStreamInfo &common_info);
} // namespace query

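ConvertableToJson spells out the serialization contract a stream's StreamInfo must meet: ADL-visible to_json/from_json overloads with exactly these shapes. A toy type (not part of the commit) that satisfies it:

#include <json/json.hpp>

#include "query/stream/common.hpp"

namespace example {
struct Point {
  int x{0};
  int y{0};
};

// The rvalue to_json and lvalue from_json pair the concept requires.
void to_json(nlohmann::json &data, Point &&point) {
  data["x"] = point.x;
  data["y"] = point.y;
}

void from_json(const nlohmann::json &data, Point &point) {
  data.at("x").get_to(point.x);
  data.at("y").get_to(point.y);
}
}  // namespace example

static_assert(query::ConvertableToJson<example::Point>);
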
View File

@@ -0,0 +1,70 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/stream/sources.hpp"
#include <json/json.hpp>
namespace query {
KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
ConsumerFunction<integrations::kafka::Message> consumer_function) {
integrations::kafka::ConsumerInfo consumer_info{
.consumer_name = std::move(stream_name),
.topics = std::move(stream_info.topics),
.consumer_group = std::move(stream_info.consumer_group),
.batch_interval = stream_info.common_info.batch_interval,
.batch_size = stream_info.common_info.batch_size,
};
consumer_.emplace(std::move(stream_info.bootstrap_servers), std::move(consumer_info), std::move(consumer_function));
}
KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const {
const auto &info = consumer_->Info();
return {{.batch_interval = info.batch_interval,
.batch_size = info.batch_size,
.transformation_name = std::move(transformation_name)},
.topics = info.topics,
.consumer_group = info.consumer_group};
}
void KafkaStream::Start() { consumer_->Start(); }
void KafkaStream::Stop() { consumer_->Stop(); }
bool KafkaStream::IsRunning() const { return consumer_->IsRunning(); }
void KafkaStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
const ConsumerFunction<integrations::kafka::Message> &consumer_function) const {
consumer_->Check(timeout, batch_limit, consumer_function);
}
utils::BasicResult<std::string> KafkaStream::SetStreamOffset(const int64_t offset) {
return consumer_->SetConsumerOffsets(offset);
}
namespace {
const std::string kTopicsKey{"topics"};
const std::string kConsumerGroupKey{"consumer_group"};
const std::string kBoostrapServers{"bootstrap_servers"};
} // namespace
void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
data[kCommonInfoKey] = std::move(info.common_info);
data[kTopicsKey] = std::move(info.topics);
data[kConsumerGroupKey] = info.consumer_group;
data[kBoostrapServers] = std::move(info.bootstrap_servers);
}
void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
data.at(kCommonInfoKey).get_to(info.common_info);
data.at(kTopicsKey).get_to(info.topics);
data.at(kConsumerGroupKey).get_to(info.consumer_group);
data.at(kBoostrapServers).get_to(info.bootstrap_servers);
}
} // namespace query

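Given the keys above, a persisted KafkaStream::StreamInfo nests the source-agnostic fields under common_info while the Kafka-specific fields stay top-level. The shape, with illustrative values:

{
  "common_info": {
    "batch_interval": null,
    "batch_size": null,
    "transformation_name": "transform.simple"
  },
  "topics": ["TrialTopic"],
  "consumer_group": "mg_consumer",
  "bootstrap_servers": "localhost:9092"
}
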
View File

@@ -0,0 +1,57 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "query/stream/common.hpp"
#include "integrations/kafka/consumer.hpp"
namespace query {
struct KafkaStream {
struct StreamInfo {
CommonStreamInfo common_info;
std::vector<std::string> topics;
std::string consumer_group;
std::string bootstrap_servers;
};
using Message = integrations::kafka::Message;
KafkaStream(std::string stream_name, StreamInfo stream_info,
ConsumerFunction<integrations::kafka::Message> consumer_function);
StreamInfo Info(std::string transformation_name) const;
void Start();
void Stop();
bool IsRunning() const;
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const;
utils::BasicResult<std::string> SetStreamOffset(int64_t offset);
private:
using Consumer = integrations::kafka::Consumer;
std::optional<Consumer> consumer_;
};
void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info);
void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info);
template <>
inline StreamSourceType StreamType(const KafkaStream & /*stream*/) {
return StreamSourceType::KAFKA;
}
} // namespace query

View File

@@ -0,0 +1,479 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/stream/streams.hpp"
#include <shared_mutex>
#include <string_view>
#include <utility>
#include <spdlog/spdlog.h>
#include <json/json.hpp>
#include "query/db_accessor.hpp"
#include "query/discard_value_stream.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "query/procedure/mg_procedure_helpers.hpp"
#include "query/procedure/mg_procedure_impl.hpp"
#include "query/procedure/module.hpp"
#include "query/stream/sources.hpp"
#include "query/typed_value.hpp"
#include "utils/event_counter.hpp"
#include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/string.hpp"
namespace EventCounter {
extern const Event MessagesConsumed;
} // namespace EventCounter
namespace query {
namespace {
constexpr auto kExpectedTransformationResultSize = 2;
const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()};
const utils::pmr::string params_param_name{"parameters", utils::NewDeleteResource()};
const std::map<std::string, storage::PropertyValue> empty_parameters{};
auto GetStream(auto &map, const std::string &stream_name) {
if (auto it = map.find(stream_name); it != map.end()) {
return it;
}
throw StreamsException("Couldn't find stream '{}'", stream_name);
}
std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformationResult(
utils::pmr::map<utils::pmr::string, TypedValue> &&values, const std::string_view transformation_name,
const std::string_view stream_name) {
if (values.size() != kExpectedTransformationResultSize) {
throw StreamsException(
"Transformation '{}' in stream '{}' did not yield all fields (query, parameters) as required.",
transformation_name, stream_name);
}
auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & {
auto it = values.find(field_name);
if (it == values.end()) {
throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.",
transformation_name, stream_name, field_name};
};
return it->second;
};
auto &query_value = get_value(query_param_name);
MG_ASSERT(query_value.IsString());
auto &params_value = get_value(params_param_name);
MG_ASSERT(params_value.IsNull() || params_value.IsMap());
return {std::move(query_value), std::move(params_value)};
}
template <typename TMessage>
void CallCustomTransformation(const std::string &transformation_name, const std::vector<TMessage> &messages,
mgp_result &result, storage::Storage::Accessor &storage_accessor,
utils::MemoryResource &memory_resource, const std::string &stream_name) {
DbAccessor db_accessor{&storage_accessor};
{
auto maybe_transformation =
procedure::FindTransformation(procedure::gModuleRegistry, transformation_name, utils::NewDeleteResource());
if (!maybe_transformation) {
throw StreamsException("Couldn't find transformation {} for stream '{}'", transformation_name, stream_name);
};
const auto &trans = *maybe_transformation->second;
mgp_messages mgp_messages{mgp_messages::storage_type{&memory_resource}};
std::transform(messages.begin(), messages.end(), std::back_inserter(mgp_messages.messages),
[](const TMessage &message) { return mgp_message{message}; });
mgp_graph graph{&db_accessor, storage::View::OLD, nullptr};
mgp_memory memory{&memory_resource};
result.rows.clear();
result.error_msg.reset();
result.signature = &trans.results;
MG_ASSERT(result.signature->size() == kExpectedTransformationResultSize);
MG_ASSERT(result.signature->contains(query_param_name));
MG_ASSERT(result.signature->contains(params_param_name));
spdlog::trace("Calling transformation in stream '{}'", stream_name);
trans.cb(&mgp_messages, &graph, &result, &memory);
}
if (result.error_msg.has_value()) {
throw StreamsException(result.error_msg->c_str());
}
}
template <Stream TStream>
StreamStatus<TStream> CreateStatus(std::string stream_name, std::string transformation_name,
std::optional<std::string> owner, const TStream &stream) {
return {.name = std::move(stream_name),
.type = StreamType(stream),
.is_running = stream.IsRunning(),
.info = stream.Info(std::move(transformation_name)),
.owner = std::move(owner)};
}
// nlohmann::json doesn't support string_view access yet
const std::string kStreamName{"name"};
const std::string kIsRunningKey{"is_running"};
const std::string kOwner{"owner"};
const std::string kType{"type"};
} // namespace
template <Stream TStream>
void to_json(nlohmann::json &data, StreamStatus<TStream> &&status) {
data[kStreamName] = std::move(status.name);
data[kType] = status.type;
data[kIsRunningKey] = status.is_running;
if (status.owner.has_value()) {
data[kOwner] = std::move(*status.owner);
} else {
data[kOwner] = nullptr;
}
to_json(data, std::move(status.info));
}
template <Stream TStream>
void from_json(const nlohmann::json &data, StreamStatus<TStream> &status) {
data.at(kStreamName).get_to(status.name);
data.at(kIsRunningKey).get_to(status.is_running);
if (const auto &owner = data.at(kOwner); !owner.is_null()) {
status.owner = owner.get<typename decltype(status.owner)::value_type>();
} else {
status.owner = {};
}
from_json(data, status.info);
}
namespace {
template <class... Ts>
struct Overloaded : Ts... {
using Ts::operator()...;
};
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
} // namespace
Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers,
std::filesystem::path directory)
: interpreter_context_(interpreter_context),
bootstrap_servers_(std::move(bootstrap_servers)),
storage_(std::move(directory)) {
constexpr std::string_view proc_name = "kafka_set_stream_offset";
auto set_stream_offset = [this, proc_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory * /*memory*/) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto *arg_offset = procedure::Call<mgp_value *>(mgp_list_at, args, 1);
const auto offset = procedure::Call<int64_t>(mgp_value_get_int, arg_offset);
auto lock_ptr = streams_.Lock();
auto it = GetStream(*lock_ptr, std::string(stream_name));
std::visit(Overloaded{[&](StreamData<KafkaStream> &kafka_stream) {
auto stream_source_ptr = kafka_stream.stream_source->Lock();
const auto error = stream_source_ptr->SetStreamOffset(offset);
if (error.HasError()) {
MG_ASSERT(
mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
"Unable to set procedure error message of procedure: {}", proc_name);
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
}},
it->second);
};
mgp_proc proc(proc_name, set_stream_offset, utils::NewDeleteResource(), false);
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call<mgp_type *>(mgp_type_int)) == MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
template <Stream TStream>
void Streams::Create(const std::string &stream_name, typename TStream::StreamInfo info,
std::optional<std::string> owner) {
auto locked_streams = streams_.Lock();
auto it = CreateConsumer<TStream>(*locked_streams, stream_name, std::move(info), std::move(owner));
try {
std::visit(
[&](auto &&stream_data) {
const auto stream_source_ptr = stream_data.stream_source->ReadLock();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
},
it->second);
} catch (...) {
locked_streams->erase(it);
throw;
}
}
template void Streams::Create<KafkaStream>(const std::string &stream_name, KafkaStream::StreamInfo info,
std::optional<std::string> owner);
template <Stream TStream>
Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std::string &stream_name,
typename TStream::StreamInfo stream_info,
std::optional<std::string> owner) {
if (map.contains(stream_name)) {
throw StreamsException{"Stream already exists with name '{}'", stream_name};
}
auto *memory_resource = utils::NewDeleteResource();
auto consumer_function =
[interpreter_context = interpreter_context_, memory_resource, stream_name,
transformation_name = stream_info.common_info.transformation_name, owner = owner,
interpreter = std::make_shared<Interpreter>(interpreter_context_),
result = mgp_result{nullptr, memory_resource}](const std::vector<typename TStream::Message> &messages) mutable {
auto accessor = interpreter_context->db->Access();
EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size());
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
DiscardValueResultStream stream;
spdlog::trace("Start transaction in stream '{}'", stream_name);
utils::OnScopeExit cleanup{[&interpreter, &result]() {
result.rows.clear();
interpreter->Abort();
}};
interpreter->BeginTransaction();
const static std::map<std::string, storage::PropertyValue> empty_parameters{};
for (auto &row : result.rows) {
spdlog::trace("Processing row in stream '{}'", stream_name);
auto [query_value, params_value] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
storage::PropertyValue params_prop{params_value};
std::string query{query_value.ValueString()};
spdlog::trace("Executing query '{}' in stream '{}'", query, stream_name);
auto prepare_result =
interpreter->Prepare(query, params_prop.IsNull() ? empty_parameters : params_prop.ValueMap(), nullptr);
if (!interpreter_context->auth_checker->IsUserAuthorized(owner, prepare_result.privileges)) {
throw StreamsException{
"Couldn't execute query '{}' for stream '{}' becuase the owner is not authorized to execute the "
"query!",
query, stream_name};
}
interpreter->PullAll(&stream);
}
spdlog::trace("Commit transaction in stream '{}'", stream_name);
interpreter->CommitTransaction();
result.rows.clear();
};
if (stream_info.bootstrap_servers.empty()) {
stream_info.bootstrap_servers = bootstrap_servers_;
}
auto insert_result = map.try_emplace(
stream_name, StreamData<TStream>{std::move(stream_info.common_info.transformation_name), std::move(owner),
std::make_unique<SynchronizedStreamSource<TStream>>(
stream_name, std::move(stream_info), std::move(consumer_function))});
MG_ASSERT(insert_result.second, "Unexpected error during storing consumer '{}'", stream_name);
return insert_result.first;
}
void Streams::RestoreStreams() {
spdlog::info("Loading streams...");
auto locked_streams_map = streams_.Lock();
MG_ASSERT(locked_streams_map->empty(), "Cannot restore streams when some streams already exist!");
for (const auto &[stream_name, stream_data] : storage_) {
const auto get_failed_message = [&stream_name = stream_name](const std::string_view message,
const std::string_view nested_message) {
return fmt::format("Failed to load stream '{}', because: {} caused by {}", stream_name, message, nested_message);
};
const auto create_consumer = [&, &stream_name = stream_name, this]<typename T>(StreamStatus<T> status,
auto &&stream_json_data) {
try {
stream_json_data.get_to(status);
} catch (const nlohmann::json::type_error &exception) {
spdlog::warn(get_failed_message("invalid type conversion", exception.what()));
return;
} catch (const nlohmann::json::out_of_range &exception) {
spdlog::warn(get_failed_message("non existing field", exception.what()));
return;
}
MG_ASSERT(status.name == stream_name, "Expected stream name is '{}', but got '{}'", status.name, stream_name);
try {
auto it = CreateConsumer<T>(*locked_streams_map, stream_name, std::move(status.info), {});
if (status.is_running) {
std::visit(
[&](auto &&stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Start();
},
it->second);
}
spdlog::info("Stream '{}' is loaded", stream_name);
} catch (const utils::BasicException &exception) {
spdlog::warn(get_failed_message("unexpected error", exception.what()));
}
};
auto stream_json_data = nlohmann::json::parse(stream_data);
const auto stream_type = static_cast<StreamSourceType>(stream_json_data.at("type"));
switch (stream_type) {
case StreamSourceType::KAFKA:
create_consumer(StreamStatus<KafkaStream>{}, std::move(stream_json_data));
break;
}
}
}
void Streams::Drop(const std::string &stream_name) {
auto locked_streams = streams_.Lock();
auto it = GetStream(*locked_streams, stream_name);
// streams_ is write locked, which means there is no access to it outside of this function, thus only the Check
// function can be executing with the consumer, nothing else.
// By acquiring the write lock here for the consumer, we make sure there is
// no running Check function for this consumer, therefore it can be erased.
std::visit([&](auto &&stream_data) { stream_data.stream_source->Lock(); }, it->second);
locked_streams->erase(it);
if (!storage_.Delete(stream_name)) {
throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name);
}
// TODO(antaljanosbenjamin) Release the transformation
}
void Streams::Start(const std::string &stream_name) {
auto locked_streams = streams_.Lock();
auto it = GetStream(*locked_streams, stream_name);
std::visit(
[&, this](auto &&stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Start();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
},
it->second);
}
void Streams::Stop(const std::string &stream_name) {
auto locked_streams = streams_.Lock();
auto it = GetStream(*locked_streams, stream_name);
std::visit(
[&, this](auto &&stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Stop();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
},
it->second);
}
void Streams::StartAll() {
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&stream_name = stream_name, this](auto &&stream_data) {
auto locked_stream_source = stream_data.stream_source->Lock();
if (!locked_stream_source->IsRunning()) {
locked_stream_source->Start();
Persist(
CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_stream_source));
}
},
stream_data);
}
}
void Streams::StopAll() {
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&stream_name = stream_name, this](auto &&stream_data) {
auto locked_stream_source = stream_data.stream_source->Lock();
if (locked_stream_source->IsRunning()) {
locked_stream_source->Stop();
Persist(
CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_stream_source));
}
},
stream_data);
}
}
std::vector<StreamStatus<>> Streams::GetStreamInfo() const {
std::vector<StreamStatus<>> result;
{
for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&, &stream_name = stream_name](auto &&stream_data) {
auto locked_stream_source = stream_data.stream_source->ReadLock();
auto info = locked_stream_source->Info(stream_data.transformation_name);
result.emplace_back(StreamStatus<>{stream_name, StreamType(*locked_stream_source),
locked_stream_source->IsRunning(), std::move(info.common_info),
stream_data.owner});
},
stream_data);
}
}
return result;
}
TransformationResult Streams::Check(const std::string &stream_name, std::optional<std::chrono::milliseconds> timeout,
std::optional<int64_t> batch_limit) const {
std::optional locked_streams{streams_.ReadLock()};
auto it = GetStream(**locked_streams, stream_name);
return std::visit(
[&](auto &&stream_data) {
// This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after
// that
const auto locked_stream_source = stream_data.stream_source->ReadLock();
const auto transformation_name = stream_data.transformation_name;
locked_streams.reset();
auto *memory_resource = utils::NewDeleteResource();
mgp_result result{nullptr, memory_resource};
TransformationResult test_result;
auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, &stream_name,
&transformation_name = transformation_name, &result,
&test_result]<typename T>(const std::vector<T> &messages) mutable {
auto accessor = interpreter_context->db->Access();
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
for (auto &row : result.rows) {
auto [query, parameters] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
std::vector<TypedValue> result_row;
result_row.reserve(kExpectedTransformationResultSize);
result_row.push_back(std::move(query));
result_row.push_back(std::move(parameters));
test_result.push_back(std::move(result_row));
}
};
locked_stream_source->Check(timeout, batch_limit, consumer_function);
return test_result;
},
it->second);
}
std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; }
} // namespace query

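The Overloaded helper in the anonymous namespace above is the usual variant-visitor idiom: it inherits the call operators of the lambdas it aggregates, so std::visit can pair an exact-match handler for one alternative with a generic fallback for the rest, which is how kafka_set_stream_offset rejects non-Kafka sources. A standalone sketch:

#include <iostream>
#include <string>
#include <variant>

// Same helper as in the anonymous namespace of streams.cpp.
template <class... Ts>
struct Overloaded : Ts... {
  using Ts::operator()...;
};
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;

int main() {
  std::variant<int, std::string> value{std::string{"kafka"}};
  // The exact-match handler wins for the std::string alternative; every
  // other alternative falls through to the generic lambda.
  std::visit(Overloaded{[](std::string &name) { std::cout << "kafka stream: " << name << '\n'; },
                        [](auto && /*other*/) { std::cout << "unsupported source type\n"; }},
             value);
}
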
View File

@@ -11,14 +11,22 @@
 #pragma once
 
+#include <concepts>
 #include <functional>
 #include <map>
 #include <optional>
+#include <type_traits>
 #include <unordered_map>
 
+#include <json/json.hpp>
+
 #include "integrations/kafka/consumer.hpp"
 #include "kvstore/kvstore.hpp"
+#include "query/stream/common.hpp"
+#include "query/stream/sources.hpp"
 #include "query/typed_value.hpp"
+#include "storage/v2/property_value.hpp"
+#include "utils/event_counter.hpp"
 #include "utils/exceptions.hpp"
 #include "utils/rw_lock.hpp"
 #include "utils/synchronized.hpp"
@@ -30,33 +38,33 @@ class StreamsException : public utils::BasicException {
   using BasicException::BasicException;
 };
 
-using TransformationResult = std::vector<std::vector<TypedValue>>;
-using TransformFunction = std::function<TransformationResult(const std::vector<integrations::kafka::Message> &)>;
-
-struct StreamInfo {
-  std::vector<std::string> topics;
-  std::string consumer_group;
-  std::optional<std::chrono::milliseconds> batch_interval;
-  std::optional<int64_t> batch_size;
-  std::string transformation_name;
-  std::optional<std::string> owner;
-  std::string bootstrap_servers;
+template <typename T>
+struct StreamInfo;
+
+template <>
+struct StreamInfo<void> {
+  using Type = CommonStreamInfo;
 };
 
+template <Stream TStream>
+struct StreamInfo<TStream> {
+  using Type = typename TStream::StreamInfo;
+};
+
+template <typename T>
+using StreamInfoType = typename StreamInfo<T>::Type;
+
+template <typename T = void>
 struct StreamStatus {
   std::string name;
-  StreamInfo info;
+  StreamSourceType type;
   bool is_running;
-};
-
-using SynchronizedConsumer = utils::Synchronized<integrations::kafka::Consumer, utils::WritePrioritizedRWLock>;
-
-struct StreamData {
-  std::string transformation_name;
+  StreamInfoType<T> info;
   std::optional<std::string> owner;
-  std::unique_ptr<SynchronizedConsumer> consumer;
 };
 
+using TransformationResult = std::vector<std::vector<TypedValue>>;
+
 struct InterpreterContext;
 
 /// Manages Kafka consumers.
@@ -85,7 +93,8 @@ class Streams final {
   /// @param stream_info the necessary information needed to create the Kafka consumer and transform the messages
   ///
   /// @throws StreamsException if the stream with the same name exists or if the creation of Kafka consumer fails
-  void Create(const std::string &stream_name, StreamInfo stream_info);
+  template <Stream TStream>
+  void Create(const std::string &stream_name, typename TStream::StreamInfo info, std::optional<std::string> owner);
 
   /// Deletes an existing stream and all the data that was persisted.
   ///
@@ -123,7 +132,7 @@ class Streams final {
   /// Return current status for all streams.
   /// It might happen that the is_running field is out of date if one of the streams stops during the invocation of
   /// this function because of an error.
-  std::vector<StreamStatus> GetStreamInfo() const;
+  std::vector<StreamStatus<>> GetStreamInfo() const;
 
   /// Do a dry-run consume from a stream.
   ///
@@ -143,23 +152,32 @@ class Streams final {
   /// Return the configuration value passed to memgraph.
   std::string_view BootstrapServers() const;
 
+  /// Sets the stream's consumer offset.
+  ///
+  /// @param stream_name we want to set the offset.
+  /// @param offset to set.
+  [[nodiscard]] utils::BasicResult<std::string> SetStreamOffset(std::string_view stream_name, int64_t offset);
+
  private:
-  using StreamsMap = std::unordered_map<std::string, StreamData>;
+  template <Stream TStream>
+  using SynchronizedStreamSource = utils::Synchronized<TStream, utils::WritePrioritizedRWLock>;
+
+  template <Stream TStream>
+  struct StreamData {
+    std::string transformation_name;
+    std::optional<std::string> owner;
+    std::unique_ptr<SynchronizedStreamSource<TStream>> stream_source;
+  };
+
+  using StreamDataVariant = std::variant<StreamData<KafkaStream>>;
+  using StreamsMap = std::unordered_map<std::string, StreamDataVariant>;
   using SynchronizedStreamsMap = utils::Synchronized<StreamsMap, utils::WritePrioritizedRWLock>;
 
-  static StreamStatus CreateStatus(const std::string &name, const std::string &transformation_name,
-                                   const std::optional<std::string> &owner,
-                                   const integrations::kafka::Consumer &consumer);
-
-  StreamsMap::iterator CreateConsumer(StreamsMap &map, const std::string &stream_name, StreamInfo stream_info);
-
-  void Persist(StreamStatus &&status);
+  template <Stream TStream>
+  StreamsMap::iterator CreateConsumer(StreamsMap &map, const std::string &stream_name,
+                                      typename TStream::StreamInfo stream_info, std::optional<std::string> owner);
+
+  template <Stream TStream>
+  void Persist(StreamStatus<TStream> &&status) {
+    const std::string stream_name = status.name;
+    if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) {
+      throw StreamsException{"Couldn't persist stream data for stream '{}'", stream_name};
+    }
+  }
 
   InterpreterContext *interpreter_context_;
   std::string bootstrap_servers_;

View File

@@ -15,14 +15,11 @@ import time
 # These are the indices of the different values in the result of SHOW STREAM
 # query
 NAME = 0
-TOPICS = 1
-CONSUMER_GROUP = 2
-BATCH_INTERVAL = 3
-BATCH_SIZE = 4
-TRANSFORM = 5
-OWNER = 6
-BOOTSTRAP_SERVERS = 7
-IS_RUNNING = 8
+BATCH_INTERVAL = 1
+BATCH_SIZE = 2
+TRANSFORM = 3
+OWNER = 4
+IS_RUNNING = 5
 
 
 def execute_and_fetch_all(cursor, query):
@@ -74,17 +71,19 @@ def check_one_result_row(cursor, query):
 def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
-    assert check_one_result_row(cursor,
-                                "MATCH (n: MESSAGE {"
-                                f"payload: '{payload_bytes.decode('utf-8')}',"
-                                f"topic: '{topic}'"
-                                "}) RETURN n")
+    assert check_one_result_row(
+        cursor,
+        "MATCH (n: MESSAGE {"
+        f"payload: '{payload_bytes.decode('utf-8')}',"
+        f"topic: '{topic}'"
+        "}) RETURN n",
+    )
 
 
 def get_stream_info(cursor, stream_name):
     stream_infos = execute_and_fetch_all(cursor, "SHOW STREAMS")
     for stream_info in stream_infos:
-        if (stream_info[NAME] == stream_name):
+        if stream_info[NAME] == stream_name:
             return stream_info
     return None

View File

@@ -77,9 +77,8 @@ def test_owner_is_shown(topics, connection):
         f"TOPICS {topics[0]} "
         f"TRANSFORM transform.simple")
 
-    common.check_stream_info(userless_cursor, "test", ("test", [
-                             topics[0]], "mg_consumer", None, None,
-                             "transform.simple", stream_user, "localhost:9092", False))
+    common.check_stream_info(userless_cursor, "test", ("test", None, None,
+                             "transform.simple", stream_user, False))
 
 
 def test_insufficient_privileges(producer, topics, connection):

View File

@@ -226,17 +226,7 @@ def test_show_streams(producer, topics, connection):
     common.check_stream_info(
         cursor,
         "default_values",
-        (
-            "default_values",
-            [topics[0]],
-            "mg_consumer",
-            None,
-            None,
-            "transform.simple",
-            None,
-            "localhost:9092",
-            False,
-        ),
+        ("default_values", None, None, "transform.simple", None, False),
     )
 
     common.check_stream_info(
@@ -244,13 +234,10 @@ def test_show_streams(producer, topics, connection):
         "complex_values",
         (
             "complex_values",
-            topics,
-            consumer_group,
             batch_interval,
             batch_size,
             "transform.with_parameters",
             None,
-            "localhost:9092",
             False,
         ),
     )

View File

@@ -138,7 +138,7 @@ class MgpApiTest : public ::testing::Test {
     auto v = utils::pmr::vector<mgp_message>(utils::NewDeleteResource());
     v.reserve(expected.size());
     std::transform(msgs_storage_.begin(), msgs_storage_.end(), std::back_inserter(v),
-                   [](auto &msgs) { return mgp_message{&msgs}; });
+                   [](auto &msgs) { return mgp_message{msgs}; });
     return v;
   }

View File

@@ -19,12 +19,12 @@
 #include "kafka_mock.hpp"
 #include "query/config.hpp"
 #include "query/interpreter.hpp"
-#include "query/streams.hpp"
+#include "query/stream/streams.hpp"
 #include "storage/v2/storage.hpp"
 
 using Streams = query::Streams;
-using StreamInfo = query::StreamInfo;
-using StreamStatus = query::StreamStatus;
+using StreamInfo = query::KafkaStream::StreamInfo;
+using StreamStatus = query::StreamStatus<query::KafkaStream>;
 
 namespace {
 const static std::string kTopicName{"TrialTopic"};
@@ -32,6 +32,7 @@ struct StreamCheckData {
   std::string name;
   StreamInfo info;
   bool is_running;
+  std::optional<std::string> owner;
 };
 
 std::string GetDefaultStreamName() {
@@ -39,17 +40,19 @@ std::string GetDefaultStreamName() {
 }
 
 StreamInfo CreateDefaultStreamInfo() {
-  return StreamInfo{
-      .topics = {kTopicName},
-      .consumer_group = "ConsumerGroup " + GetDefaultStreamName(),
-      .batch_interval = std::nullopt,
-      .batch_size = std::nullopt,
-      .transformation_name = "not used in the tests",
-      .owner = std::nullopt,
-  };
+  return StreamInfo{.common_info{
+                        .batch_interval = std::nullopt,
+                        .batch_size = std::nullopt,
+                        .transformation_name = "not used in the tests",
+                    },
+                    .topics = {kTopicName},
+                    .consumer_group = "ConsumerGroup " + GetDefaultStreamName(),
+                    .bootstrap_servers = ""};
 }
 
-StreamCheckData CreateDefaultStreamCheckData() { return {GetDefaultStreamName(), CreateDefaultStreamInfo(), false}; }
+StreamCheckData CreateDefaultStreamCheckData() {
+  return {GetDefaultStreamName(), CreateDefaultStreamInfo(), false, std::nullopt};
+}
 
 std::filesystem::path GetCleanDataDirectory() {
   const auto path = std::filesystem::temp_directory_path() / "query-streams";
@@ -87,13 +90,9 @@ class StreamsTest : public ::testing::Test {
         [&check_data](const auto &stream_status) { return stream_status.name == check_data.name; });
     ASSERT_NE(it, stream_statuses.end());
     const auto &status = *it;
-    // the order don't have to be strictly the same, but based on the implementation it shouldn't change
-    EXPECT_TRUE(std::equal(check_data.info.topics.begin(), check_data.info.topics.end(), status.info.topics.begin(),
-                           status.info.topics.end()));
-    EXPECT_EQ(check_data.info.consumer_group, status.info.consumer_group);
-    EXPECT_EQ(check_data.info.batch_interval, status.info.batch_interval);
-    EXPECT_EQ(check_data.info.batch_size, status.info.batch_size);
-    EXPECT_EQ(check_data.info.transformation_name, status.info.transformation_name);
+    EXPECT_EQ(check_data.info.common_info.batch_interval, status.info.batch_interval);
+    EXPECT_EQ(check_data.info.common_info.batch_size, status.info.batch_size);
+    EXPECT_EQ(check_data.info.common_info.transformation_name, status.info.transformation_name);
     EXPECT_EQ(check_data.is_running, status.is_running);
   }
@@ -115,7 +114,7 @@ class StreamsTest : public ::testing::Test {
 TEST_F(StreamsTest, SimpleStreamManagement) {
   auto check_data = CreateDefaultStreamCheckData();
-  streams_->Create(check_data.name, check_data.info);
+  streams_->Create<query::KafkaStream>(check_data.name, check_data.info, check_data.owner);
 
   EXPECT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
   EXPECT_NO_THROW(streams_->Start(check_data.name));
@@ -141,10 +140,10 @@ TEST_F(StreamsTest, SimpleStreamManagement) {
 TEST_F(StreamsTest, CreateAlreadyExisting) {
   auto stream_info = CreateDefaultStreamInfo();
   auto stream_name = GetDefaultStreamName();
-  streams_->Create(stream_name, stream_info);
+  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
 
   try {
-    streams_->Create(stream_name, stream_info);
+    streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
     FAIL() << "Creating already existing stream should throw\n";
   } catch (query::StreamsException &exception) {
     EXPECT_EQ(exception.what(), fmt::format("Stream already exists with name '{}'", stream_name));
@@ -155,7 +154,7 @@ TEST_F(StreamsTest, DropNotExistingStream) {
   const auto stream_info = CreateDefaultStreamInfo();
   const auto stream_name = GetDefaultStreamName();
   const std::string not_existing_stream_name{"ThisDoesn'tExists"};
-  streams_->Create(stream_name, stream_info);
+  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
 
   try {
     streams_->Drop(not_existing_stream_name);
@@ -182,18 +181,18 @@ TEST_F(StreamsTest, RestoreStreams) {
     stream_check_data.name += iteration_postfix;
     stream_info.topics[0] += iteration_postfix;
     stream_info.consumer_group += iteration_postfix;
-    stream_info.transformation_name += iteration_postfix;
+    stream_info.common_info.transformation_name += iteration_postfix;
     if (i > 0) {
-      stream_info.batch_interval = std::chrono::milliseconds((i + 1) * 10);
-      stream_info.batch_size = 1000 + i;
-      stream_info.owner = std::string{"owner"} + iteration_postfix;
+      stream_info.common_info.batch_interval = std::chrono::milliseconds((i + 1) * 10);
+      stream_info.common_info.batch_size = 1000 + i;
+      stream_check_data.owner = std::string{"owner"} + iteration_postfix;
     }
 
     mock_cluster_.CreateTopic(stream_info.topics[0]);
   }
 
-  stream_check_datas[1].info.batch_interval = {};
-  stream_check_datas[2].info.batch_size = {};
-  stream_check_datas[3].info.owner = {};
+  stream_check_datas[1].info.common_info.batch_interval = {};
+  stream_check_datas[2].info.common_info.batch_size = {};
+  stream_check_datas[3].owner = {};
 
   const auto check_restore_logic = [&stream_check_datas, this]() {
     // Reset the Streams object to trigger reloading
@@ -210,7 +209,7 @@ TEST_F(StreamsTest, RestoreStreams) {
   EXPECT_TRUE(streams_->GetStreamInfo().empty());
 
   for (auto &check_data : stream_check_datas) {
-    streams_->Create(check_data.name, check_data.info);
+    streams_->Create<query::KafkaStream>(check_data.name, check_data.info, check_data.owner);
   }
   {
     SCOPED_TRACE("After streams are created");
@@ -246,7 +245,7 @@ TEST_F(StreamsTest, RestoreStreams) {
 TEST_F(StreamsTest, CheckWithTimeout) {
   const auto stream_info = CreateDefaultStreamInfo();
   const auto stream_name = GetDefaultStreamName();
-  streams_->Create(stream_name, stream_info);
+  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
 
   std::chrono::milliseconds timeout{3000};