Skip to content

Commit

Permalink
Add support for bean customizers (#669)
Browse files Browse the repository at this point in the history
This commit adds the notion of `BeanCustomizer` and a generic bean post
processor that will apply any unique customizer to its target bean.

Included are concrete customizers for PulsarTemplate and
ConcurrentPulsarListenerContainerFactory in order to make it easier
to configure transactions to these components.
  • Loading branch information
onobc authored May 3, 2024
1 parent 3476dd8 commit 5c7655d
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.annotation;

/**
* Customize a bean.
*
* @param <B> the bean type to customize
* @author Chris Bono
*/
@FunctionalInterface
public interface BeanCustomizer<B> {

/**
* Customize the bean.
* @param bean the bean to customize
*/
void customize(B bean);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.annotation;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.util.CollectionUtils;

/**
* A {@link BeanPostProcessor} that applies a customizer to beans of a specified type.
* <p>
* There must be only one customizer in the application context in order for it to be
* applied.
*
* @param <B> the type of bean to customize
* @param <C> the type of customizer
* @author Chris Bono
*/
class BeanCustomizerPostProcessor<B, C extends BeanCustomizer<B>>
implements BeanPostProcessor, ApplicationContextAware {

private final LogAccessor logger = new LogAccessor(getClass());

private final Class<B> beanType;

private final Class<C> customizerType;

private ApplicationContext applicationContext;

BeanCustomizerPostProcessor(Class<B> beanType, Class<C> customizerType) {
this.beanType = beanType;
this.customizerType = customizerType;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (this.beanType.isInstance(bean)) {
B typedBean = this.beanType.cast(bean);
var customizers = this.applicationContext.getBeansOfType(this.customizerType);
if (CollectionUtils.isEmpty(customizers)) {
return bean;
}
if (customizers.size() > 1) {
this.logger.warn("Found multiple %s beans [%s] - must be only 1 in order to apply"
.formatted(this.customizerType.getSimpleName(), customizers.keySet()));
}
else {
customizers.values().stream().forEach((c) -> c.customize(typedBean));
}
}
return bean;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

package org.springframework.pulsar.annotation;

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.ResolvableType;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactoryCustomizer;
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.pulsar.config.PulsarReaderEndpointRegistry;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTemplateCustomizer;

/**
* An {@link ImportBeanDefinitionRegistrar} class that registers a
Expand All @@ -44,6 +50,30 @@ public class PulsarBootstrapConfiguration implements ImportBeanDefinitionRegistr

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition("pulsarTemplateCustomizerPostProcessor")) {
var postProcessorType = ResolvableType.forClassWithGenerics(BeanCustomizerPostProcessor.class,
PulsarTemplate.class, PulsarTemplateCustomizer.class);
@SuppressWarnings("unchecked")
var beanDef = BeanDefinitionBuilder
.rootBeanDefinition(postProcessorType,
() -> new BeanCustomizerPostProcessor<>(PulsarTemplate.class, PulsarTemplateCustomizer.class))
.getBeanDefinition();
registry.registerBeanDefinition("pulsarTemplateCustomizerPostProcessor", beanDef);
}

if (!registry.containsBeanDefinition("concurrentContainerFactoryCustomizerPostProcessor")) {
var postProcessorType = ResolvableType.forClassWithGenerics(BeanCustomizerPostProcessor.class,
ConcurrentPulsarListenerContainerFactory.class,
ConcurrentPulsarListenerContainerFactoryCustomizer.class);
@SuppressWarnings("unchecked")
var beanDef = BeanDefinitionBuilder
.rootBeanDefinition(postProcessorType,
() -> new BeanCustomizerPostProcessor<>(ConcurrentPulsarListenerContainerFactory.class,
ConcurrentPulsarListenerContainerFactoryCustomizer.class))
.getBeanDefinition();
registry.registerBeanDefinition("concurrentContainerFactoryCustomizerPostProcessor", beanDef);
}

if (!registry
.containsBeanDefinition(PulsarAnnotationSupportBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.config;

import org.springframework.pulsar.annotation.BeanCustomizer;

/**
* Callback interface that can be implemented to customize a
* {@link ConcurrentPulsarListenerContainerFactory}.
*
* @param <T> The message payload type
* @author Chris Bono
*/
public interface ConcurrentPulsarListenerContainerFactoryCustomizer<T>
extends BeanCustomizer<ConcurrentPulsarListenerContainerFactory<T>> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.core;

import org.springframework.pulsar.annotation.BeanCustomizer;

/**
* Callback interface that can be implemented to customize a {@link PulsarTemplate}.
*
* @param <T> the payload type of the template
* @author Chris Bono
*/
public interface PulsarTemplateCustomizer<T> extends BeanCustomizer<PulsarTemplate<T>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -55,6 +56,9 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.util.function.ThrowingConsumer;
Expand Down Expand Up @@ -391,6 +395,59 @@ void withJsonSchema() throws Exception {

}

@Nested
class PulsarTemplateCustomizerTests {

@Test
void whenSingleCustomizerAvailableThenItIsApplied() {
var template = mock(PulsarTemplate.class);
var txnProps = mock(TransactionProperties.class);
when(template.transactions()).thenReturn(txnProps);
PulsarTemplateCustomizer<?> customizer = (t) -> t.transactions().setTimeout(Duration.ofSeconds(45));
try (var appContext = new AnnotationConfigApplicationContext()) {
appContext.registerBean(PulsarTemplate.class, () -> template);
appContext.registerBean(PulsarTemplateCustomizer.class, () -> customizer);
appContext.register(PulsarTemplateCustomizerTestsConfig.class);
appContext.refresh();
verify(txnProps).setTimeout(Duration.ofSeconds(45));
}
}

@Test
void whenMultipleCustomizersAvailableThenNoneAreApplied() {
var template = mock(PulsarTemplate.class);
var txnProps = mock(TransactionProperties.class);
when(template.transactions()).thenReturn(txnProps);
PulsarTemplateCustomizer<?> customizer1 = (t) -> t.transactions().setTimeout(Duration.ofSeconds(30));
PulsarTemplateCustomizer<?> customizer2 = (t) -> t.transactions().setTimeout(Duration.ofSeconds(45));
try (var appContext = new AnnotationConfigApplicationContext()) {
appContext.registerBean(PulsarTemplate.class, () -> template);
appContext.registerBean("customizer1", PulsarTemplateCustomizer.class, () -> customizer1);
appContext.registerBean("customizer2", PulsarTemplateCustomizer.class, () -> customizer2);
appContext.register(PulsarTemplateCustomizerTestsConfig.class);
appContext.refresh();
verify(txnProps, never()).setTimeout(any(Duration.class));
}
}

@Test
void whenNoCustomizersAvaiableThenContextStartsWithoutFailure() {
var template = mock(PulsarTemplate.class);
try (var appContext = new AnnotationConfigApplicationContext()) {
appContext.registerBean(PulsarTemplate.class, () -> template);
appContext.register(PulsarTemplateCustomizerTestsConfig.class);
appContext.refresh();
}
}

@Configuration(proxyBeanMethods = false)
@EnablePulsar
static class PulsarTemplateCustomizerTestsConfig {

}

}

public static class Foo {

private String foo;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.time.Duration;

import org.junit.jupiter.api.Test;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactoryCustomizer;

/**
* Tests for applying {@link ConcurrentPulsarListenerContainerFactoryCustomizer} to the
* {@link ConcurrentPulsarListenerContainerFactory}.
*
* @author Chris Bono
*/
class ConcurrentPulsarListenerContainerFactoryCustomizerTests {

@Test
void whenSingleCustomizerAvailableThenItIsApplied() {
var containerFactory = mock(ConcurrentPulsarListenerContainerFactory.class);
var containerProps = new PulsarContainerProperties();
when(containerFactory.getContainerProperties()).thenReturn(containerProps);
ConcurrentPulsarListenerContainerFactoryCustomizer<?> customizer = (
cf) -> cf.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
try (var appContext = new AnnotationConfigApplicationContext()) {
appContext.registerBean(ConcurrentPulsarListenerContainerFactory.class, () -> containerFactory);
appContext.registerBean(ConcurrentPulsarListenerContainerFactoryCustomizer.class, () -> customizer);
appContext.register(ConcurrentPulsarListenerContainerFactoryCustomizerTestsConfig.class);
appContext.refresh();
assertThat(containerProps.transactions().getTimeout()).isEqualTo(Duration.ofSeconds(45));
}
}

@Test
void whenMultipleCustomizersAvailableThenNoneAreApplied() {
var containerFactory = mock(ConcurrentPulsarListenerContainerFactory.class);
var containerProps = new PulsarContainerProperties();
when(containerFactory.getContainerProperties()).thenReturn(containerProps);
ConcurrentPulsarListenerContainerFactoryCustomizer<?> customizer1 = (
cf) -> cf.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
ConcurrentPulsarListenerContainerFactoryCustomizer<?> customizer2 = (
cf) -> cf.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(60));
try (var appContext = new AnnotationConfigApplicationContext()) {
appContext.registerBean(ConcurrentPulsarListenerContainerFactory.class, () -> containerFactory);
appContext.registerBean("customizer1", ConcurrentPulsarListenerContainerFactoryCustomizer.class,
() -> customizer1);
appContext.registerBean("customizer2", ConcurrentPulsarListenerContainerFactoryCustomizer.class,
() -> customizer2);
appContext.register(ConcurrentPulsarListenerContainerFactoryCustomizerTestsConfig.class);
appContext.refresh();
assertThat(containerProps.transactions().getTimeout()).isNull();
}
}

@Test
void whenNoCustomizersAvaiableThenContextStartsWithoutFailure() {
var containerFactory = mock(ConcurrentPulsarListenerContainerFactory.class);
try (var appContext = new AnnotationConfigApplicationContext()) {
appContext.registerBean(ConcurrentPulsarListenerContainerFactory.class, () -> containerFactory);
appContext.register(ConcurrentPulsarListenerContainerFactoryCustomizerTestsConfig.class);
appContext.refresh();
}
}

@Configuration(proxyBeanMethods = false)
@EnablePulsar
static class ConcurrentPulsarListenerContainerFactoryCustomizerTestsConfig {

}

}

0 comments on commit 5c7655d

Please sign in to comment.