Skip to content

Commit

Permalink
GH-8901: Add MockIntContext.substituteTriggerFor
Browse files Browse the repository at this point in the history
Fixes: #8901

Provide a convenient API `MockIntegrationContext.substituteTriggerFor(String pollingAdapterId, Trigger trigger)`
to mitigate the time span which might be provided in the original configuration

* Add `MockIntegrationContext.substituteTriggerFor(String pollingAdapterId, Trigger trigger)`
* Refactor `MockIntegrationContext` for Java 17 code style
* Change `MockIntegrationContext.beans` to `MultiValueMap` since we may replace several properties of
the same endpoint
* Modify `MockMessageSourceTests.testMockMessageSource()` to use new `substituteTriggerFor()`, too
* Add Javadoc for the `OnlyOnceTrigger`
* Document the new feature and mention it with correlation to `OnlyOnceTrigger`
  • Loading branch information
artembilan committed Feb 15, 2024
1 parent c35982e commit 1d73ab5
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,12 @@
import org.springframework.scheduling.TriggerContext;

/**
* The {@link Trigger} implementation which returns the current time
* to schedule the task immediately and does that only once.
* <p>
* The {@link #reset()} method can be called to reuse this trigger from scratch.
* <p>
* The {@link #await()} method can be used for synchronization barriers.
*
* @author Gunnar Hillert
* @author Gary Russell
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,13 +19,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import org.springframework.beans.BeansException;
import org.springframework.beans.DirectFieldAccessor;
Expand All @@ -39,6 +35,7 @@
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
import org.springframework.integration.endpoint.IntegrationConsumer;
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
Expand All @@ -47,7 +44,10 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.scheduling.Trigger;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;

/**
Expand Down Expand Up @@ -75,7 +75,7 @@ public class MockIntegrationContext implements BeanPostProcessor, SmartInitializ
*/
public static final String MOCK_INTEGRATION_CONTEXT_BEAN_NAME = "mockIntegrationContext";

private final Map<String, Object> beans = new HashMap<>();
private final MultiValueMap<String, Object> beans = new LinkedMultiValueMap<>();

private final List<AbstractEndpoint> autoStartupCandidates = new ArrayList<>();

Expand Down Expand Up @@ -131,8 +131,12 @@ public void resetBeans(String... beanNames) {

this.beans.entrySet()
.stream()
.filter(e -> names == null || names.contains(e.getKey()))
.forEach(e -> resetBean(this.beanFactory.getBean(e.getKey()), e.getValue()));
.filter((bean) -> names == null || names.contains(bean.getKey()))
.forEach((bean) -> {
Object endpoint = this.beanFactory.getBean(bean.getKey());
bean.getValue()
.forEach((value) -> resetBean(endpoint, value));
});

if (!ObjectUtils.isEmpty(beanNames)) {
for (String name : beanNames) {
Expand All @@ -144,28 +148,24 @@ public void resetBeans(String... beanNames) {
}
}

private void resetBean(Object endpoint, Object handler) {
private void resetBean(Object endpoint, Object component) {
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
SmartLifecycle lifecycle = null;
if (endpoint instanceof SmartLifecycle && ((SmartLifecycle) endpoint).isRunning()) {
lifecycle = (SmartLifecycle) endpoint;
if (endpoint instanceof SmartLifecycle lifecycleEndpoint && lifecycleEndpoint.isRunning()) {
lifecycle = lifecycleEndpoint;
lifecycle.stop();
}
if (endpoint instanceof SourcePollingChannelAdapter) {
directFieldAccessor.setPropertyValue("source", handler);
if (endpoint instanceof SourcePollingChannelAdapter && component instanceof MessageSource<?>) {
directFieldAccessor.setPropertyValue("source", component);
}
else if (endpoint instanceof ReactiveStreamsConsumer) {
if (handler instanceof Tuple2<?, ?> value) {
directFieldAccessor.setPropertyValue(HANDLER, value.getT1());
directFieldAccessor.setPropertyValue(REACTIVE_MESSAGE_HANDLER, value.getT2());
}
else {
directFieldAccessor.setPropertyValue(HANDLER, handler);
directFieldAccessor.setPropertyValue(REACTIVE_MESSAGE_HANDLER, null);
}
else if (endpoint instanceof IntegrationConsumer && component instanceof MessageHandler) {
directFieldAccessor.setPropertyValue(HANDLER, component);
}
else if (endpoint instanceof IntegrationConsumer) {
directFieldAccessor.setPropertyValue(HANDLER, handler);
else if (endpoint instanceof ReactiveStreamsConsumer && component instanceof ReactiveMessageHandler) {
directFieldAccessor.setPropertyValue(REACTIVE_MESSAGE_HANDLER, component);
}
else if (component instanceof Trigger) {
directFieldAccessor.setPropertyValue("trigger", component);
}
if (lifecycle != null && lifecycle.isAutoStartup()) {
lifecycle.start();
Expand Down Expand Up @@ -196,7 +196,8 @@ public void substituteMessageSourceFor(String pollingAdapterId, MessageSource<?>
*/
public void substituteMessageSourceFor(String pollingAdapterId, MessageSource<?> mockMessageSource,
boolean autoStartup) {
substituteMessageSourceFor(pollingAdapterId, mockMessageSource, SourcePollingChannelAdapter.class, "source",

substituteComponentFor(pollingAdapterId, mockMessageSource, SourcePollingChannelAdapter.class, "source",
autoStartup);
}

Expand All @@ -208,29 +209,24 @@ public void substituteMessageHandlerFor(String consumerEndpointId, // NOSONAR -
MessageHandler mockMessageHandler, boolean autoStartup) {

Object endpoint = this.beanFactory.getBean(consumerEndpointId, IntegrationConsumer.class);
if (autoStartup && endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).stop();
if (autoStartup && endpoint instanceof Lifecycle lifecycle) {
lifecycle.stop();
}
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
Object targetMessageHandler = directFieldAccessor.getPropertyValue(HANDLER);
Assert.notNull(targetMessageHandler, () -> "'handler' must not be null in the: " + endpoint);
this.beans.add(consumerEndpointId, targetMessageHandler);
if (endpoint instanceof ReactiveStreamsConsumer) {
Object targetReactiveMessageHandler = directFieldAccessor.getPropertyValue(REACTIVE_MESSAGE_HANDLER);
if (targetReactiveMessageHandler != null) {
this.beans.put(consumerEndpointId, Tuples.of(targetMessageHandler, targetReactiveMessageHandler));
this.beans.add(consumerEndpointId, targetReactiveMessageHandler);
}
else {
this.beans.put(consumerEndpointId, targetMessageHandler);
}
}
else {
this.beans.put(consumerEndpointId, targetMessageHandler);
}

if (mockMessageHandler instanceof MessageProducer) {
if (targetMessageHandler instanceof MessageProducer) {
MessageChannel outputChannel = ((MessageProducer) targetMessageHandler).getOutputChannel();
((MessageProducer) mockMessageHandler).setOutputChannel(outputChannel);
if (mockMessageHandler instanceof MessageProducer mockMessageProducer) {
if (targetMessageHandler instanceof MessageProducer messageProducer) {
MessageChannel outputChannel = messageProducer.getOutputChannel();
mockMessageProducer.setOutputChannel(outputChannel);
}
else {
if (mockMessageHandler instanceof MockMessageHandler) {
Expand All @@ -254,23 +250,45 @@ public void substituteMessageHandlerFor(String consumerEndpointId, // NOSONAR -
directFieldAccessor.setPropertyValue(REACTIVE_MESSAGE_HANDLER, reactiveMessageHandler);
}

if (autoStartup && endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).start();
if (autoStartup && endpoint instanceof Lifecycle lifecycle) {
lifecycle.start();
}
}

private void substituteMessageSourceFor(String endpointId, Object messagingComponent, Class<?> endpointClass,
/**
* Replace the real {@link Trigger} in the {@link AbstractPollingEndpoint} bean with provided instance.
* @param pollingEndpointId the {@link AbstractPollingEndpoint} bean id to replace
* @param trigger the {@link Trigger} to set into {@link AbstractPollingEndpoint}
* @since 6.3
*/
public void substituteTriggerFor(String pollingEndpointId, Trigger trigger) {
substituteTriggerFor(pollingEndpointId, trigger, true);
}

/**
* Replace the real {@link Trigger} in the {@link AbstractPollingEndpoint} bean with provided instance.
* The endpoint is not started when {@code autoStartup == false}.
* @param pollingEndpointId the {@link AbstractPollingEndpoint} bean id to replace
* @param trigger the {@link Trigger} to set into {@link AbstractPollingEndpoint}
* @param autoStartup start or not the endpoint after replacing its {@link MessageSource}
* @since 6.3
*/
public void substituteTriggerFor(String pollingEndpointId, Trigger trigger, boolean autoStartup) {
substituteComponentFor(pollingEndpointId, trigger, AbstractPollingEndpoint.class, "trigger", autoStartup);
}

private void substituteComponentFor(String endpointId, Object messagingComponent, Class<?> endpointClass,
String property, boolean autoStartup) {

Object endpoint = this.beanFactory.getBean(endpointId, endpointClass);
if (autoStartup && endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).stop();
if (autoStartup && endpoint instanceof Lifecycle lifecycle) {
lifecycle.stop();
}
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
this.beans.put(endpointId, directFieldAccessor.getPropertyValue(property));
this.beans.add(endpointId, directFieldAccessor.getPropertyValue(property));
directFieldAccessor.setPropertyValue(property, messagingComponent);
if (autoStartup && endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).start();
if (autoStartup && endpoint instanceof Lifecycle lifecycle) {
lifecycle.start();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.test.context.MockIntegrationContext;
import org.springframework.integration.test.context.SpringIntegrationTest;
import org.springframework.integration.test.util.OnlyOnceTrigger;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -94,7 +95,16 @@ public void testMockMessageSource() {
assertThat(receive.getPayload()).isEqualTo("BAZ");
}

this.applicationContext.getBean("mySourceEndpoint", Lifecycle.class).stop();
this.mockIntegrationContext.resetBeans("mySourceEndpoint");

this.mockIntegrationContext.substituteTriggerFor("mySourceEndpoint", new OnlyOnceTrigger());

receive = this.results.receive(10_000);
assertThat(receive).isNotNull()
.extracting(Message::getPayload)
.isEqualTo("MYDATA");

assertThat(this.results.receive(10)).isNull();
}

@Test
Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/testing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ public void tearDown() {
}
----

Starting with version 6.3, the `MockIntegrationContext.substituteTriggerFor()` API has been introduced.
This can be used to replace the real `Trigger` in the `AbstractPollingEndpoint`.
For example the production configuration may rely on daily (or even weekly) cron schedule.
Any custom `Trigger` can be injected into the target endpoint to mitigate the time span.
For example, the mentioned above <<using-onlyoncetrigger, `OnlyOnceTrigger`>> suggests a behavior to schedule polling task immediately and do that only once.

See the https://docs.spring.io/spring-integration/api/org/springframework/integration/test/context/MockIntegrationContext.html[Javadoc] for more information.

[[testing-mocks]]
Expand Down
8 changes: 7 additions & 1 deletion src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ See xref:security.adoc[Security in Spring Integration] for more information.
=== MQTT Support Changes

The fine-grained configuration based on `MqttSubscription` API is exposed on the `Mqttv5PahoMessageDrivenChannelAdapter`.
See xref:mqtt.adoc[MQTT Support] for more information.
See xref:mqtt.adoc[MQTT Support] for more information.

[[x6.3-testing]]
=== Testing Support Changes

The `MockIntegrationContext.substituteTriggerFor()` API has been introduced.
See xref:testing.adoc[Testing Support] for more information.

0 comments on commit 1d73ab5

Please sign in to comment.