Add configs and credentials to mg.kafka_stream_info

This commit is contained in:
János Benjamin Antal 2022-01-20 09:05:02 +01:00
parent 0bc73da66d
commit 1e3de8e76a
4 changed files with 79 additions and 7 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,11 +10,14 @@
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <string>
namespace integrations {
constexpr int64_t kDefaultCheckBatchLimit{1};
constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
constexpr std::chrono::milliseconds kMinimumInterval{1};
constexpr int64_t kMinimumSize{1};
const std::string kReducted = "<REDUCTED>";
} // namespace integrations

View File

@ -30,8 +30,6 @@
namespace integrations::kafka {
namespace {
const std::string kReducted = "<REDUCTED>";
utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer,
const ConsumerInfo &info,
std::atomic<bool> &is_running) {

View File

@ -13,6 +13,8 @@
#include <json/json.hpp>
#include "integrations/constants.hpp"
namespace query::stream {
KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
ConsumerFunction<integrations::kafka::Message> consumer_function) {
@ -31,13 +33,21 @@ KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const {
const auto &info = consumer_->Info();
using CredentialsType = decltype(StreamInfo::credentials);
CredentialsType reducted_credentials;
std::transform(info.private_configs.begin(), info.private_configs.end(),
std::inserter(reducted_credentials, reducted_credentials.end()),
[](const auto &pair) -> CredentialsType::value_type {
return {pair.first, integrations::kReducted};
});
return {{.batch_interval = info.batch_interval,
.batch_size = info.batch_size,
.transformation_name = std::move(transformation_name)},
.topics = info.topics,
.consumer_group = info.consumer_group,
.bootstrap_servers = info.bootstrap_servers,
.configs = info.public_configs};
.configs = info.public_configs,
.credentials = std::move(reducted_credentials)};
}
void KafkaStream::Start() { consumer_->Start(); }

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -29,6 +29,7 @@
#include "query/stream/sources.hpp"
#include "query/typed_value.hpp"
#include "utils/event_counter.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/string.hpp"
@ -208,10 +209,12 @@ void Streams::RegisterKafkaProcedures() {
constexpr std::string_view consumer_group_result_name = "consumer_group";
constexpr std::string_view topics_result_name = "topics";
constexpr std::string_view bootstrap_servers_result_name = "bootstrap_servers";
constexpr std::string_view configs_result_name = "configs";
constexpr std::string_view credentials_result_name = "credentials";
auto get_stream_info = [this, proc_name, consumer_group_result_name, topics_result_name,
bootstrap_servers_result_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory *memory) {
bootstrap_servers_result_name, configs_result_name, credentials_result_name](
mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, mgp_memory *memory) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto lock_ptr = streams_.Lock();
@ -274,6 +277,50 @@ void Streams::RegisterKafkaProcedures() {
return;
}
const auto convert_config_map =
[result, memory](const std::unordered_map<std::string, std::string> &configs_to_convert)
-> procedure::MgpUniquePtr<mgp_value> {
procedure::MgpUniquePtr<mgp_value> configs_value{nullptr, mgp_value_destroy};
procedure::MgpUniquePtr<mgp_map> configs{nullptr, mgp_map_destroy};
{
const auto success = procedure::TryOrSetError(
[&] { return procedure::CreateMgpObject(configs, mgp_map_make_empty, memory); }, result);
if (!success) {
return configs_value;
}
}
for (const auto &[key, value] : configs_to_convert) {
auto value_value = procedure::GetStringValueOrSetError(value.c_str(), memory, result);
if (!value_value) {
return configs_value;
}
DMG_ASSERT(configs->items.emplace(key, std::move(*value_value)).second);
}
{
const auto success = procedure::TryOrSetError(
[&] {
return procedure::CreateMgpObject(configs_value, mgp_value_make_map, configs.release());
},
result);
if (!success) {
return configs_value;
}
}
return configs_value;
};
auto configs_value = convert_config_map(info.configs);
if (configs_value == nullptr) {
return;
}
auto credentials_value = convert_config_map(info.credentials);
if (credentials_value == nullptr) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, consumer_group_result_name.data(),
consumer_group_value.get())) {
return;
@ -287,6 +334,16 @@ void Streams::RegisterKafkaProcedures() {
bootstrap_servers_value.get())) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, configs_result_name.data(),
configs_value.get())) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, credentials_result_name.data(),
credentials_value.get())) {
return;
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
@ -305,6 +362,10 @@ void Streams::RegisterKafkaProcedures() {
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, configs_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, credentials_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}