From 16b8c7b27c4f32b58602417f5f41dc8bd9d866ae Mon Sep 17 00:00:00 2001
From: Aidar Samerkhanov <aidar.samerkhanov@memgraph.io>
Date: Sun, 5 Nov 2023 22:51:56 +0300
Subject: [PATCH] Fix Kafka flaky unit test (#1409)

---
 src/integrations/kafka/consumer.cpp        | 2 +-
 tests/unit/integrations_kafka_consumer.cpp | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index 8674c8442..9889fe46b 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp
index 426c836c3..63a419552 100644
--- a/tests/unit/integrations_kafka_consumer.cpp
+++ b/tests/unit/integrations_kafka_consumer.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -147,6 +147,8 @@ TEST_F(ConsumerTest, BatchInterval) {
     cluster.SeedTopic(kTopicName, kMessage);
     std::this_thread::sleep_for(kBatchInterval * 0.5);
   }
+  // Wait for all messages to be delivered
+  std::this_thread::sleep_for(kBatchInterval);
 
   consumer->Stop();
   EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
@@ -170,9 +172,13 @@ TEST_F(ConsumerTest, BatchInterval) {
   EXPECT_TRUE(1 <= received_timestamps[0].first && received_timestamps[0].first <= 2);
 
   EXPECT_LE(3, received_timestamps.size());
+
+  int msgsCnt = received_timestamps[0].first;
   for (auto i = 1; i < received_timestamps.size(); ++i) {
+    msgsCnt += received_timestamps[i].first;
     check_received_timestamp(i);
   }
+  EXPECT_EQ(kMessageCount, msgsCnt);
 }
 
 TEST_F(ConsumerTest, StartStop) {