Listener receives multiple messages when using @RetryableTopic with topicPartitions to reset offset #2220

@ashishqcs

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Between 2.7.0 and 2.8.2

Description

I am trying to use @RetryableTopic for non-blocking retries together with topicPartitions, so that the listener reads messages from the beginning of the partition.

Below are my main and DLT listeners:

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000),
        fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
            message.key(),
            message.value(),
            message.topic(),
            LocalDateTime.now());
}

@DltHandler
public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
            message.key(),
            message.value(),
            message.topic(),
            LocalDateTime.now());
}

With this configuration, the main, retry, and DLT containers all receive the same event from the products topic. Since there is a single retry topic, the main listener method is invoked twice (once by the main container and once by the retry container) and the DLT handler once, for a total of 3 copies of the same event.

logs:

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer 
- message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810          

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810      

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed at DLT - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810  

If I remove topicPartitions from the code above (that is, the line below), the listener works as expected:
@TopicPartition(topic = "products", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
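
One reading of the logs above, offered as an assumption rather than a confirmed diagnosis: all three containers (main, retry, dlt) report topic: products, which would be consistent with the explicit products / partition 0 assignment being reused for the retry and DLT endpoints instead of their own topics. The sketch below is a plain-Java model of that hypothesis, not Spring Kafka code. The class and method names are hypothetical, and the topic names products-retry and products-dlt assume the default suffixes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a "container" is just a name mapped to the topic it reads from.
class DuplicateDeliveryDemo {

    // Builds the three containers seen in the logs (main, retry, dlt).
    // copyExplicitAssignment = true models the ASSUMED behavior in which every
    // endpoint inherits the explicit "products" assignment from @KafkaListener.
    // false models each endpoint reading its own topic (assumed default suffixes).
    static Map<String, String> containers(boolean copyExplicitAssignment) {
        String mainTopic = "products";
        Map<String, String> result = new LinkedHashMap<>();
        result.put("main", mainTopic);
        result.put("retry", copyExplicitAssignment ? mainTopic : mainTopic + "-retry");
        result.put("dlt", copyExplicitAssignment ? mainTopic : mainTopic + "-dlt");
        return result;
    }

    // Returns the names of the containers that would receive a record published to `topic`.
    static List<String> receivers(Map<String, String> containers, String topic) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, String> entry : containers.entrySet()) {
            if (entry.getValue().equals(topic)) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Explicit assignment copied to every endpoint: three deliveries, as in the logs.
        System.out.println(receivers(containers(true), "products"));  // [main, retry, dlt]
        // Each endpoint on its own topic: only the main container sees the record.
        System.out.println(receivers(containers(false), "products")); // [main]
    }
}
```

If this model matches what the framework does, the duplicate deliveries come from the assignment rather than from any retry actually being triggered, which would also explain why all three log entries carry the same timestamp.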
