Use default values instantly for batch size/interval (#306)

Antonio Andelic 2021-11-30 16:29:51 +01:00 committed by GitHub
parent 05d0aee494
commit d9bb4e2e46
19 changed files with 92 additions and 100 deletions


@@ -13,8 +13,6 @@
 #include <chrono>
 
 namespace integrations {
-constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
-constexpr int64_t kDefaultBatchSize{1000};
 constexpr int64_t kDefaultCheckBatchLimit{1};
 constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
 constexpr std::chrono::milliseconds kMinimumInterval{1};

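The shape of the change, in brief: the defaults move out of the integrations layer (removed above) into `query::stream`, and the batch settings stop being `std::optional`. A minimal before/after sketch; the struct and constant names are taken from this diff, but the member initializers are illustrative, since in the commit the defaults are applied where the structs are populated rather than in the structs themselves:

```cpp
#include <chrono>
#include <cstdint>
#include <optional>

// Before: every consumer of ConsumerInfo had to resolve the default itself.
struct ConsumerInfoBefore {
  std::optional<std::chrono::milliseconds> batch_interval;
  std::optional<int64_t> batch_size;
};

// After: the fields always hold concrete values; the defaults are filled in
// once, at construction/deserialization time.
struct ConsumerInfoAfter {
  std::chrono::milliseconds batch_interval{100};  // kDefaultBatchInterval
  int64_t batch_size{1000};                       // kDefaultBatchSize
};
```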

@@ -35,14 +35,13 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon
                                                                std::atomic<bool> &is_running) {
   std::vector<Message> batch{};
-  int64_t batch_size = info.batch_size.value_or(kDefaultBatchSize);
-  batch.reserve(batch_size);
+  batch.reserve(info.batch_size);
 
-  auto remaining_timeout_in_ms = info.batch_interval.value_or(kDefaultBatchInterval).count();
+  auto remaining_timeout_in_ms = info.batch_interval.count();
   auto start = std::chrono::steady_clock::now();
 
   bool run_batch = true;
-  for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size && is_running.load(); ++i) {
+  for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < info.batch_size && is_running.load(); ++i) {
     std::unique_ptr<RdKafka::Message> msg(consumer.consume(remaining_timeout_in_ms));
     switch (msg->err()) {
       case RdKafka::ERR__TIMED_OUT:
@@ -111,13 +110,12 @@ int64_t Message::Offset() const {
 Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
     : info_{std::move(info)}, consumer_function_(std::move(consumer_function)), cb_(info_.consumer_name) {
   MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer");
-  // NOLINTNEXTLINE (modernize-use-nullptr)
-  if (info_.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) {
+  if (info_.batch_interval < kMinimumInterval) {
     throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch interval has to be positive!");
   }
-  if (info_.batch_size.value_or(kMinimumSize) < kMinimumSize) {
+  if (info_.batch_size < kMinimumSize) {
     throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch size has to be positive!");
   }


@@ -84,8 +84,8 @@ struct ConsumerInfo {
   std::vector<std::string> topics;
   std::string consumer_group;
   std::string bootstrap_servers;
-  std::optional<std::chrono::milliseconds> batch_interval;
-  std::optional<int64_t> batch_size;
+  std::chrono::milliseconds batch_interval;
+  int64_t batch_size;
 };
 
 /// Memgraph's Kafka consumer wrapper.


@@ -49,13 +49,12 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(TConsumer &consum
                                                                const pulsar_client::MessageId &last_message_id) {
   std::vector<Message> batch{};
-  const auto batch_size = info.batch_size.value_or(kDefaultBatchSize);
-  batch.reserve(batch_size);
+  batch.reserve(info.batch_size);
 
-  auto remaining_timeout_in_ms = info.batch_interval.value_or(kDefaultBatchInterval).count();
+  auto remaining_timeout_in_ms = info.batch_interval.count();
   auto start = std::chrono::steady_clock::now();
 
-  while (remaining_timeout_in_ms > 0 && batch.size() < batch_size && is_running) {
+  while (remaining_timeout_in_ms > 0 && batch.size() < info.batch_size && is_running) {
     pulsar_client::Message message;
     const auto result = ConsumeMessage(consumer, message, remaining_timeout_in_ms);
     switch (result) {

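Both GetBatch implementations (Kafka earlier, Pulsar above) share the same time-budget loop, which this change leaves intact: each consume call receives the time remaining in the batch interval, recomputed from a steady_clock start point. A self-contained sketch of that pattern; Consume below is a stand-in stub, not the librdkafka or Pulsar client API:

```cpp
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

// Stand-in for consumer.consume(remaining_timeout_in_ms).
int Consume(int64_t /*timeout_ms*/) {
  std::this_thread::sleep_for(std::chrono::milliseconds{10});
  return 42;
}

int main() {
  constexpr std::chrono::milliseconds kBatchInterval{100};
  constexpr int64_t kBatchSize{1000};

  std::vector<int> batch;
  batch.reserve(kBatchSize);

  auto remaining_timeout_in_ms = kBatchInterval.count();
  const auto start = std::chrono::steady_clock::now();
  while (remaining_timeout_in_ms > 0 && static_cast<int64_t>(batch.size()) < kBatchSize) {
    batch.push_back(Consume(remaining_timeout_in_ms));
    // Shrink the budget by the elapsed time, as GetBatch does above.
    const auto elapsed = std::chrono::steady_clock::now() - start;
    remaining_timeout_in_ms =
        kBatchInterval.count() -
        std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
  }
}
```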

@@ -39,8 +39,8 @@ class Message final {
 using ConsumerFunction = std::function<void(const std::vector<Message> &)>;
 
 struct ConsumerInfo {
-  std::optional<int64_t> batch_size;
-  std::optional<std::chrono::milliseconds> batch_interval;
+  int64_t batch_size;
+  std::chrono::milliseconds batch_interval;
   std::vector<std::string> topics;
   std::string consumer_name;
   std::string service_url;


@@ -532,9 +532,11 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) {
   return str == nullptr ? std::nullopt : std::make_optional(*str);
 }
 
-CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
-  return {.batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator),
-          .batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
+stream::CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
+  return {
+      .batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator)
+                            .value_or(stream::kDefaultBatchInterval),
+      .batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator).value_or(stream::kDefaultBatchSize),
       .transformation_name = stream_query->transform_name_};
 }
@@ -569,7 +571,7 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
     std::string bootstrap = bootstrap_servers
                                 ? std::move(*bootstrap_servers)
                                 : std::string{interpreter_context->config.default_kafka_bootstrap_servers};
-    interpreter_context->streams.Create<query::KafkaStream>(stream_name,
+    interpreter_context->streams.Create<query::stream::KafkaStream>(stream_name,
                                                             {.common_info = std::move(common_stream_info),
                                                              .topics = std::move(topic_names),
                                                              .consumer_group = std::move(consumer_group),
@@ -594,7 +596,7 @@ Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, Ex
          owner = StringPointerToOptional(username)]() mutable {
     std::string url =
         service_url ? std::move(*service_url) : std::string{interpreter_context->config.default_pulsar_service_url};
-    interpreter_context->streams.Create<query::PulsarStream>(
+    interpreter_context->streams.Create<query::stream::PulsarStream>(
         stream_name,
         {.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .service_url = std::move(url)},
         std::move(owner));
@@ -681,16 +683,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
       std::vector<std::vector<TypedValue>> results;
       results.reserve(streams_status.size());
       auto stream_info_as_typed_stream_info_emplace_in = [](auto &typed_status, const auto &stream_info) {
-        if (stream_info.batch_interval.has_value()) {
-          typed_status.emplace_back(stream_info.batch_interval->count());
-        } else {
-          typed_status.emplace_back();
-        }
-        if (stream_info.batch_size.has_value()) {
-          typed_status.emplace_back(*stream_info.batch_size);
-        } else {
-          typed_status.emplace_back();
-        }
+        typed_status.emplace_back(stream_info.batch_interval.count());
+        typed_status.emplace_back(stream_info.batch_size);
         typed_status.emplace_back(stream_info.transformation_name);
       };

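With this change, CREATE STREAM options the user omits are resolved to the stream defaults in GetCommonStreamInfo above, at query-evaluation time, via std::optional::value_or; the rest of the pipeline never sees an empty option again. A minimal standalone illustration of the pattern, where the optionals stand in for GetOptionalValue results:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>

constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
constexpr int64_t kDefaultBatchSize{1000};

int main() {
  // Options as they might arrive from a parsed CREATE STREAM query:
  // the interval was omitted, the size was given explicitly.
  std::optional<std::chrono::milliseconds> user_interval;
  std::optional<int64_t> user_size{500};

  // Collapse each optional to a concrete value exactly once, at the boundary.
  const auto batch_interval = user_interval.value_or(kDefaultBatchInterval);
  const auto batch_size = user_size.value_or(kDefaultBatchSize);

  std::cout << batch_interval.count() << ' ' << batch_size << '\n';  // prints "100 500"
}
```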

@@ -190,7 +190,7 @@ struct InterpreterContext {
   const InterpreterConfig config;
 
-  query::Streams streams;
+  query::stream::Streams streams;
 };
 
 /// Function that is used to tell all active interpreters that they should stop


@@ -2546,26 +2546,28 @@ bool IsValidIdentifierName(const char *name) {
 }  // namespace query::procedure
 
 namespace {
+using StreamSourceType = query::stream::StreamSourceType;
+
 class InvalidMessageFunction : public std::invalid_argument {
  public:
-  InvalidMessageFunction(const query::StreamSourceType type, const std::string_view function_name)
+  InvalidMessageFunction(const StreamSourceType type, const std::string_view function_name)
       : std::invalid_argument{fmt::format("'{}' is not defined for a message from a stream of type '{}'", function_name,
-                                          query::StreamSourceTypeToString(type))} {}
+                                          StreamSourceTypeToString(type))} {}
 };
 
-query::StreamSourceType MessageToStreamSourceType(const mgp_message::KafkaMessage & /*msg*/) {
-  return query::StreamSourceType::KAFKA;
+StreamSourceType MessageToStreamSourceType(const mgp_message::KafkaMessage & /*msg*/) {
+  return StreamSourceType::KAFKA;
 }
 
-query::StreamSourceType MessageToStreamSourceType(const mgp_message::PulsarMessage & /*msg*/) {
-  return query::StreamSourceType::PULSAR;
+StreamSourceType MessageToStreamSourceType(const mgp_message::PulsarMessage & /*msg*/) {
+  return StreamSourceType::PULSAR;
 }
 
-mgp_source_type StreamSourceTypeToMgpSourceType(const query::StreamSourceType type) {
+mgp_source_type StreamSourceTypeToMgpSourceType(const StreamSourceType type) {
   switch (type) {
-    case query::StreamSourceType::KAFKA:
+    case StreamSourceType::KAFKA:
       return mgp_source_type::KAFKA;
-    case query::StreamSourceType::PULSAR:
+    case StreamSourceType::PULSAR:
       return mgp_source_type::PULSAR;
   }
 }


@@ -13,7 +13,7 @@
 #include <json/json.hpp>
 
-namespace query {
+namespace query::stream {
 
 namespace {
 const std::string kBatchIntervalKey{"batch_interval"};
 const std::string kBatchSizeKey{"batch_size"};
@@ -21,35 +21,25 @@ const std::string kTransformationName{"transformation_name"};
 }  // namespace
 
 void to_json(nlohmann::json &data, CommonStreamInfo &&common_info) {
-  if (common_info.batch_interval) {
-    data[kBatchIntervalKey] = common_info.batch_interval->count();
-  } else {
-    data[kBatchIntervalKey] = nullptr;
-  }
-
-  if (common_info.batch_size) {
-    data[kBatchSizeKey] = *common_info.batch_size;
-  } else {
-    data[kBatchSizeKey] = nullptr;
-  }
-
+  data[kBatchIntervalKey] = common_info.batch_interval.count();
+  data[kBatchSizeKey] = common_info.batch_size;
   data[kTransformationName] = common_info.transformation_name;
 }
 
 void from_json(const nlohmann::json &data, CommonStreamInfo &common_info) {
   if (const auto batch_interval = data.at(kBatchIntervalKey); !batch_interval.is_null()) {
-    using BatchInterval = typename decltype(common_info.batch_interval)::value_type;
+    using BatchInterval = decltype(common_info.batch_interval);
     common_info.batch_interval = BatchInterval{batch_interval.get<typename BatchInterval::rep>()};
   } else {
-    common_info.batch_interval = {};
+    common_info.batch_interval = kDefaultBatchInterval;
   }
 
   if (const auto batch_size = data.at(kBatchSizeKey); !batch_size.is_null()) {
-    common_info.batch_size = batch_size.get<typename decltype(common_info.batch_size)::value_type>();
+    common_info.batch_size = batch_size.get<decltype(common_info.batch_size)>();
   } else {
-    common_info.batch_size = {};
+    common_info.batch_size = kDefaultBatchSize;
   }
 
   data.at(kTransformationName).get_to(common_info.transformation_name);
 }
 
-}  // namespace query
+}  // namespace query::stream

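One consequence of keeping the null check in from_json: stream metadata persisted by an older version may still store null for these keys, and such entries now deserialize to the defaults instead of to empty optionals. A compilable sketch of that behavior, using a standalone struct and the upstream <nlohmann/json.hpp> header (the source includes it as <json/json.hpp>):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <nlohmann/json.hpp>

constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
constexpr int64_t kDefaultBatchSize{1000};

struct CommonStreamInfo {
  std::chrono::milliseconds batch_interval{};
  int64_t batch_size{};
};

void from_json(const nlohmann::json &data, CommonStreamInfo &info) {
  // Null (as written by an older version) falls back to the default.
  if (const auto interval = data.at("batch_interval"); !interval.is_null()) {
    info.batch_interval = std::chrono::milliseconds{interval.get<int64_t>()};
  } else {
    info.batch_interval = kDefaultBatchInterval;
  }
  if (const auto size = data.at("batch_size"); !size.is_null()) {
    info.batch_size = size.get<int64_t>();
  } else {
    info.batch_size = kDefaultBatchSize;
  }
}

int main() {
  const nlohmann::json data{{"batch_interval", nullptr}, {"batch_size", 42}};
  CommonStreamInfo info;
  from_json(data, info);
  std::cout << info.batch_interval.count() << ' ' << info.batch_size << '\n';  // "100 42"
}
```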

@@ -21,14 +21,17 @@
 #include "query/procedure/mg_procedure_impl.hpp"
 
-namespace query {
+namespace query::stream {
+
+constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
+constexpr int64_t kDefaultBatchSize{1000};
 
 template <typename TMessage>
 using ConsumerFunction = std::function<void(const std::vector<TMessage> &)>;
 
 struct CommonStreamInfo {
-  std::optional<std::chrono::milliseconds> batch_interval;
-  std::optional<int64_t> batch_size;
+  std::chrono::milliseconds batch_interval;
+  int64_t batch_size;
   std::string transformation_name;
 };
@@ -79,4 +82,4 @@ const std::string kCommonInfoKey = "common_info";
 void to_json(nlohmann::json &data, CommonStreamInfo &&info);
 void from_json(const nlohmann::json &data, CommonStreamInfo &common_info);
 
-}  // namespace query
+}  // namespace query::stream


@@ -13,7 +13,7 @@
 #include <json/json.hpp>
 
-namespace query {
+namespace query::stream {
 
 KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
                          ConsumerFunction<integrations::kafka::Message> consumer_function) {
   integrations::kafka::ConsumerInfo consumer_info{
@@ -114,4 +114,4 @@ void from_json(const nlohmann::json &data, PulsarStream::StreamInfo &info) {
   data.at(kTopicsKey).get_to(info.topics);
   data.at(kServiceUrl).get_to(info.service_url);
 }
 
-}  // namespace query
+}  // namespace query::stream


@@ -16,7 +16,7 @@
 #include "integrations/kafka/consumer.hpp"
 #include "integrations/pulsar/consumer.hpp"
 
-namespace query {
+namespace query::stream {
 
 struct KafkaStream {
   struct StreamInfo {
@@ -88,4 +88,4 @@ inline StreamSourceType StreamType(const PulsarStream & /*stream*/) {
   return StreamSourceType::PULSAR;
 }
 
-}  // namespace query
+}  // namespace query::stream


@@ -38,7 +38,7 @@ namespace EventCounter {
 extern const Event MessagesConsumed;
 }  // namespace EventCounter
 
-namespace query {
+namespace query::stream {
 
 namespace {
 constexpr auto kExpectedTransformationResultSize = 2;
 const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()};
@@ -525,6 +525,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
       }
     }
   };
+
   auto insert_result = map.try_emplace(
       stream_name, StreamData<TStream>{std::move(stream_info.common_info.transformation_name), std::move(owner),
                                        std::make_unique<SynchronizedStreamSource<TStream>>(
@@ -729,4 +730,4 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
       it->second);
 }
 
-}  // namespace query
+}  // namespace query::stream


@@ -33,6 +33,10 @@
 namespace query {
 
+struct InterpreterContext;
+
+namespace stream {
+
 class StreamsException : public utils::BasicException {
  public:
   using BasicException::BasicException;
@@ -65,8 +69,6 @@ struct StreamStatus {
 using TransformationResult = std::vector<std::vector<TypedValue>>;
 
-struct InterpreterContext;
-
 /// Manages Kafka consumers.
 ///
 /// This class is responsible for all query supported actions to happen.
@@ -188,4 +190,5 @@ class Streams final {
   SynchronizedStreamsMap streams_;
 };
 
+}  // namespace stream
 }  // namespace query

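The recurring namespace edits above follow one scheme: the stream machinery moves into the C++17 nested namespace query::stream, while InterpreterContext stays forward-declared in the enclosing query namespace (per the InterpreterContext hunk earlier, its definition remains there). A minimal sketch of that layout; the constructor and member are hypothetical, for illustration only:

```cpp
namespace query {

// Still declared in the outer namespace, where the interpreter defines it.
struct InterpreterContext;

namespace stream {

class Streams {
 public:
  // An unqualified InterpreterContext resolves through the enclosing
  // query namespace during name lookup.
  explicit Streams(InterpreterContext *interpreter_context)
      : interpreter_context_{interpreter_context} {}

 private:
  InterpreterContext *interpreter_context_;
};

}  // namespace stream
}  // namespace query
```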

@@ -216,8 +216,8 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
         "default_values",
         ("default_values",
          "kafka",
-         None,
-         None,
+         100,
+         1000,
          "kafka_transform.simple",
          None,
          False),


@@ -266,8 +266,8 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
         "default_values",
         ("default_values",
          "pulsar",
-         None,
-         None,
+         100,
+         1000,
          "pulsar_transform.simple",
          None,
          False),


@@ -77,7 +77,7 @@ def test_owner_is_shown(kafka_topics, connection):
                          f"TOPICS {kafka_topics[0]} "
                          f"TRANSFORM kafka_transform.simple")
 
-    common.check_stream_info(userless_cursor, "test", ("test", "kafka", None, None,
+    common.check_stream_info(userless_cursor, "test", ("test", "kafka", 100, 1000,
                                                        "kafka_transform.simple", stream_user, False))


@@ -21,6 +21,7 @@
 #include <gtest/gtest.h>
 #include <spdlog/common.h>
 #include <spdlog/spdlog.h>
 
+#include "integrations/kafka/consumer.hpp"
 #include "integrations/kafka/exceptions.hpp"
 #include "kafka_mock.hpp"
@@ -40,6 +41,10 @@ int SpanToInt(std::span<const char> span) {
   std::memcpy(&result, span.data(), sizeof(int));
   return result;
 }
+
+constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
+constexpr int64_t kDefaultBatchSize{1000};
+
 }  // namespace
 
 struct ConsumerTest : public ::testing::Test {
@@ -52,8 +57,8 @@ struct ConsumerTest : public ::testing::Test {
         .topics = {kTopicName},
         .consumer_group = "ConsumerGroup " + test_name,
         .bootstrap_servers = cluster.Bootstraps(),
-        .batch_interval = std::nullopt,
-        .batch_size = std::nullopt,
+        .batch_interval = kDefaultBatchInterval,
+        .batch_size = kDefaultBatchSize,
     };
   };


@@ -22,9 +22,9 @@
 #include "query/stream/streams.hpp"
 #include "storage/v2/storage.hpp"
 
-using Streams = query::Streams;
-using StreamInfo = query::KafkaStream::StreamInfo;
-using StreamStatus = query::StreamStatus<query::KafkaStream>;
+using Streams = query::stream::Streams;
+using StreamInfo = query::stream::KafkaStream::StreamInfo;
+using StreamStatus = query::stream::StreamStatus<query::stream::KafkaStream>;
 
 namespace {
 const static std::string kTopicName{"TrialTopic"};
@@ -90,8 +90,8 @@ class StreamsTest : public ::testing::Test {
   StreamInfo CreateDefaultStreamInfo() {
     return StreamInfo{.common_info{
-                          .batch_interval = std::nullopt,
-                          .batch_size = std::nullopt,
+                          .batch_interval = query::stream::kDefaultBatchInterval,
+                          .batch_size = query::stream::kDefaultBatchSize,
                           .transformation_name = "not used in the tests",
                       },
                       .topics = {kTopicName},
@@ -111,7 +111,7 @@ class StreamsTest : public ::testing::Test {
 TEST_F(StreamsTest, SimpleStreamManagement) {
   auto check_data = CreateDefaultStreamCheckData();
-  streams_->Create<query::KafkaStream>(check_data.name, check_data.info, check_data.owner);
+  streams_->Create<query::stream::KafkaStream>(check_data.name, check_data.info, check_data.owner);
 
   EXPECT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
   EXPECT_NO_THROW(streams_->Start(check_data.name));
@@ -137,12 +137,12 @@ TEST_F(StreamsTest, SimpleStreamManagement) {
 TEST_F(StreamsTest, CreateAlreadyExisting) {
   auto stream_info = CreateDefaultStreamInfo();
   auto stream_name = GetDefaultStreamName();
-  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+  streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);
 
   try {
-    streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+    streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);
     FAIL() << "Creating already existing stream should throw\n";
-  } catch (query::StreamsException &exception) {
+  } catch (query::stream::StreamsException &exception) {
     EXPECT_EQ(exception.what(), fmt::format("Stream already exists with name '{}'", stream_name));
   }
 }
@@ -151,12 +151,12 @@ TEST_F(StreamsTest, DropNotExistingStream) {
   const auto stream_info = CreateDefaultStreamInfo();
   const auto stream_name = GetDefaultStreamName();
   const std::string not_existing_stream_name{"ThisDoesn'tExists"};
-  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+  streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);
 
   try {
     streams_->Drop(not_existing_stream_name);
     FAIL() << "Dropping not existing stream should throw\n";
-  } catch (query::StreamsException &exception) {
+  } catch (query::stream::StreamsException &exception) {
     EXPECT_EQ(exception.what(), fmt::format("Couldn't find stream '{}'", not_existing_stream_name));
   }
 }
@@ -187,8 +187,7 @@ TEST_F(StreamsTest, RestoreStreams) {
     mock_cluster_.CreateTopic(stream_info.topics[0]);
   }
 
-  stream_check_datas[1].info.common_info.batch_interval = {};
-  stream_check_datas[2].info.common_info.batch_size = {};
   stream_check_datas[3].owner = {};
 
   const auto check_restore_logic = [&stream_check_datas, this]() {
@@ -206,7 +205,7 @@ TEST_F(StreamsTest, RestoreStreams) {
   EXPECT_TRUE(streams_->GetStreamInfo().empty());
   for (auto &check_data : stream_check_datas) {
-    streams_->Create<query::KafkaStream>(check_data.name, check_data.info, check_data.owner);
+    streams_->Create<query::stream::KafkaStream>(check_data.name, check_data.info, check_data.owner);
   }
   {
     SCOPED_TRACE("After streams are created");
@@ -242,7 +241,7 @@ TEST_F(StreamsTest, RestoreStreams) {
 TEST_F(StreamsTest, CheckWithTimeout) {
   const auto stream_info = CreateDefaultStreamInfo();
   const auto stream_name = GetDefaultStreamName();
-  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+  streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);
 
   std::chrono::milliseconds timeout{3000};