Stopping AMQP consumer on error #8672

Answered by garyrussell
gigermocas asked this question in Q&A

Hi,

Is there a way to cleanly stop an AMQP queue consumer on processing error?

The best I've come up with is to stop the listener container while the message is being processed. In my tests this achieves the required outcome (the message remains in the queue with no consumers), but because the container's stop() runs on the same thread, the channel is forcibly closed after the shutdown timeout and a warning is logged.

I can't find a place where I can hook in a consumer stop/cancel immediately after rejecting the message and before requesting a new one. Is there a way to do this?

We even have that recommendation in the docs: https://docs.spring.io/spring-amqp/reference/html/#event-consumption

If you wish to use the idle event to stop the listener container, you should not call container.stop() on the thread that calls the listener. Doing so always causes delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container.
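
A minimal sketch of the hand-off that passage describes, assuming a plain `ExecutorService` is available to the listener. `Stoppable` is a hypothetical stand-in for the listener container (only its `stop()` method matters here), and `StopHandOff` and `stopOnAnotherThread` are illustrative names, not Spring AMQP API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class StopHandOff {

    /** Hypothetical stand-in for a listener container; only stop() is needed here. */
    public interface Stoppable {
        void stop();
    }

    /**
     * Submits container.stop() to the executor so the calling (listener) thread
     * returns immediately instead of blocking inside stop().
     */
    public static Future<?> stopOnAnotherThread(Stoppable container, ExecutorService executor) {
        return executor.submit(container::stop);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Fake container that only reports which thread ran stop().
        Stoppable fake = () -> System.out.println("stop() ran on " + Thread.currentThread().getName());
        // The demo waits on the Future only so it can exit cleanly; a listener would not wait.
        stopOnAnotherThread(fake, executor).get();
        executor.shutdown();
    }
}
```

In a real listener the returned `Future` would normally be ignored, so the listener thread can finish the current delivery while the stop proceeds elsewhere.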

@gigermocas

I understand that, but if stop() is called on a different thread, nothing prevents the listener container from trying to process further messages until stop() is actually executed. I don't have that guarantee, right?

@garyrussell

You can call the other stop variant and pass in a Runnable, which will be called when the stop is complete (the container does the wait on another thread). It currently works only for the SMLC (SimpleMessageListenerContainer); I am working on a fix for another problem that will enable it for the DMLC (DirectMessageListenerContainer) too; it should be in a PR today.
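
A rough sketch of how the callback variant can be used, under stated assumptions: `CallbackStoppable` is a hypothetical stand-in that exposes only a `stop(Runnable)` method, and `StopWithCallback` and `requestStop` are illustrative names. The fake container in `main` simply runs the callback on its own thread to mimic a stop that completes elsewhere.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StopWithCallback {

    /** Hypothetical stand-in exposing only the stop(Runnable) variant. */
    public interface CallbackStoppable {
        void stop(Runnable callback);
    }

    /**
     * Requests a stop and returns a latch that opens once the callback has run.
     * The stop call itself is expected to return without waiting for completion.
     */
    public static CountDownLatch requestStop(CallbackStoppable container) {
        CountDownLatch stopped = new CountDownLatch(1);
        container.stop(stopped::countDown);
        return stopped;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake container: "completes" the stop on its own thread, then runs the callback.
        CallbackStoppable fake = callback -> new Thread(callback, "fake-stopper").start();
        CountDownLatch stopped = requestStop(fake);
        // Await on a thread other than the listener thread (here: main).
        boolean done = stopped.await(1, TimeUnit.SECONDS);
        System.out.println("stopped = " + done);
    }
}
```

Awaiting the latch on the listener thread itself would presumably bring back the delay described in the question, so the latch is better consumed by some other thread, or replaced by logic placed directly in the callback.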

Answer selected by gigermocas
@garyrussell

You can also wait (in the listener) until container.isActive() goes false after firing off the stop on another thread. It goes false before we start stopping the consumers.
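
One way this suggestion could look, as a sketch only: `Container` is a hypothetical stand-in for the two container methods mentioned above (`stop()` and `isActive()`), and the class name, method name, timeout and 10 ms poll interval are all arbitrary choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class StopAndAwaitInactive {

    /** Hypothetical stand-in for the two container methods the reply mentions. */
    public interface Container {
        void stop();
        boolean isActive();
    }

    /**
     * Fires stop() on the executor, then polls isActive() on the calling thread.
     * Returns true once the container reports inactive, false if the timeout elapses.
     */
    public static boolean stopAndAwaitInactive(Container container, ExecutorService executor,
            long timeoutMillis) throws InterruptedException {
        executor.execute(container::stop);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (container.isActive()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up waiting
            }
            Thread.sleep(10); // poll interval is an arbitrary choice
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean active = new AtomicBoolean(true);
        // Fake container whose stop() just flips the active flag.
        Container fake = new Container() {
            @Override public void stop() { active.set(false); }
            @Override public boolean isActive() { return active.get(); }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println("inactive = " + stopAndAwaitInactive(fake, executor, 1000));
        executor.shutdown();
    }
}
```

The timeout is there so the listener thread cannot spin forever if the stop never takes effect; what to do in that case is left to the application.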

@gigermocas

Thanks Gary, that works. I am using the DMLC here.

@garyrussell

The fix (spring-projects/spring-amqp#2483) will be in Spring AMQP 3.0.6 and 2.4.14. Spring Integration 5.5.x pulls in Spring AMQP 2.3.x by default, which is no longer supported. See https://docs.spring.io/spring-integration/docs/5.5.19-SNAPSHOT/reference/html/amqp.html#amqp-inbound-channel-adapter

When using exclusive or single-active consumers in the listener container, it is recommended that you set the container property forceStop to true. This will prevent a race condition where, after stopping the container, another consumer could start consuming messages before this instance has fully stopped. For this option to be available, the Spring AMQP dependency has to be explicitly upgraded to the latest version of the 2.4.x generation (this is automatic when using Spring Boot 2.7.14 or later).

I'm already on 2.4.x, so I will pick that up (I'm assuming 2.9.10 is a typo).

Indeed, I am using single-active consumers in this particular case (with a prefetch of 1) and will also be enabling that.

@garyrussell

Yes, sorry, 2.4.14; should be available Monday.

@garyrussell

You can use 2.4.14-SNAPSHOT to test.

@gigermocas

I think I'm getting the desired behavior using the SMLC, but not so with the DMLC. Using this simple application:

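import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.support.RetryTemplate;

@SpringBootApplication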
@EnableIntegration
public class StopListenerApplication {

	public static void main(String[] args) {
		SpringApplication.run(StopListenerApplication.class, args);
	}

	@Bean
	IntegrationFlow consumeFlow(ConnectionFactory connectionFactory) {
//		var factory = new SimpleRabbitListenerContainerFactory();
		var factory = new DirectRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);

		var listener = factory.createListenerContainer();
		listener.setQueueNames("test_queue");
		listener.setPrefetchCount(1);
		listener.setForceStop(true);

		return IntegrationFlows.from(Amqp.inboundAdapter(listener)
				.retryTemplate(RetryTemplate.builder()
						.withListener(new RetryListenerSupport() {

							@Override
							public <T, E extends Throwable> void close(RetryContext context,
									RetryCallback<T, E> callback, Throwable throwable) {
								if (throwable != null) {
									listener.stop(new Runnable() {

										@Override
										public void run() {
											System.out.println("Listener sucessfully stopped");
										}
									});
								}
							}

						})
						.maxAttempts(1)
						.build()))
				.log()
				.handle(p -> {
					throw new RuntimeException("oops!");
				})
				.get();
	}

}

(I'm using a RetryTemplate here because in the actual application I'll be retrying a couple of times)

With the SMLC I get:

17:05:39.733 [Thread-0] DEBUG org.springframework.boot.devtools.restart.classloader.RestartClassLoader - Created RestartClassLoader org.springframework.boot.devtools.restart.classloader.RestartClassLoader@9002d5f

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::      (v2.7.14-SNAPSHOT)

2023-07-13 17:05:40.094  INFO 42048 --- [  restartedMain] com.example.StopListenerApplication      : Starting StopListenerApplication using Java 17.0.6 on pws55 with PID 42048 (C:\Users\mmd\Work\NOS\workspace\stop-listener\target\classes started by mmd in C:\Users\mmd\Work\NOS\workspace\stop-listener)
2023-07-13 17:05:40.096  INFO 42048 --- [  restartedMain] com.example.StopListenerApplication      : No active profile set, falling back to 1 default profile: "default"
2023-07-13 17:05:40.148  INFO 42048 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-07-13 17:05:40.613  INFO 42048 --- [  restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-07-13 17:05:40.635  INFO 42048 --- [  restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-07-13 17:05:41.459  INFO 42048 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2023-07-13 17:05:41.527  INFO 42048 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-07-13 17:05:41.527  INFO 42048 --- [  restartedMain] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-07-13 17:05:41.527  INFO 42048 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-07-13 17:05:41.527  INFO 42048 --- [  restartedMain] o.s.integration.channel.DirectChannel    : Channel 'application.consumeFlow.channel#0' has 1 subscriber(s).
2023-07-13 17:05:41.527  INFO 42048 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : started bean 'consumeFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'com.example.StopListenerApplication'; from source: 'bean method consumeFlow'
2023-07-13 17:05:41.528  INFO 42048 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-07-13 17:05:41.583  INFO 42048 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#5e64159a:0/SimpleConnection@19b985d8 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59322]
2023-07-13 17:05:41.626  INFO 42048 --- [  restartedMain] o.s.i.a.i.AmqpInboundChannelAdapter      : started bean 'consumeFlow.amqp:inbound-channel-adapter#0'; defined in: 'com.example.StopListenerApplication'; from source: 'bean method consumeFlow'
2023-07-13 17:05:41.640  INFO 42048 --- [  restartedMain] com.example.StopListenerApplication      : Started StopListenerApplication in 1.894 seconds (JVM running for 2.267)
2023-07-13 17:05:41.646  INFO 42048 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=test_queue, amqp_receivedExchange=, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_queue, amqp_redelivered=true, id=31a4d34b-c01d-a3e6-e2df-a4462f6e8824, amqp_consumerTag=amq.ctag-gNp95f4d2OUY_kkrUbTSvw, timestamp=1689264341645}]
2023-07-13 17:05:41.650  INFO 42048 --- [erContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2023-07-13 17:05:41.656  WARN 42048 --- [erContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1826) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1716) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1631) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1619) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1610) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1554) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:994) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:941) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1323) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: oops!
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:119) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:215) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:69) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.4.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.4.jar:na]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1712) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	... 10 common frames omitted
Caused by: java.lang.RuntimeException: oops!
	at com.example.StopListenerApplication.lambda$0(StopListenerApplication.java:59) ~[classes/:na]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.18.jar:5.5.18]
	... 26 common frames omitted

2023-07-13 17:05:41.669  INFO 42048 --- [erContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
Listener sucessfully stopped

Looks good...

vs

17:06:36.911 [Thread-0] DEBUG org.springframework.boot.devtools.restart.classloader.RestartClassLoader - Created RestartClassLoader org.springframework.boot.devtools.restart.classloader.RestartClassLoader@9002d5f

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::      (v2.7.14-SNAPSHOT)

2023-07-13 17:06:37.318  INFO 20044 --- [  restartedMain] com.example.StopListenerApplication      : Starting StopListenerApplication using Java 17.0.6 on pws55 with PID 20044 (C:\Users\mmd\Work\NOS\workspace\stop-listener\target\classes started by mmd in C:\Users\mmd\Work\NOS\workspace\stop-listener)
2023-07-13 17:06:37.320  INFO 20044 --- [  restartedMain] com.example.StopListenerApplication      : No active profile set, falling back to 1 default profile: "default"
2023-07-13 17:06:37.371  INFO 20044 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-07-13 17:06:37.830  INFO 20044 --- [  restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-07-13 17:06:37.851  INFO 20044 --- [  restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-07-13 17:06:38.689  INFO 20044 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2023-07-13 17:06:38.754  INFO 20044 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-07-13 17:06:38.754  INFO 20044 --- [  restartedMain] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-07-13 17:06:38.754  INFO 20044 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-07-13 17:06:38.754  INFO 20044 --- [  restartedMain] o.s.integration.channel.DirectChannel    : Channel 'application.consumeFlow.channel#0' has 1 subscriber(s).
2023-07-13 17:06:38.755  INFO 20044 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : started bean 'consumeFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'com.example.StopListenerApplication'; from source: 'bean method consumeFlow'
2023-07-13 17:06:38.755  INFO 20044 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-07-13 17:06:38.803  INFO 20044 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#22f746eb:0/SimpleConnection@503bcd94 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50361]
2023-07-13 17:06:38.817  INFO 20044 --- [  restartedMain] o.s.a.r.l.DirectMessageListenerContainer : Container initialized for queues: [test_queue]
2023-07-13 17:06:38.817  INFO 20044 --- [  restartedMain] o.s.i.a.i.AmqpInboundChannelAdapter      : started bean 'consumeFlow.amqp:inbound-channel-adapter#0'; defined in: 'com.example.StopListenerApplication'; from source: 'bean method consumeFlow'
2023-07-13 17:06:38.829  INFO 20044 --- [  restartedMain] com.example.StopListenerApplication      : Started StopListenerApplication in 1.904 seconds (JVM running for 2.277)
2023-07-13 17:06:38.842  INFO 20044 --- [erContainer#0-1] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=test_queue, index=0, consumerTag=amq.ctag-0Zmd0L8EaSESSFozxzUZFA identity=3aa18427] started
2023-07-13 17:06:38.854  INFO 20044 --- [pool-1-thread-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=test_queue, amqp_receivedExchange=, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_queue, amqp_redelivered=true, id=e468ca27-96c6-62ab-0000-80175a0c74ab, amqp_consumerTag=amq.ctag-0Zmd0L8EaSESSFozxzUZFA, timestamp=1689264398853}]
2023-07-13 17:06:38.863  WARN 20044 --- [pool-1-thread-4] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1826) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1716) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1631) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1619) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1610) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1554) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1137) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1094) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) ~[amqp-client-5.14.3.jar:5.14.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) ~[amqp-client-5.14.3.jar:5.14.3]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: oops!
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:119) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:215) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:69) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.4.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.4.jar:na]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1712) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	... 11 common frames omitted
Caused by: java.lang.RuntimeException: oops!
	at com.example.StopListenerApplication.lambda$0(StopListenerApplication.java:59) ~[classes/:na]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.18.jar:5.5.18]
	... 27 common frames omitted

2023-07-13 17:06:38.865 ERROR 20044 --- [pool-1-thread-4] o.s.a.r.l.DirectMessageListenerContainer : Failed to invoke listener

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1826) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1716) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1631) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1619) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1610) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1554) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1137) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1094) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) ~[amqp-client-5.14.3.jar:5.14.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) ~[amqp-client-5.14.3.jar:5.14.3]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: oops!
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:119) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.29.jar:5.3.29]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:215) ~[spring-integration-core-5.5.18.jar:5.5.18]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:69) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.4.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.4.jar:na]
	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370) ~[spring-integration-amqp-5.5.18.jar:5.5.18]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1712) ~[spring-rabbit-2.4.14-SNAPSHOT.jar:2.4.14-SNAPSHOT]
	... 11 common frames omitted
Caused by: java.lang.RuntimeException: oops!
	at com.example.StopListenerApplication.lambda$0(StopListenerApplication.java:59) ~[classes/:na]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.18.jar:5.5.18]
	... 27 common frames omitted

2023-07-13 17:06:43.872  INFO 20044 --- [erContainer#0-2] o.s.a.r.l.DirectMessageListenerContainer : Consumers not finished.
2023-07-13 17:06:43.873  WARN 20044 --- [erContainer#0-2] o.s.a.r.l.DirectMessageListenerContainer : Closing channel for unresponsive consumer: SimpleConsumer [queue=test_queue, index=0, consumerTag=amq.ctag-0Zmd0L8EaSESSFozxzUZFA identity=3aa18427]
Listener sucessfully stopped

Other than the doubled exception logging, the "unresponsive consumer" warning is triggered specifically by forceStop.

Oops, you're right; investigating...

@garyrussell

spring-projects/spring-amqp#2487
