diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index 8674c8442..9889fe46b 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp index 426c836c3..63a419552 100644 --- a/tests/unit/integrations_kafka_consumer.cpp +++ b/tests/unit/integrations_kafka_consumer.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -147,6 +147,8 @@ TEST_F(ConsumerTest, BatchInterval) { cluster.SeedTopic(kTopicName, kMessage); std::this_thread::sleep_for(kBatchInterval * 0.5); } + // Wait for all messages to be delivered + std::this_thread::sleep_for(kBatchInterval); consumer->Stop(); EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received"; @@ -170,9 +172,13 @@ TEST_F(ConsumerTest, BatchInterval) { EXPECT_TRUE(1 <= received_timestamps[0].first && received_timestamps[0].first <= 2); EXPECT_LE(3, received_timestamps.size()); + + int msgsCnt = received_timestamps[0].first; for (auto i = 1; i < received_timestamps.size(); ++i) { + msgsCnt += received_timestamps[i].first; check_received_timestamp(i); } + EXPECT_EQ(kMessageCount, msgsCnt); } TEST_F(ConsumerTest, StartStop) {