-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Milestone
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
Between 2.7.0 and 2.8.2
Description
I am trying to use @RetryableTopic for unblocking retries and topicPartitions in order to read messages from beginning.
Below are my main and DLT listeners:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
message.key(),
message.value(),
message.topic(),
LocalDateTime.now());
}
@DltHandler
public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
message.key(),
message.value(),
message.topic(),
LocalDateTime.now());
}
With this main topic listener as well as DLT topic listener receive same event (since there is one retry topic - main listener receives same event 2 times and 1 by dlt listener as well - total 3 copies of same event).
logs:
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer
- message consumed -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer -
message consumed -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer -
message consumed at DLT -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
If I use above code without topicPartitions by removing below line, listener works as expected.
@TopicPartition(topic = "products", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))