diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index 2f795942b..be77882ac 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -249,10 +249,18 @@ void Consumer::Check(std::optional timeout, std::opti utils::OnScopeExit restore_is_running([this] { is_running_.store(false); }); if (last_assignment_.empty()) { - if (const auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) { - spdlog::warn("Saving the commited offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err)); + auto throw_consumer_check_failed = [this](const auto err) { throw ConsumerCheckFailedException(info_.consumer_name, fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err))); + }; + if (const auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) { + spdlog::warn("Saving the assignment of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err)); + throw_consumer_check_failed(err); + } + if (const auto err = consumer_->position(last_assignment_); err != RdKafka::ERR_NO_ERROR) { + spdlog::warn("Saving the position offset assignment of consumer {} failed: {}", info_.consumer_name, + RdKafka::err2str(err)); + throw_consumer_check_failed(err); } } else { if (const auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) {