-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Affects Version(s): 2.7.1
Hello,
I think I have found a bug with the pause method of Listener Containers.
When I call registry.getListenerContainers().forEach(MessageListenerContainer::pause);
I have something like this in the log :
2021-07-06T18:20:16,650- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-0]
2021-07-06T18:20:16,678- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:16,678- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-1]
2021-07-06T18:20:16,718- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:16,718- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-2]
2021-07-06T18:20:16,767- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:16,767- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-3]
2021-07-06T18:20:21,651- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-0]
2021-07-06T18:20:21,652- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:21,679- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-1]
2021-07-06T18:20:21,679- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:21,719- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-2]
2021-07-06T18:20:21,719- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:21,767- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-3]
We can see that the consumtion has resumed without the call of the resume method.
I think it's because the addition of the possibility to pause partitions in version 2.7.
When the pause of the consumer is requested all the partitions are paused but they are not added to the collection pauseRequestedPartitions
of the AbstractMessageListenerContainer
so they are not returned by isPartitionPauseRequested
and they are restarted by the method KafkaMessageListenerContainer::resumePartitionsIfNecessary
at the end of the poll.