Add logging before Kafka consume
This commit is contained in:
parent
fbf52a4ce2
commit
52eba3194e
@ -30,6 +30,31 @@
|
|||||||
namespace integrations::kafka {
|
namespace integrations::kafka {
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
// TODO: Remove after debugging
|
||||||
|
std::string time_in_HH_MM_SS_MMM() {
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
// get current time
|
||||||
|
auto now = system_clock::now();
|
||||||
|
|
||||||
|
// get number of milliseconds for the current second
|
||||||
|
// (remainder after division into seconds)
|
||||||
|
auto ms = duration_cast<milliseconds>(now.time_since_epoch()) % 1000;
|
||||||
|
|
||||||
|
// convert to std::time_t in order to convert to std::tm (broken time)
|
||||||
|
auto timer = system_clock::to_time_t(now);
|
||||||
|
|
||||||
|
// convert to broken time
|
||||||
|
std::tm bt = *std::localtime(&timer);
|
||||||
|
|
||||||
|
std::ostringstream oss;
|
||||||
|
|
||||||
|
oss << std::put_time(&bt, "%H:%M:%S"); // HH:MM:SS
|
||||||
|
oss << '.' << std::setfill('0') << std::setw(3) << ms.count();
|
||||||
|
|
||||||
|
return oss.str();
|
||||||
|
}
|
||||||
|
|
||||||
utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer,
|
utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer,
|
||||||
const ConsumerInfo &info,
|
const ConsumerInfo &info,
|
||||||
std::atomic<bool> &is_running) {
|
std::atomic<bool> &is_running) {
|
||||||
@ -42,7 +67,9 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon
|
|||||||
|
|
||||||
bool run_batch = true;
|
bool run_batch = true;
|
||||||
for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < info.batch_size && is_running.load(); ++i) {
|
for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < info.batch_size && is_running.load(); ++i) {
|
||||||
|
spdlog::info("Consuming stream {} ...", time_in_HH_MM_SS_MMM());
|
||||||
std::unique_ptr<RdKafka::Message> msg(consumer.consume(remaining_timeout_in_ms));
|
std::unique_ptr<RdKafka::Message> msg(consumer.consume(remaining_timeout_in_ms));
|
||||||
|
spdlog::info("Stream consumed {}", time_in_HH_MM_SS_MMM());
|
||||||
switch (msg->err()) {
|
switch (msg->err()) {
|
||||||
case RdKafka::ERR__TIMED_OUT:
|
case RdKafka::ERR__TIMED_OUT:
|
||||||
run_batch = false;
|
run_batch = false;
|
||||||
|
Loading…
Reference in New Issue
Block a user