From 6cb293688d96c992fa33b48d2187661f4727deea Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Mon, 15 Nov 2021 15:44:30 +0200 Subject: [PATCH] Fix check stream not reseting back to the correct offset (#295) --- src/integrations/kafka/consumer.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index 2f795942b..be77882ac 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -249,10 +249,18 @@ void Consumer::Check(std::optional timeout, std::opti utils::OnScopeExit restore_is_running([this] { is_running_.store(false); }); if (last_assignment_.empty()) { - if (const auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) { - spdlog::warn("Saving the commited offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err)); + auto throw_consumer_check_failed = [this](const auto err) { throw ConsumerCheckFailedException(info_.consumer_name, fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err))); + }; + if (const auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) { + spdlog::warn("Saving the assignment of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err)); + throw_consumer_check_failed(err); + } + if (const auto err = consumer_->position(last_assignment_); err != RdKafka::ERR_NO_ERROR) { + spdlog::warn("Saving the position offset assignment of consumer {} failed: {}", info_.consumer_name, + RdKafka::err2str(err)); + throw_consumer_check_failed(err); } } else { if (const auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) {