Skip to content

Commit c066338

Browse files
authored
GH-10090: Add channel adapters for AMQP 1.0 (#10445)
* GH-10090: Add AmqpClientMessageHandler for AMQP 1.0 Related to: #10090 Spring AMQP now provides the `org.springframework.amqp:spring-rabbitmq-client` library to communicate to RabbitMQ with AMQP 1.0 protocol. * Replace `spring-rabbit` with the `spring-rabbitmq-client` since the latter brings the AMQP 0.9 protocol library as transitive dependency * Introduce an `AmqpClientMessageHandler` based on the `AsyncAmqpTemplate` where its implementation should be for AMQP 1.0, e.g. `org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate` * This outbound channel adapter can behave as a gateway when `setRequiresReply(true)` * Cover basic `AmqpClientMessageHandler` with integration tests * Document this new feature * Add `AmqpClientMessageProducer` implementation to consume message from RabbitMQ AMQP 1.0 * Fix typos and language in docs and Javadocs * Add tests and docs about new `AmqpClientMessageProducer` * Add `whats-new.adoc` bullet for a new `spring-integration-amqp` feature * Fix `AmqpClientMessageProducer` for throwing a `ListenerExecutionFailedException` when exception is bubbled from the downstream flow * Add test for error handling * Add test for batch manual ack * Remove redundant (and dangerous) `@ComponentScan` from the `ManualAckTests`: it does not do anything for the test suite, but is able to see all the `@Configuration` classes in other tests of this package. That may lead to unexpected behavior and failures
1 parent 4e0667c commit c066338

File tree

10 files changed

+1197
-3
lines changed

10 files changed

+1197
-3
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ project('spring-integration-test-support') {
436436
project('spring-integration-amqp') {
437437
description = 'Spring Integration AMQP Support'
438438
dependencies {
439-
api 'org.springframework.amqp:spring-rabbit'
439+
api 'org.springframework.amqp:spring-rabbitmq-client'
440440
optionalApi 'org.springframework.amqp:spring-rabbit-stream'
441441

442442
testImplementation 'org.springframework.amqp:spring-rabbit-junit'
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.inbound;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import com.rabbitmq.client.amqp.Consumer;
27+
import com.rabbitmq.client.amqp.Resource;
28+
import org.aopalliance.aop.Advice;
29+
import org.jspecify.annotations.Nullable;
30+
31+
import org.springframework.amqp.core.AmqpAcknowledgment;
32+
import org.springframework.amqp.core.MessagePostProcessor;
33+
import org.springframework.amqp.core.MessageProperties;
34+
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
35+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
36+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
37+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
38+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener;
39+
import org.springframework.amqp.support.converter.MessageConverter;
40+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
41+
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
42+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
43+
import org.springframework.integration.StaticMessageHeaderAccessor;
44+
import org.springframework.integration.acks.AcknowledgmentCallback;
45+
import org.springframework.integration.acks.SimpleAcknowledgment;
46+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
47+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
48+
import org.springframework.integration.core.Pausable;
49+
import org.springframework.integration.endpoint.MessageProducerSupport;
50+
import org.springframework.integration.support.MutableMessageBuilder;
51+
import org.springframework.messaging.Message;
52+
import org.springframework.scheduling.TaskScheduler;
53+
54+
/**
55+
* A {@link MessageProducerSupport} implementation for AMQP 1.0 client.
56+
* <p>
57+
* Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}.
58+
*
59+
* @author Artem Bilan
60+
*
61+
* @since 7.0
62+
*
63+
* @see RabbitAmqpListenerContainer
64+
* @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter
65+
*/
66+
public class AmqpClientMessageProducer extends MessageProducerSupport implements Pausable {
67+
68+
private final RabbitAmqpListenerContainer listenerContainer;
69+
70+
private @Nullable MessageConverter messageConverter = new SimpleMessageConverter();
71+
72+
private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
73+
74+
private @Nullable Collection<MessagePostProcessor> afterReceivePostProcessors;
75+
76+
private volatile boolean paused;
77+
78+
public AmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, String... queueNames) {
79+
this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory);
80+
this.listenerContainer.setQueueNames(queueNames);
81+
}
82+
83+
public void setInitialCredits(int initialCredits) {
84+
this.listenerContainer.setInitialCredits(initialCredits);
85+
}
86+
87+
public void setPriority(int priority) {
88+
this.listenerContainer.setPriority(priority);
89+
}
90+
91+
public void setStateListeners(Resource.StateListener... stateListeners) {
92+
this.listenerContainer.setStateListeners(stateListeners);
93+
}
94+
95+
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
96+
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
97+
}
98+
99+
public void setBatchSize(int batchSize) {
100+
this.listenerContainer.setBatchSize(batchSize);
101+
}
102+
103+
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
104+
this.listenerContainer.setBatchReceiveTimeout(batchReceiveTimeout);
105+
}
106+
107+
@Override
108+
public void setTaskScheduler(TaskScheduler taskScheduler) {
109+
this.listenerContainer.setTaskScheduler(taskScheduler);
110+
}
111+
112+
public void setAdviceChain(Advice... advices) {
113+
this.listenerContainer.setAdviceChain(advices);
114+
}
115+
116+
public void setAutoSettle(boolean autoSettle) {
117+
this.listenerContainer.setAutoSettle(autoSettle);
118+
}
119+
120+
public void setDefaultRequeue(boolean defaultRequeue) {
121+
this.listenerContainer.setDefaultRequeue(defaultRequeue);
122+
}
123+
124+
public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
125+
this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod);
126+
}
127+
128+
public void setConsumersPerQueue(int consumersPerQueue) {
129+
this.listenerContainer.setConsumersPerQueue(consumersPerQueue);
130+
}
131+
132+
/**
133+
* Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}.
134+
* If set to null, an AMQP message is sent as is into a {@link Message} payload.
135+
* @param messageConverter the {@link MessageConverter} to use or null.
136+
*/
137+
public void setMessageConverter(@Nullable MessageConverter messageConverter) {
138+
this.messageConverter = messageConverter;
139+
}
140+
141+
public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
142+
this.headerMapper = headerMapper;
143+
}
144+
145+
@Override
146+
public String getComponentType() {
147+
return "amqp:inbound-channel-adapter";
148+
}
149+
150+
@Override
151+
protected void onInit() {
152+
super.onInit();
153+
this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer");
154+
this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener());
155+
this.listenerContainer.afterPropertiesSet();
156+
}
157+
158+
@Override
159+
protected void doStart() {
160+
super.doStart();
161+
this.listenerContainer.start();
162+
}
163+
164+
@Override
165+
protected void doStop() {
166+
super.doStop();
167+
this.listenerContainer.stop();
168+
}
169+
170+
@Override
171+
public void destroy() {
172+
super.destroy();
173+
this.listenerContainer.destroy();
174+
}
175+
176+
@Override
177+
public void pause() {
178+
this.listenerContainer.pause();
179+
this.paused = true;
180+
}
181+
182+
@Override
183+
public void resume() {
184+
this.listenerContainer.resume();
185+
this.paused = false;
186+
}
187+
188+
@Override
189+
public boolean isPaused() {
190+
return this.paused;
191+
}
192+
193+
private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener {
194+
195+
@Override
196+
public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) {
197+
org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
198+
Message<?> messageToSend = toSpringMessage(message);
199+
200+
try {
201+
sendMessage(messageToSend);
202+
}
203+
catch (Exception ex) {
204+
throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message);
205+
}
206+
}
207+
208+
@Override
209+
public void onMessageBatch(List<org.springframework.amqp.core.Message> messages) {
210+
SimpleAcknowledgment acknowledgmentCallback = null;
211+
List<Message<?>> springMessages = new ArrayList<>(messages.size());
212+
for (org.springframework.amqp.core.Message message : messages) {
213+
Message<?> springMessage = toSpringMessage(message);
214+
if (acknowledgmentCallback == null) {
215+
acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage);
216+
}
217+
springMessages.add(springMessage);
218+
}
219+
220+
Message<List<Message<?>>> messageToSend =
221+
MutableMessageBuilder.withPayload(springMessages)
222+
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
223+
.build();
224+
225+
try {
226+
sendMessage(messageToSend);
227+
}
228+
catch (Exception ex) {
229+
throw new ListenerExecutionFailedException(getComponentName() + ".onMessageBatch() failed", ex,
230+
messages.toArray(org.springframework.amqp.core.Message[]::new));
231+
}
232+
}
233+
234+
private Message<?> toSpringMessage(org.springframework.amqp.core.Message message) {
235+
if (AmqpClientMessageProducer.this.afterReceivePostProcessors != null) {
236+
for (MessagePostProcessor processor : AmqpClientMessageProducer.this.afterReceivePostProcessors) {
237+
message = processor.postProcessMessage(message);
238+
}
239+
}
240+
MessageProperties messageProperties = message.getMessageProperties();
241+
AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment();
242+
AmqpAcknowledgmentCallback acknowledgmentCallback = null;
243+
if (amqpAcknowledgment != null) {
244+
acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment);
245+
}
246+
247+
Object payload = message;
248+
Map<String, @Nullable Object> headers = null;
249+
if (AmqpClientMessageProducer.this.messageConverter != null) {
250+
payload = AmqpClientMessageProducer.this.messageConverter.fromMessage(message);
251+
headers = AmqpClientMessageProducer.this.headerMapper.toHeadersFromRequest(messageProperties);
252+
}
253+
254+
return getMessageBuilderFactory()
255+
.withPayload(payload)
256+
.copyHeaders(headers)
257+
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
258+
.build();
259+
}
260+
261+
@Override
262+
public void onMessage(org.springframework.amqp.core.Message message) {
263+
throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'");
264+
}
265+
266+
}
267+
268+
/**
269+
* The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}.
270+
* @param delegate the {@link AmqpAcknowledgment} to delegate to.
271+
*/
272+
private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback {
273+
274+
@Override
275+
public void acknowledge(Status status) {
276+
this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name()));
277+
}
278+
279+
@Override
280+
public boolean isAutoAck() {
281+
return false;
282+
}
283+
284+
}
285+
286+
}

0 commit comments

Comments
 (0)