Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Problems with running integration tests en masse #3674

Discussion options

I have a few integration tests written that are testing 3 different flows, some with tests for subflows. They all work fine when run in isolation within my IDE. When they're run en masse I get errors in the tests that appear to be because the mocks aren't replacing the actual implementation:

  • I put in a channel to proxy nullChannel() to verify headers added during the subflow and I get an exception on the console about NullChannel not having message handling methods

  • I injected a mock to prevent data from leaving the flow but on the console I see an exception from where the call is actually supposed to go during runtime indicating the method wasn't trapped by the mock and used the actual implementation

The tests are all annotated similarly to the following

@SpringBootTest
@DirtiesContext
@SpringIntegrationTest
@ActiveProfiles(profiles = "integration", resolver = ITActiveProfileResolver.class)

What could I be doing improperly to where my mocks aren't replaced properly when doing the following?
mockIntegrationContext.substituteMessageHandlerFor(PARSED_DATA_STREAM_ENDPOINT, mockParsedOutbound);

You must be logged in to vote

Sorry, my mistake, it automatically starts the outbound endpoint. You're correct.

If you look at my test from this link #3674 (reply in thread)
you'll see an assertion that the endpoint is stopped on the first line of code in the method. The test doesn't fail on that line. You'll also see an assertion that the endpoint has started after I've manually started it. It doesn't fail on that assertion.

It fails on this line verify(messageHandler, timeout(5000)).handleMessage(any()); The console output during the test shows it removing the actual handler and adding the mock handler but the mock handler doesn't get the call, it goes to the actual handler.

If I remove the call to start the endpoin…

Replies: 2 comments · 40 replies

Comment options

Sorry, not enough info.
Any chances to have some project from you to let us to play and reproduce?

Or at least show your flows and what you do in the tests.

You must be logged in to vote
13 replies
@artembilan
Comment options

Amqp.outboundAdapter(amqpTemplate).routingKey(PROCESS.toQueue()), e -> e.id(PROCESS_OUTBOUND_ENDPOINT)

and then:

@SpringIntegrationTest(noAutoStartup = PreprocessFlowConfiguration.PROCESS_OUTBOUND_ENDPOINT)

That's not correct.
The Amqp.outboundAdapter is an outbound endpoint.
This one is passive, it does something only when the message is sent to its input channel.
What I'm talking about is this one:

Amqp.inboundAdapter(this.rabbitConnectionFactory, PRE_PROCESS.toQueue())
                        .id("preProcessFlowInput")

This is an active endpoint and it consumes RabbitMQ when it is started. And it is started when application context is ready.
And that's when your flow is starting doing something.
So, what I mean is this:

@SpringIntegrationTest(noAutoStartup = "preProcessFlowInput")
...
@Autowired
    @Qualifier("preProcessFlowInput")
    AbstractEndpoint endpoint;

and you start exactly this one when you done with mocking.

@stormsensorbrian
Comment options

That still fails with the same type of output in the logs.

The mock is supposed to be injected here

.handle(Amqp.outboundAdapter(amqpTemplate).routingKey(PROCESS.toQueue()), e -> e.id(PROCESS_OUTBOUND_ENDPOINT)).get();

Am I going about testing this improperly? In my mind I should be able to insert a message on the actual rabbit queue to start the flow, it would go through the normal process defined in the flow and would be published to my mock channel instead of the rabbit outbound queue. Is that not what I should be doing?

@artembilan
Comment options

Something indeed is off in your case.

So, here is a simple Spring Boot application to consume from AMQP and send to AMQP:

@SpringBootApplication
public class SpringIntegrationTestingApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringIntegrationTestingApplication.class, args);
	}

	@Bean
	Queue testQueue() {
		return new Queue("testQueue");
	}

	@Bean
	public IntegrationFlow preProcessFlow(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate) {
		return IntegrationFlows
				.from(Amqp.inboundAdapter(connectionFactory, testQueue()).id("preProcessFlowInput"))
				.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("otherQueue"), e -> e.id("outboundEndpoint"))
				.get();
	}

}

