From d9bb4e2e46169bf15234fedfee4ce50bb56aba49 Mon Sep 17 00:00:00 2001
From: Antonio Andelic
Date: Tue, 30 Nov 2021 16:29:51 +0100
Subject: [PATCH] Use default values instantly for batch size/interval (#306)

---
 src/integrations/constants.hpp             |  2 --
 src/integrations/kafka/consumer.cpp        | 12 +++-----
 src/integrations/kafka/consumer.hpp        |  4 +--
 src/integrations/pulsar/consumer.cpp       |  7 ++---
 src/integrations/pulsar/consumer.hpp       |  4 +--
 src/query/interpreter.cpp                  | 36 +++++++++------------
 src/query/interpreter.hpp                  |  2 +-
 src/query/procedure/mg_procedure_impl.cpp  | 20 ++++++------
 src/query/stream/common.cpp                | 26 +++++-----------
 src/query/stream/common.hpp                | 11 ++++---
 src/query/stream/sources.cpp               |  4 +--
 src/query/stream/sources.hpp               |  4 +--
 src/query/stream/streams.cpp               |  5 +--
 src/query/stream/streams.hpp               |  7 +++--
 tests/e2e/streams/kafka_streams_tests.py   |  4 +--
 tests/e2e/streams/pulsar_streams_tests.py  |  4 +--
 tests/e2e/streams/streams_owner_tests.py   |  2 +-
 tests/unit/integrations_kafka_consumer.cpp |  9 ++++--
 tests/unit/query_streams.cpp               | 29 +++++++++--------
 19 files changed, 92 insertions(+), 100 deletions(-)

diff --git a/src/integrations/constants.hpp b/src/integrations/constants.hpp
index c1500dba0..6adb193c7 100644
--- a/src/integrations/constants.hpp
+++ b/src/integrations/constants.hpp
@@ -13,8 +13,6 @@
 #include <chrono>

 namespace integrations {
-constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
-constexpr int64_t kDefaultBatchSize{1000};
 constexpr int64_t kDefaultCheckBatchLimit{1};
 constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
 constexpr std::chrono::milliseconds kMinimumInterval{1};
diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index ce5e5d928..afcbbfb0b 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -35,14 +35,13 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon
                                                                std::atomic<bool> &is_running) {
   std::vector<Message> batch{};

-  int64_t batch_size = info.batch_size.value_or(kDefaultBatchSize);
-  batch.reserve(batch_size);
+  batch.reserve(info.batch_size);

-  auto remaining_timeout_in_ms = info.batch_interval.value_or(kDefaultBatchInterval).count();
+  auto remaining_timeout_in_ms = info.batch_interval.count();
   auto start = std::chrono::steady_clock::now();

   bool run_batch = true;
-  for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size && is_running.load(); ++i) {
+  for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < info.batch_size && is_running.load(); ++i) {
     std::unique_ptr<RdKafka::Message> msg(consumer.consume(remaining_timeout_in_ms));
     switch (msg->err()) {
       case RdKafka::ERR__TIMED_OUT:
@@ -111,13 +110,12 @@ int64_t Message::Offset() const {

 Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
     : info_{std::move(info)}, consumer_function_(std::move(consumer_function)), cb_(info_.consumer_name) {
-  MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer");
   // NOLINTNEXTLINE (modernize-use-nullptr)
-  if (info_.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) {
+  if (info_.batch_interval < kMinimumInterval) {
     throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch interval has to be positive!");
   }
-  if (info_.batch_size.value_or(kMinimumSize) < kMinimumSize) {
+  if (info_.batch_size < kMinimumSize) {
     throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch size has to be positive!");
   }
diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp
index 630963a87..63178b4be 100644
--- a/src/integrations/kafka/consumer.hpp
+++ b/src/integrations/kafka/consumer.hpp
@@ -84,8 +84,8 @@ struct ConsumerInfo {
   std::vector<std::string> topics;
   std::string consumer_group;
   std::string bootstrap_servers;
-  std::optional<std::chrono::milliseconds> batch_interval;
-  std::optional<int64_t> batch_size;
+  std::chrono::milliseconds batch_interval;
+  int64_t batch_size;
 };

 /// Memgraphs Kafka consumer wrapper.
diff --git a/src/integrations/pulsar/consumer.cpp b/src/integrations/pulsar/consumer.cpp
index ab79295f4..42be39088 100644
--- a/src/integrations/pulsar/consumer.cpp
+++ b/src/integrations/pulsar/consumer.cpp
@@ -49,13 +49,12 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(TConsumer &consum
                                                                const pulsar_client::MessageId &last_message_id) {
   std::vector<Message> batch{};

-  const auto batch_size = info.batch_size.value_or(kDefaultBatchSize);
-  batch.reserve(batch_size);
+  batch.reserve(info.batch_size);

-  auto remaining_timeout_in_ms = info.batch_interval.value_or(kDefaultBatchInterval).count();
+  auto remaining_timeout_in_ms = info.batch_interval.count();
   auto start = std::chrono::steady_clock::now();

-  while (remaining_timeout_in_ms > 0 && batch.size() < batch_size && is_running) {
+  while (remaining_timeout_in_ms > 0 && batch.size() < info.batch_size && is_running) {
     pulsar_client::Message message;
     const auto result = ConsumeMessage(consumer, message, remaining_timeout_in_ms);
     switch (result) {
diff --git a/src/integrations/pulsar/consumer.hpp b/src/integrations/pulsar/consumer.hpp
index 16cc56268..8a37f5fb0 100644
--- a/src/integrations/pulsar/consumer.hpp
+++ b/src/integrations/pulsar/consumer.hpp
@@ -39,8 +39,8 @@ class Message final {

 using ConsumerFunction = std::function<void(const std::vector<Message> &)>;

 struct ConsumerInfo {
-  std::optional<int64_t> batch_size;
-  std::optional<std::chrono::milliseconds> batch_interval;
+  int64_t batch_size;
+  std::chrono::milliseconds batch_interval;
   std::vector<std::string> topics;
   std::string consumer_name;
   std::string service_url;
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index bf0ac9e4f..50d897970 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -532,10 +532,12 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) {
   return str == nullptr ? std::nullopt : std::make_optional(*str);
 }

-CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
-  return {.batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator),
-          .batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
-          .transformation_name = stream_query->transform_name_};
+stream::CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
+  return {
+      .batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator)
+                            .value_or(stream::kDefaultBatchInterval),
+      .batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator).value_or(stream::kDefaultBatchSize),
+      .transformation_name = stream_query->transform_name_};
 }

 std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator,
@@ -569,12 +571,12 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Ex
     std::string bootstrap = bootstrap_servers
                                ? std::move(*bootstrap_servers)
                                : std::string{interpreter_context->config.default_kafka_bootstrap_servers};
-    interpreter_context->streams.Create<KafkaStream>(stream_name,
-                                                     {.common_info = std::move(common_stream_info),
-                                                      .topics = std::move(topic_names),
-                                                      .consumer_group = std::move(consumer_group),
-                                                      .bootstrap_servers = std::move(bootstrap)},
-                                                     std::move(owner));
+    interpreter_context->streams.Create<stream::KafkaStream>(stream_name,
+                                                             {.common_info = std::move(common_stream_info),
+                                                              .topics = std::move(topic_names),
+                                                              .consumer_group = std::move(consumer_group),
+                                                              .bootstrap_servers = std::move(bootstrap)},
+                                                             std::move(owner));

     return std::vector<std::vector<TypedValue>>{};
   };
@@ -594,7 +596,7 @@ Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, Ex
                owner = StringPointerToOptional(username)]() mutable {
     std::string url =
         service_url ? std::move(*service_url) : std::string{interpreter_context->config.default_pulsar_service_url};
-    interpreter_context->streams.Create<PulsarStream>(
+    interpreter_context->streams.Create<stream::PulsarStream>(
         stream_name,
         {.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .service_url = std::move(url)},
         std::move(owner));
@@ -681,16 +683,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
         std::vector<std::vector<TypedValue>> results;
         results.reserve(streams_status.size());
         auto stream_info_as_typed_stream_info_emplace_in = [](auto &typed_status, const auto &stream_info) {
-          if (stream_info.batch_interval.has_value()) {
-            typed_status.emplace_back(stream_info.batch_interval->count());
-          } else {
-            typed_status.emplace_back();
-          }
-          if (stream_info.batch_size.has_value()) {
-            typed_status.emplace_back(*stream_info.batch_size);
-          } else {
-            typed_status.emplace_back();
-          }
+          typed_status.emplace_back(stream_info.batch_interval.count());
+          typed_status.emplace_back(stream_info.batch_size);
           typed_status.emplace_back(stream_info.transformation_name);
         };

diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp
index 95b8249be..133ee3a6e 100644
--- a/src/query/interpreter.hpp
+++ b/src/query/interpreter.hpp
@@ -190,7 +190,7 @@ struct InterpreterContext {

   const InterpreterConfig config;

-  query::Streams streams;
+  query::stream::Streams streams;
 };

 /// Function that is used to tell all active interpreters that they should stop
diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp
index ce20b775c..a380eebdf 100644
--- a/src/query/procedure/mg_procedure_impl.cpp
+++ b/src/query/procedure/mg_procedure_impl.cpp
@@ -2546,26 +2546,28 @@ bool IsValidIdentifierName(const char *name) {
 }  // namespace query::procedure

 namespace {
+using StreamSourceType = query::stream::StreamSourceType;
+
 class InvalidMessageFunction : public std::invalid_argument {
  public:
-  InvalidMessageFunction(const query::StreamSourceType type, const std::string_view function_name)
+  InvalidMessageFunction(const StreamSourceType type, const std::string_view function_name)
       : std::invalid_argument{fmt::format("'{}' is not defined for a message from a stream of type '{}'", function_name,
-                                          query::StreamSourceTypeToString(type))} {}
+                                          StreamSourceTypeToString(type))} {}
 };

-query::StreamSourceType MessageToStreamSourceType(const mgp_message::KafkaMessage & /*msg*/) {
-  return query::StreamSourceType::KAFKA;
+StreamSourceType MessageToStreamSourceType(const mgp_message::KafkaMessage & /*msg*/) {
+  return StreamSourceType::KAFKA;
 }
-query::StreamSourceType MessageToStreamSourceType(const mgp_message::PulsarMessage & /*msg*/) {
-  return query::StreamSourceType::PULSAR;
+StreamSourceType MessageToStreamSourceType(const mgp_message::PulsarMessage & /*msg*/) {
+  return StreamSourceType::PULSAR;
 }

-mgp_source_type StreamSourceTypeToMgpSourceType(const query::StreamSourceType type) {
+mgp_source_type StreamSourceTypeToMgpSourceType(const StreamSourceType type) {
   switch (type) {
-    case query::StreamSourceType::KAFKA:
+    case StreamSourceType::KAFKA:
       return mgp_source_type::KAFKA;
-    case query::StreamSourceType::PULSAR:
+    case StreamSourceType::PULSAR:
       return mgp_source_type::PULSAR;
   }
 }
diff --git a/src/query/stream/common.cpp b/src/query/stream/common.cpp
index 03bf8e0e0..cf6cdc59c 100644
--- a/src/query/stream/common.cpp
+++ b/src/query/stream/common.cpp
@@ -13,7 +13,7 @@

 #include <json/json.hpp>

-namespace query {
+namespace query::stream {
 namespace {
 const std::string kBatchIntervalKey{"batch_interval"};
 const std::string kBatchSizeKey{"batch_size"};
@@ -21,35 +21,25 @@ const std::string kTransformationName{"transformation_name"};
 }  // namespace

 void to_json(nlohmann::json &data, CommonStreamInfo &&common_info) {
-  if (common_info.batch_interval) {
-    data[kBatchIntervalKey] = common_info.batch_interval->count();
-  } else {
-    data[kBatchIntervalKey] = nullptr;
-  }
-
-  if (common_info.batch_size) {
-    data[kBatchSizeKey] = *common_info.batch_size;
-  } else {
-    data[kBatchSizeKey] = nullptr;
-  }
-
+  data[kBatchIntervalKey] = common_info.batch_interval.count();
+  data[kBatchSizeKey] = common_info.batch_size;
   data[kTransformationName] = common_info.transformation_name;
 }

 void from_json(const nlohmann::json &data, CommonStreamInfo &common_info) {
   if (const auto batch_interval = data.at(kBatchIntervalKey); !batch_interval.is_null()) {
-    using BatchInterval = typename decltype(common_info.batch_interval)::value_type;
+    using BatchInterval = decltype(common_info.batch_interval);
     common_info.batch_interval = BatchInterval{batch_interval.get<typename BatchInterval::rep>()};
   } else {
-    common_info.batch_interval = {};
+    common_info.batch_interval = kDefaultBatchInterval;
   }

   if (const auto batch_size = data.at(kBatchSizeKey); !batch_size.is_null()) {
-    common_info.batch_size = batch_size.get<decltype(common_info.batch_size)::value_type>();
+    common_info.batch_size = batch_size.get<decltype(common_info.batch_size)>();
   } else {
-    common_info.batch_size = {};
+    common_info.batch_size = kDefaultBatchSize;
   }

   data.at(kTransformationName).get_to(common_info.transformation_name);
 }
-}  // namespace query
+}  // namespace query::stream
diff --git a/src/query/stream/common.hpp b/src/query/stream/common.hpp
index 5ad1280b0..0c28ea828 100644
--- a/src/query/stream/common.hpp
+++ b/src/query/stream/common.hpp
@@ -21,14 +21,17 @@

 #include "query/procedure/mg_procedure_impl.hpp"

-namespace query {
+namespace query::stream {
+
+constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
+constexpr int64_t kDefaultBatchSize{1000};

 template <typename TMessage>
 using ConsumerFunction = std::function<void(const std::vector<TMessage> &)>;

 struct CommonStreamInfo {
-  std::optional<std::chrono::milliseconds> batch_interval;
-  std::optional<int64_t> batch_size;
+  std::chrono::milliseconds batch_interval;
+  int64_t batch_size;
   std::string transformation_name;
 };
@@ -79,4 +82,4 @@ const std::string kCommonInfoKey = "common_info";

 void to_json(nlohmann::json &data, CommonStreamInfo &&info);
 void from_json(const nlohmann::json &data, CommonStreamInfo &common_info);
-}  // namespace query
+}  // namespace query::stream
diff --git a/src/query/stream/sources.cpp b/src/query/stream/sources.cpp
index 9beb2e639..5c1148672 100644
--- a/src/query/stream/sources.cpp
+++ b/src/query/stream/sources.cpp
@@ -13,7 +13,7 @@

 #include <json/json.hpp>

-namespace query {
+namespace query::stream {
 KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
                          ConsumerFunction<integrations::kafka::Message> consumer_function) {
   integrations::kafka::ConsumerInfo consumer_info{
@@ -114,4 +114,4 @@ void from_json(const nlohmann::json &data, PulsarStream::StreamInfo &info) {
   data.at(kTopicsKey).get_to(info.topics);
   data.at(kServiceUrl).get_to(info.service_url);
 }
-}  // namespace query
+}  // namespace query::stream
diff --git a/src/query/stream/sources.hpp b/src/query/stream/sources.hpp
index afe1bf767..ec29890ed 100644
--- a/src/query/stream/sources.hpp
+++ b/src/query/stream/sources.hpp
@@ -16,7 +16,7 @@
 #include "integrations/kafka/consumer.hpp"
 #include "integrations/pulsar/consumer.hpp"

-namespace query {
+namespace query::stream {

 struct KafkaStream {
   struct StreamInfo {
@@ -88,4 +88,4 @@ inline StreamSourceType StreamType(const PulsarStream & /*stream*/) {
   return StreamSourceType::PULSAR;
 }

-}  // namespace query
+}  // namespace query::stream
diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp
index ab26dc9d3..a80fa5478 100644
--- a/src/query/stream/streams.cpp
+++ b/src/query/stream/streams.cpp
@@ -38,7 +38,7 @@ namespace EventCounter {
 extern const Event MessagesConsumed;
 }  // namespace EventCounter

-namespace query {
+namespace query::stream {
 namespace {
 constexpr auto kExpectedTransformationResultSize = 2;
 const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()};
@@ -525,6 +525,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
     }
   };
+
   auto insert_result = map.try_emplace(
       stream_name, StreamData{std::move(stream_info.common_info.transformation_name), std::move(owner),
                               std::make_unique<SynchronizedStreamSource<TStream>>(
@@ -729,4 +730,4 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
       it->second);
 }

-}  // namespace query
+}  // namespace query::stream
diff --git a/src/query/stream/streams.hpp b/src/query/stream/streams.hpp
index 16c5c6b46..b927c4c07 100644
--- a/src/query/stream/streams.hpp
+++ b/src/query/stream/streams.hpp
@@ -33,6 +33,10 @@

 namespace query {

+struct InterpreterContext;
+
+namespace stream {
+
 class StreamsException : public utils::BasicException {
  public:
   using BasicException::BasicException;
@@ -65,8 +69,6 @@ struct StreamStatus {

 using TransformationResult = std::vector<std::vector<TypedValue>>;

-struct InterpreterContext;
-
 /// Manages Kafka consumers.
 ///
 /// This class is responsible for all query supported actions to happen.
@@ -188,4 +190,5 @@ class Streams final {
   SynchronizedStreamsMap streams_;
 };

+}  // namespace stream
 }  // namespace query
diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py
index b49668630..b05fc7e8a 100755
--- a/tests/e2e/streams/kafka_streams_tests.py
+++ b/tests/e2e/streams/kafka_streams_tests.py
@@ -216,8 +216,8 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
         "default_values",
         ("default_values",
          "kafka",
-         None,
-         None,
+         100,
+         1000,
          "kafka_transform.simple",
          None,
          False),
diff --git a/tests/e2e/streams/pulsar_streams_tests.py b/tests/e2e/streams/pulsar_streams_tests.py
index 707e83103..4df7e186e 100755
--- a/tests/e2e/streams/pulsar_streams_tests.py
+++ b/tests/e2e/streams/pulsar_streams_tests.py
@@ -266,8 +266,8 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
         "default_values",
         ("default_values",
          "pulsar",
-         None,
-         None,
+         100,
+         1000,
          "pulsar_transform.simple",
          None,
          False),
diff --git a/tests/e2e/streams/streams_owner_tests.py b/tests/e2e/streams/streams_owner_tests.py
index d67927155..6fd658fa6 100644
--- a/tests/e2e/streams/streams_owner_tests.py
+++ b/tests/e2e/streams/streams_owner_tests.py
@@ -77,7 +77,7 @@ def test_owner_is_shown(kafka_topics, connection):
                          f"TOPICS {kafka_topics[0]} "
                          f"TRANSFORM kafka_transform.simple")

-    common.check_stream_info(userless_cursor, "test", ("test", "kafka", None, None,
+    common.check_stream_info(userless_cursor, "test", ("test", "kafka", 100, 1000,
                                                        "kafka_transform.simple", stream_user, False))
diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp
index 08dd6a0b1..502317658 100644
--- a/tests/unit/integrations_kafka_consumer.cpp
+++ b/tests/unit/integrations_kafka_consumer.cpp
@@ -21,6 +21,7 @@
 #include <chrono>
 #include <span>
 #include <thread>
+
 #include "integrations/kafka/consumer.hpp"
 #include "integrations/kafka/exceptions.hpp"
 #include "kafka_mock.hpp"
@@ -40,6 +41,10 @@ int SpanToInt(std::span<const char> span) {
   std::memcpy(&result, span.data(), sizeof(int));
   return result;
 }
+
+constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
+constexpr int64_t kDefaultBatchSize{1000};
+
 }  // namespace

 struct ConsumerTest : public ::testing::Test {
@@ -52,8 +57,8 @@ struct ConsumerTest : public ::testing::Test {
         .topics = {kTopicName},
         .consumer_group = "ConsumerGroup " + test_name,
         .bootstrap_servers = cluster.Bootstraps(),
-        .batch_interval = std::nullopt,
-        .batch_size = std::nullopt,
+        .batch_interval = kDefaultBatchInterval,
+        .batch_size = kDefaultBatchSize,
     };
   };
diff --git a/tests/unit/query_streams.cpp b/tests/unit/query_streams.cpp
index 817b7948c..018da55c3 100644
--- a/tests/unit/query_streams.cpp
+++ b/tests/unit/query_streams.cpp
@@ -22,9 +22,9 @@
 #include "query/stream/streams.hpp"
 #include "storage/v2/storage.hpp"

-using Streams = query::Streams;
-using StreamInfo = query::KafkaStream::StreamInfo;
-using StreamStatus = query::StreamStatus;
+using Streams = query::stream::Streams;
+using StreamInfo = query::stream::KafkaStream::StreamInfo;
+using StreamStatus = query::stream::StreamStatus;

 namespace {
 const static std::string kTopicName{"TrialTopic"};
@@ -90,8 +90,8 @@ class StreamsTest : public ::testing::Test {
   StreamInfo CreateDefaultStreamInfo() {
     return StreamInfo{.common_info{
-                          .batch_interval = std::nullopt,
-                          .batch_size = std::nullopt,
+                          .batch_interval = query::stream::kDefaultBatchInterval,
+                          .batch_size = query::stream::kDefaultBatchSize,
                           .transformation_name = "not used in the tests",
                       },
                       .topics = {kTopicName},
@@ -111,7 +111,7 @@ class StreamsTest : public ::testing::Test {
 TEST_F(StreamsTest, SimpleStreamManagement) {
   auto check_data = CreateDefaultStreamCheckData();
-  streams_->Create<query::KafkaStream>(check_data.name, check_data.info, check_data.owner);
+  streams_->Create<query::stream::KafkaStream>(check_data.name, check_data.info, check_data.owner);

   EXPECT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
   EXPECT_NO_THROW(streams_->Start(check_data.name));
@@ -137,12 +137,12 @@ TEST_F(StreamsTest, SimpleStreamManagement) {
 TEST_F(StreamsTest, CreateAlreadyExisting) {
   auto stream_info = CreateDefaultStreamInfo();
   auto stream_name = GetDefaultStreamName();
-  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+  streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);

   try {
-    streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+    streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);
     FAIL() << "Creating already existing stream should throw\n";
-  } catch (query::StreamsException &exception) {
+  } catch (query::stream::StreamsException &exception) {
     EXPECT_EQ(exception.what(), fmt::format("Stream already exists with name '{}'", stream_name));
   }
 }
@@ -151,12 +151,12 @@ TEST_F(StreamsTest, DropNotExistingStream) {
   const auto stream_info = CreateDefaultStreamInfo();
   const auto stream_name = GetDefaultStreamName();
   const std::string not_existing_stream_name{"ThisDoesn'tExists"};
-  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+  streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);

   try {
     streams_->Drop(not_existing_stream_name);
     FAIL() << "Dropping not existing stream should throw\n";
-  } catch (query::StreamsException &exception) {
+  } catch (query::stream::StreamsException &exception) {
     EXPECT_EQ(exception.what(), fmt::format("Couldn't find stream '{}'", not_existing_stream_name));
   }
 }
@@ -187,8 +187,7 @@ TEST_F(StreamsTest, RestoreStreams) {
     mock_cluster_.CreateTopic(stream_info.topics[0]);
   }

-  stream_check_datas[1].info.common_info.batch_interval = {};
-  stream_check_datas[2].info.common_info.batch_size = {};
   stream_check_datas[3].owner = {};

   const auto check_restore_logic = [&stream_check_datas, this]() {
@@ -206,7 +205,7 @@ TEST_F(StreamsTest, RestoreStreams) {
   EXPECT_TRUE(streams_->GetStreamInfo().empty());

   for (auto &check_data : stream_check_datas) {
-    streams_->Create<query::KafkaStream>(check_data.name, check_data.info, check_data.owner);
+    streams_->Create<query::stream::KafkaStream>(check_data.name, check_data.info, check_data.owner);
   }
   {
     SCOPED_TRACE("After streams are created");
@@ -242,7 +241,7 @@ TEST_F(StreamsTest, RestoreStreams) {
 TEST_F(StreamsTest, CheckWithTimeout) {
   const auto stream_info = CreateDefaultStreamInfo();
   const auto stream_name = GetDefaultStreamName();
-  streams_->Create<query::KafkaStream>(stream_name, stream_info, std::nullopt);
+  streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt);

   std::chrono::milliseconds timeout{3000};
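
Editor's note: below is a minimal standalone sketch (not part of the patch) illustrating the
pattern this commit adopts. The batch_interval and batch_size fields lose their std::optional
wrappers, and the defaults are applied exactly once via value_or when the stream info is built,
rather than at every consume-loop call site. MakeCommonStreamInfo here is a hypothetical helper
standing in for GetCommonStreamInfo in interpreter.cpp; the constants mirror
kDefaultBatchInterval/kDefaultBatchSize, which the patch moves from integrations/constants.hpp
into the query::stream namespace in common.hpp.

    #include <cassert>
    #include <chrono>
    #include <cstdint>
    #include <optional>

    // Defaults now live next to the stream types (query/stream/common.hpp in the patch).
    constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
    constexpr int64_t kDefaultBatchSize{1000};

    // After the patch: plain values, no std::optional.
    struct CommonStreamInfo {
      std::chrono::milliseconds batch_interval;
      int64_t batch_size;
    };

    // Hypothetical stand-in for GetCommonStreamInfo: defaults are baked in
    // once, when user-supplied (possibly absent) values become the info.
    CommonStreamInfo MakeCommonStreamInfo(std::optional<std::chrono::milliseconds> user_interval,
                                          std::optional<int64_t> user_batch_size) {
      return {.batch_interval = user_interval.value_or(kDefaultBatchInterval),
              .batch_size = user_batch_size.value_or(kDefaultBatchSize)};
    }

    int main() {
      // No user values: the defaults land in the struct immediately, so the
      // consume loops (GetBatch in the kafka/pulsar consumer.cpp files) can
      // read the fields directly instead of repeating value_or on each batch.
      const auto info = MakeCommonStreamInfo(std::nullopt, std::nullopt);
      assert(info.batch_interval == kDefaultBatchInterval);
      assert(info.batch_size == kDefaultBatchSize);
      return 0;
    }

This is also why the SHOW STREAMS expectations in the e2e tests change from None/None to
100/1000: the defaults are materialized at stream creation, and from_json falls back to the
same constants when a persisted stream has null for either field.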