From 07c629cae3b61d6360b7315480bf2ba2a0c9cb53 Mon Sep 17 00:00:00 2001 From: liaozan <378024053@qq.com> Date: Fri, 22 Dec 2023 15:17:58 +0800 Subject: [PATCH] Improve kafka integration --- .../kafka/CustomKafkaListenerEndpointRegistry.java | 13 +++++++------ .../starrocks/helper/ConvertUtils.java | 12 ++++++++++-- .../DefaultPropertiesEnvironmentPostProcessor.java | 2 ++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java index 239e4b6..5f3d56d 100644 --- a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java +++ b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java @@ -14,19 +14,20 @@ public class CustomKafkaListenerEndpointRegistry extends KafkaListenerEndpointRe private static final String CONSUMER_ENABLED_KEY = "spring.kafka.consumer.enabled"; - private final KafkaProperties kafkaProperties; + private final boolean shouldRegisterConsumer; public CustomKafkaListenerEndpointRegistry(KafkaProperties kafkaProperties) { - this.kafkaProperties = kafkaProperties; + this.shouldRegisterConsumer = EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled(); + if (!shouldRegisterConsumer) { + log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY); + log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY); + } } @Override public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory, boolean startImmediately) { - if (EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled()) { + if (shouldRegisterConsumer) { super.registerListenerContainer(endpoint, factory, startImmediately); - } else { - log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY); - log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY); } } diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java index 1a3db66..01bbb65 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java @@ -18,12 +18,20 @@ public class ConvertUtils { private static final ObjectMapper DESERIALIZER = JacksonUtils.getObjectMapper().copy().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); - public static Target convertTo(ConsumerRecord record, Class sourceType, Class targetType) { + public static Target convertTo(ConsumerRecord record, Class targetType) { CanalChangedEvent event = JacksonUtils.getObjectFromJson(record.value(), CanalChangedEvent.class); if (event == null) { throw new BaseException("CanalChangedEvent is null"); } - Source source = DESERIALIZER.convertValue(event.getAfter(), sourceType); + return DESERIALIZER.convertValue(event.getAfter(), targetType); + } + + public static List convertToList(ConsumerRecords records, Class targetType) { + return StreamUtils.toList(records, record -> convertTo(record, targetType)); + } + + public static Target convertTo(ConsumerRecord record, Class sourceType, Class targetType) { + Source source = convertTo(record, sourceType); return BeanCopyUtils.copy(source, targetType); } diff --git a/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java b/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java index 7b60728..a82436b 100644 --- a/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java +++ b/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java @@ -61,6 +61,8 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro defaultProperties.put("spring.mvc.format.date-time", DatePattern.NORM_DATETIME_PATTERN); defaultProperties.put("spring.jackson.date-format", DatePattern.NORM_DATETIME_PATTERN); defaultProperties.put("spring.jackson.time-zone", TimeZone.getDefault().getID()); + // kafka + defaultProperties.put("spring.kafka.consumer.group-id", ApplicationName.get(environment)); // others defaultProperties.put("spring.mandatory-file-encoding", StandardCharsets.UTF_8.name()); defaultProperties.put("spring.web.resources.add-mappings", false); -- GitLab