And here is an Integration Mock test:

@SpringBootTest
@SpringIntegrationTest(noAutoStartup = "preProcessFlowInput")
class SpringIntegrationTestingApplicationTests {

	@Autowired
	AmqpTemplate amqpTemplate;

	@Autowired
	@Qualifier("preProcessFlowInput")
	AbstractEndpoint inboundEndpoint;

	@Autowired
	MockIntegrationContext mockIntegrationContext;

	@Test
	void testIntegrationMocks() {
		assertThat(this.inboundEndpoint.isRunning()).isFalse();

		ArgumentCaptor<Message<?>> captor = MockIntegration.messageArgumentCaptor();
		MessageHandler messageHandler = MockIntegration.mockMessageHandler(captor).handleNext(m ->{});
		this.mockIntegrationContext.substituteMessageHandlerFor("outboundEndpoint", messageHandler);

		this.amqpTemplate.convertAndSend("testQueue", "test");
		this.inboundEndpoint.start();

		verify(messageHandler, timeout(5000)).handleMessage(captor.capture());

		Message<?> m = captor.getValue();
		assertThat(m.getPayload()).isEqualTo("test");
	}

}

Please, investigate this one and see what is a difference for you.

@stormsensorbrian
Comment options

The only differences between your example and mine is yours starts the endpoint after you've published a message to the amqp queue and the way the ArgumentCaptor is constructed, MockIntegration.messageArgumentCaptor vs ArgumentCaptor.forClass(). I changed those and the test still fails at the same point. Also, this.mockIntegrationContext.substituteMessageHandlerFor("outboundEndpoint", messageHandler); this line seems to actually start the endpoint once it attaches the handler making this.inboundEndpoint.start(); superfluous.

Another difference is that your flow doesn't do anything in the middle, it just takes the input and sends it to the output. I modified my flow under test to remove the intermediate steps and just route from one queue to the next and it still fails in the same manner. It's behaving as if the mock that's meant to replace the actual outbound message handler isn't actually getting replaced regardless of what the console output says is happening.

To reiterate; my tests all have always worked when run in isolation and only fail when run with other tests. I don't believe the way my project is set up would be all that uncommon

@artembilan
Comment options

the handler making this.inboundEndpoint.start(); superfluous.

That's not correct.
Even if substituteMessageHandlerFor() starts the endpoint (service activator, an outbound one), the consuming part still need to be started. That one which consumes from RabbitMQ - the beginning of your flow.
All what you say is only feels like you don't have an inbound endpoint stopped before mocking.

Comment options

Sorry, my mistake, it automatically starts the outbound endpoint. You're correct.

If you look at my test from this link #3674 (reply in thread)
you'll see an assertion that the endpoint is stopped on the first line of code in the method. The test doesn't fail on that line. You'll also see an assertion that the endpoint has started after I've manually started it. It doesn't fail on that assertion.

It fails on this line verify(messageHandler, timeout(5000)).handleMessage(any()); The console output during the test shows it removing the actual handler and adding the mock handler but the mock handler doesn't get the call, it goes to the actual handler.

If I remove the call to start the endpoint, the test fails due to the assertion that the endpoint is running regardless of how the test is executed; individually or in the integration test suite.

@Autowired
    @Qualifier("preProcessFlowInput")
    AbstractEndpoint endpoint;

