Fix check stream not reseting back to the correct offset (#295)

This commit is contained in:
Kostas Kyrimis 2021-11-15 15:44:30 +02:00 committed by GitHub
parent 16709dff6c
commit 6cb293688d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -249,10 +249,18 @@ void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::opti
utils::OnScopeExit restore_is_running([this] { is_running_.store(false); });
if (last_assignment_.empty()) {
if (const auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Saving the commited offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err));
auto throw_consumer_check_failed = [this](const auto err) {
throw ConsumerCheckFailedException(info_.consumer_name,
fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err)));
};
if (const auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Saving the assignment of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err));
throw_consumer_check_failed(err);
}
if (const auto err = consumer_->position(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Saving the position offset assignment of consumer {} failed: {}", info_.consumer_name,
RdKafka::err2str(err));
throw_consumer_check_failed(err);
}
} else {
if (const auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) {