From b3ee0c0eda826f6e059d744212dd16b6a7cf05da Mon Sep 17 00:00:00 2001 From: wadhwaroh-lang Date: Sat, 9 May 2026 17:15:36 -0400 Subject: [PATCH] GH-3164 Support multiple StreamsBuilderFactoryBeanConfigurers Signed-off-by: wadhwaroh-lang --- .../AbstractKafkaStreamsBinderProcessor.java | 7 +- ...StreamsBinderSupportAutoConfiguration.java | 2 +- .../KafkaStreamsFunctionProcessor.java | 9 +-- ...msBinderSupportAutoConfigurationTests.java | 68 +++++++++++++++++++ .../streamsbuilderfactorybean-customizer.adoc | 5 +- 5 files changed, 80 insertions(+), 11 deletions(-) create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfigurationTests.java diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java index c97c7504c2..b2cd545814 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -190,7 +191,7 @@ else if (parameterType.isAssignableFrom(GlobalKTable.class)) { protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix, ApplicationContext applicationContext, String inboundName, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - StreamsBuilderFactoryBeanConfigurer customizer, + List customizers, ConfigurableEnvironment environment, BindingProperties bindingProperties) { ConfigurableListableBeanFactory beanFactory = this.applicationContext .getBeanFactory(); @@ -347,8 +348,8 @@ else if (deserializationExceptionHandler == DeserializationExceptionHandler.skip extendedConsumerProperties.setApplicationId((String) streamConfiguration.get(StreamsConfig.APPLICATION_ID_CONFIG)); - if (customizer != null) { - customizer.configure(streamsBuilderFactoryBean); + if (!CollectionUtils.isEmpty(customizers)) { + customizers.forEach(customizer -> customizer.configure(streamsBuilderFactoryBean)); } return applicationContext.getBean( "&stream-builder-" + beanNamePostPrefix, StreamsBuilderFactoryBean.class); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java index 796b919b43..591a1aa1bf 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java @@ -387,7 +387,7 @@ public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServic return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate, cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties, - customizerProvider.getIfUnique(), environment); + customizerProvider.orderedStream().toList(), environment); } @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java index 37c57f8ef5..e9a8792c03 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java @@ -87,7 +87,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro private BeanFactory beanFactory; private final StreamFunctionProperties streamFunctionProperties; private final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties; - StreamsBuilderFactoryBeanConfigurer customizer; + final List customizers; ConfigurableEnvironment environment; public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties, @@ -98,7 +98,8 @@ public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProp CleanupConfig cleanupConfig, StreamFunctionProperties streamFunctionProperties, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment) { + List customizers, + ConfigurableEnvironment environment) { super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, cleanupConfig); this.bindingServiceProperties = bindingServiceProperties; @@ -108,7 +109,7 @@ public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProp this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate; this.streamFunctionProperties = streamFunctionProperties; this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties; - this.customizer = customizer; + this.customizers = customizers != null ? customizers : List.of(); this.environment = environment; } @@ -523,7 +524,7 @@ private Object[] adaptAndRetrieveInboundArguments(Map st //Otherwise, create the StreamsBuilderFactory and get the underlying config. if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) { StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext, - input, kafkaStreamsBinderConfigurationProperties, customizer, this.environment, bindingProperties); + input, kafkaStreamsBinderConfigurationProperties, customizers, this.environment, bindingProperties); this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean); } try { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfigurationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfigurationTests.java new file mode 100644 index 0000000000..4cbcaff086 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfigurationTests.java @@ -0,0 +1,68 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.streams; + +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties; +import org.springframework.cloud.stream.config.BindingServiceProperties; +import org.springframework.cloud.stream.function.StreamFunctionProperties; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; +import org.springframework.kafka.core.CleanupConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link KafkaStreamsBinderSupportAutoConfiguration}. + * + * @author wadhwaroh-lang + */ +class KafkaStreamsBinderSupportAutoConfigurationTests { + + @Test + @SuppressWarnings("unchecked") + void kafkaStreamsFunctionProcessorUsesAllStreamsBuilderFactoryBeanConfigurers() { + StreamsBuilderFactoryBeanConfigurer first = mock(StreamsBuilderFactoryBeanConfigurer.class); + StreamsBuilderFactoryBeanConfigurer second = mock(StreamsBuilderFactoryBeanConfigurer.class); + ObjectProvider customizerProvider = mock(ObjectProvider.class); + ObjectProvider cleanupConfigProvider = mock(ObjectProvider.class); + when(customizerProvider.orderedStream()).thenReturn(Stream.of(first, second)); + + KafkaStreamsFunctionProcessor processor = new KafkaStreamsBinderSupportAutoConfiguration() + .kafkaStreamsFunctionProcessor( + mock(BindingServiceProperties.class), + mock(KafkaStreamsExtendedBindingProperties.class), + mock(KeyValueSerdeResolver.class), + mock(KafkaStreamsBindingInformationCatalogue.class), + mock(KafkaStreamsMessageConversionDelegate.class), + cleanupConfigProvider, + mock(StreamFunctionProperties.class), + mock(KafkaStreamsBinderConfigurationProperties.class), + customizerProvider, + mock(ConfigurableEnvironment.class) + ); + + assertThat(processor.customizers).containsExactly(first, second); + } +} diff --git a/docs/modules/ROOT/pages/kafka/kafka-streams-binder/streamsbuilderfactorybean-customizer.adoc b/docs/modules/ROOT/pages/kafka/kafka-streams-binder/streamsbuilderfactorybean-customizer.adoc index 89900fb570..41abba4a67 100644 --- a/docs/modules/ROOT/pages/kafka/kafka-streams-binder/streamsbuilderfactorybean-customizer.adoc +++ b/docs/modules/ROOT/pages/kafka/kafka-streams-binder/streamsbuilderfactorybean-customizer.adoc @@ -43,9 +43,8 @@ public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() `KafkaStreamsCustomizer` will be called by the `StreamsBuilderFactoryBeabn` right before the underlying `KafkaStreams` gets started. -There can only be one `StreamsBuilderFactoryBeanConfigurer` in the entire application. -Then how do we account for multiple Kafka Streams processors as each of them are backed up by individual `StreamsBuilderFactoryBean` objects? -In that case, if the customization needs to be different for those processors, then the application needs to apply some filter based on the application ID. +The binder invokes all `StreamsBuilderFactoryBeanConfigurer` beans in order before the factory bean is started. +If the customization needs to be different for multiple Kafka Streams processors, each backed by its own `StreamsBuilderFactoryBean`, then the application needs to apply some filter based on the application ID. For e.g,