@Test
    void testPreprocessFlowConfig() throws JsonProcessingException, InterruptedException {
        // Given
        assertFalse(endpoint.isRunning());
        assertThat(repository.findAll().size()).isEqualTo(0);

        MessageHandler messageHandler = MockIntegration.mockMessageHandler().handleNext(m ->{});
        context.substituteMessageHandlerFor(PROCESS_OUTBOUND_ENDPOINT, messageHandler);
        ArgumentCaptor<Message<?>> captor = MockIntegration.messageArgumentCaptor();

        // When
        UplinkMessageRaw raw =
                UplinkMessageRaw.builder().id(UUID.randomUUID()).payload(payload).source(MessageSource.MACHINE_Q)
                        .build();
        this.amqpTemplate.convertAndSend(MessageSource.PRE_PROCESS.toQueue(), mapper.writeValueAsString(raw));
        
//        endpoint.start();
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            assertTrue(endpoint.isRunning());
        });

        // Then
        verify(messageHandler, timeout(5000)).handleMessage(any());
        verify(messageHandler).handleMessage(captor.capture());
        Message<String> m = (Message<String>) captor.getValue();
        assertThat(m).isNotNull();
        assertThat(m.getHeaders().get(IntegrationConfiguration.STEP_HEADER)).isEqualTo(ProcessingStep.PREPROCESS);
        UplinkMessagePreprocessed uplinkMessagePreprocessed =
                mapper.readValue(m.getPayload(), UplinkMessagePreprocessed.class);
        assertThat(uplinkMessagePreprocessed).isNotNull();
        assertThat(uplinkMessagePreprocessed.getHardwareId()).isEqualTo("0004A30B00EF9055");
        assertThat(repository.findAll().size()).isEqualTo(1);
        var dbRecord = repository.findAll().get(0);
        assertThat(dbRecord.getHardwareId()).isEqualTo("0004A30B00EF9055");
    }

When I remove the guts of the process flow to where it's just taking from one queue to the next, the only assertions that fail when I run the test individually are the ones related to the things that were removed from the flow. The mock handler is still attached and the message is still captured.

You must be logged in to vote
27 replies
@artembilan
Comment options

That's good catch, Gary!

So, currently we do:

public int hashCode() {
	return this.springIntegrationTest.hashCode();
}

But we need to calculate a hash code based on the @SpringIntegrationTest attribute values.
Currently only one noAutoStartup. So, if it is the same for different tests, we would have the same hash code and therefore would hit the cache.
But it is not going to work with use-case discussed here:

@SpringIntegrationTest(noAutoStartup = {"inboundEndpoint"})
class BasicInOutFlowTest {

and:

@SpringIntegrationTest(noAutoStartup = {"secondInboundEndpoint"})
class BasicInOutFlowTest2 {

Or, if you suggest exactly opposite: cache independently of the noAutoStartup value, then we wouldn't get the functionality we need from the MockIntegrationContextCustomizer.
It registers a couple beans one of which is a BeanPostProcessor to mark endpoint beans to not start.
So, if we take ctx from the cache, we won't be able to have BPP initiated since all the beans are already in the ctx and initialized respectively.
In other words for the proper mocking via @SpringIntegrationTest we always need a fresh ctx to initialize and start.

Am I missing anything yet?
Or the idea was to revise such a logic in favor of something runtime-aware?

@garyrussell
Comment options

Yes; it would have to hash the no auto startup values.

This wouldn't help the test project because we'd still get two contexts and the inbound adapter would be live in the one created for the unit tests.

It needs more thought; I didn't think it through all the way but, ideally, yes, we'd like to cache one context for all tests, regardless of the noautostartup settings (and even including the unit tests that don't have @SpringIntegrationTest).

Perhaps always register the customizer (for all tests) and use a JUnit5 condition with a BeforeEachCallback to stop the configured no auto start endpoints. Or vice versa - coerce them all to default stopped and only start the ones not in the list.

Perhaps also an AfterEachCallback to stop all endpoints.

Not a trivial change, hence my 6.0 suggestion, rather than in a patch release.

@artembilan
Comment options

Sure! Feel free to raise a GH issue and we will come back to it when 6.0 time arrives!

@garyrussell
Comment options

#3679

I didn't convert this discussion to an issue due to its length.

@artembilan
Comment options

Right. Plus all of that optimization discussion we would revise within that issue has nothing to do with an original concern of this one.

Answer selected by stormsensorbrian
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
🙏
Q&A
Labels
None yet
3 participants
Morty Proxy This is a proxified and sanitized view of the page, visit original site.