Commit 8068126f authored by liaozan's avatar liaozan 🏀

Support disable kafka message listen when application running in local

parent e8474581
......@@ -4,7 +4,7 @@ import com.ctrip.framework.foundation.Foundation;
import com.schbrain.common.util.ApplicationName;
import com.schbrain.common.util.EnvUtils;
import com.schbrain.framework.support.spring.LoggerAwareEnvironmentPostProcessor;
import com.schbrain.framework.support.spring.defaults.DefaultPropertiesEnvironmentPostProcessor;
import com.schbrain.framework.support.spring.env.DefaultPropertiesEnvironmentPostProcessor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.ConfigurableBootstrapContext;
import org.springframework.boot.SpringApplication;
......
package com.schbrain.framework.autoconfigure.dubbo.env;
import cn.hutool.core.text.StrFormatter;
import com.google.common.collect.Maps;
import com.schbrain.common.util.EnvUtils;
import com.schbrain.framework.support.spring.LoggerAwareEnvironmentPostProcessor;
import org.springframework.boot.ConfigurableBootstrapContext;
import org.springframework.boot.DefaultPropertiesPropertySource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.logging.DeferredLogFactory;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.ClassUtils;
import java.util.Map;
/**
* @author liaozan
* @since 2023/7/18
*/
public class DubboEnvironmentPostProcessor extends LoggerAwareEnvironmentPostProcessor {
private static final String DUBBO_REGISTER_KEY = "dubbo.registry.register";
public DubboEnvironmentPostProcessor(DeferredLogFactory logFactory, ConfigurableBootstrapContext bootstrapContext) {
super(logFactory, bootstrapContext);
}
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
Map<String, Object> defaultProperties = Maps.newHashMapWithExpectedSize(1);
configureDubboRegistrationIfPresent(environment, defaultProperties);
DefaultPropertiesPropertySource.addOrMerge(defaultProperties, environment.getPropertySources());
}
private void configureDubboRegistrationIfPresent(ConfigurableEnvironment environment, Map<String, Object> defaultProperties) {
if (!dubboInClassPath()) {
return;
}
if (EnvUtils.runningOnCloudPlatform(environment)) {
return;
}
if (!environment.containsProperty(DUBBO_REGISTER_KEY)) {
log.warn(StrFormatter.format("Not running on CloudPlatform, {} is set to false by default", DUBBO_REGISTER_KEY));
log.warn(StrFormatter.format("If you want force to register with Dubbo Registry, set {} = true", DUBBO_REGISTER_KEY));
defaultProperties.put(DUBBO_REGISTER_KEY, false);
}
}
private boolean dubboInClassPath() {
return ClassUtils.isPresent("org.apache.dubbo.config.bootstrap.DubboBootstrap", getClass().getClassLoader());
}
}
com.schbrain.common.util.support.ConfigurableProperties=\
com.schbrain.framework.autoconfigure.dubbo.properties.DubboProperties
com.schbrain.framework.autoconfigure.dubbo.properties.DubboProperties
org.springframework.context.ApplicationListener=\
com.schbrain.framework.autoconfigure.dubbo.listener.DubboConfigLoadedEventListener
org.springframework.boot.env.EnvironmentPostProcessor=\
com.schbrain.framework.autoconfigure.dubbo.env.DubboEnvironmentPostProcessor
package com.schbrain.framework.autoconfigure.kafka;
import com.schbrain.common.util.EnvUtils;
import com.schbrain.framework.autoconfigure.kafka.properties.KafkaProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
/**
* @author liaozan
* @since 2023/7/18
*/
@Slf4j
public class CustomKafkaListenerEndpointRegistry extends KafkaListenerEndpointRegistry {
private static final String CONSUMER_ENABLED_KEY = "spring.kafka.consumer.enabled";
private final KafkaProperties kafkaProperties;
public CustomKafkaListenerEndpointRegistry(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) {
if (EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled()) {
super.registerListenerContainer(endpoint, factory, startImmediately);
} else {
log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY);
log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY);
}
}
}
......@@ -2,16 +2,28 @@ package com.schbrain.framework.autoconfigure.kafka;
import com.schbrain.framework.autoconfigure.kafka.properties.KafkaProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
/**
* @author liaozan
* @since 2023-04-29
*/
@AutoConfiguration
@Import(MessageProducer.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfiguration {
@Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public CustomKafkaListenerEndpointRegistry customKafkaListenerEndpointRegistry(KafkaProperties kafkaProperties) {
return new CustomKafkaListenerEndpointRegistry(kafkaProperties);
}
@Bean
@ConditionalOnMissingBean
public MessageProducer defaultMessageProducer() {
return new MessageProducer();
}
}
package com.schbrain.framework.autoconfigure.kafka.properties;
import com.schbrain.common.util.support.ConfigurableProperties;
import com.schbrain.framework.autoconfigure.kafka.CustomKafkaListenerEndpointRegistry;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
/**
* @author liaozan
......@@ -12,9 +16,27 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties implements ConfigurableProperties {
/**
* 消费者配置
*/
@NestedConfigurationProperty
private Consumer consumer = new Consumer();
@Override
public String getNamespace() {
return "kafka-common";
}
@Data
public static class Consumer {
/**
* 是否启用消费者,只对在本地运行生效
*
* @see CustomKafkaListenerEndpointRegistry#registerListenerContainer(KafkaListenerEndpoint, KafkaListenerContainerFactory, boolean)
*/
private boolean enabled = false;
}
}
package com.schbrain.framework.support.spring.defaults;
package com.schbrain.framework.support.spring.env;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.text.StrFormatter;
......@@ -17,7 +17,6 @@ import org.springframework.boot.logging.DeferredLogFactory;
import org.springframework.boot.web.server.Shutdown;
import org.springframework.core.Ordered;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.ClassUtils;
import org.springframework.util.unit.DataSize;
import java.nio.charset.StandardCharsets;
......@@ -38,8 +37,6 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro
private static final String SPRING_PROFILE_ACTIVE = "spring.profiles.active";
private static final String DUBBO_REGISTER_KEY = "dubbo.registry.register";
public DefaultPropertiesEnvironmentPostProcessor(DeferredLogFactory logFactory, ConfigurableBootstrapContext bootstrapContext) {
super(logFactory, bootstrapContext);
}
......@@ -72,8 +69,6 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro
defaultProperties.put("spring.main.allow-circular-references", true);
defaultProperties.put("spring.main.banner-mode", Banner.Mode.OFF);
defaultProperties.put("server.shutdown", Shutdown.GRACEFUL);
// dubbo
configureDubboRegistrationIfPresent(environment, defaultProperties);
// active profile
configureActiveProfileIfPresent(environment, defaultProperties);
environment.setDefaultProfiles(EnvUtils.DEVELOPMENT);
......@@ -93,22 +88,4 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro
}
}
private void configureDubboRegistrationIfPresent(ConfigurableEnvironment environment, Map<String, Object> defaultProperties) {
if (!dubboInClassPath()) {
return;
}
if (EnvUtils.runningOnCloudPlatform(environment)) {
return;
}
if (!environment.containsProperty(DUBBO_REGISTER_KEY)) {
log.warn(StrFormatter.format("Not running on CloudPlatform, {} is set to false by default", DUBBO_REGISTER_KEY));
log.warn(StrFormatter.format("If you want force to register with Dubbo Registry, set {} = true", DUBBO_REGISTER_KEY));
defaultProperties.put(DUBBO_REGISTER_KEY, false);
}
}
private boolean dubboInClassPath() {
return ClassUtils.isPresent("org.apache.dubbo.config.bootstrap.DubboBootstrap", getClass().getClassLoader());
}
}
org.springframework.boot.env.EnvironmentPostProcessor=com.schbrain.framework.support.spring.defaults.DefaultPropertiesEnvironmentPostProcessor
org.springframework.boot.env.EnvironmentPostProcessor=com.schbrain.framework.support.spring.env.DefaultPropertiesEnvironmentPostProcessor
org.springframework.context.ApplicationContextInitializer=com.schbrain.framework.support.spring.startup.BufferingApplicationStartupApplicationContextInitializer
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment