diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java index 239e4b69c3555ae66ee291255608238c5cdf1c22..5f3d56d72f0578d94aee7a4ea9c13e4f6fcd194d 100644 --- a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java +++ b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/CustomKafkaListenerEndpointRegistry.java @@ -14,19 +14,20 @@ public class CustomKafkaListenerEndpointRegistry extends KafkaListenerEndpointRe private static final String CONSUMER_ENABLED_KEY = "spring.kafka.consumer.enabled"; - private final KafkaProperties kafkaProperties; + private final boolean shouldRegisterConsumer; public CustomKafkaListenerEndpointRegistry(KafkaProperties kafkaProperties) { - this.kafkaProperties = kafkaProperties; + this.shouldRegisterConsumer = EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled(); + if (!shouldRegisterConsumer) { + log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY); + log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY); + } } @Override public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory, boolean startImmediately) { - if (EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled()) { + if (shouldRegisterConsumer) { super.registerListenerContainer(endpoint, factory, startImmediately); - } else { - log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY); - log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY); } } diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java index 1a3db669238e4fb0f770439c3f935a57670bb141..01bbb65c5ce7ddd94086f70317f362389ec13162 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java @@ -18,12 +18,20 @@ public class ConvertUtils { private static final ObjectMapper DESERIALIZER = JacksonUtils.getObjectMapper().copy().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); - public static Target convertTo(ConsumerRecord record, Class sourceType, Class targetType) { + public static Target convertTo(ConsumerRecord record, Class targetType) { CanalChangedEvent event = JacksonUtils.getObjectFromJson(record.value(), CanalChangedEvent.class); if (event == null) { throw new BaseException("CanalChangedEvent is null"); } - Source source = DESERIALIZER.convertValue(event.getAfter(), sourceType); + return DESERIALIZER.convertValue(event.getAfter(), targetType); + } + + public static List convertToList(ConsumerRecords records, Class targetType) { + return StreamUtils.toList(records, record -> convertTo(record, targetType)); + } + + public static Target convertTo(ConsumerRecord record, Class sourceType, Class targetType) { + Source source = convertTo(record, sourceType); return BeanCopyUtils.copy(source, targetType); } diff --git a/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java b/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java index 7b607285e9618bc38601806a059081069fa9b6e0..a82436b09c25084ee7b984572219c3fd750c50a9 100644 --- a/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java +++ b/support/schbrain-spring-support/src/main/java/com/schbrain/framework/support/spring/env/DefaultPropertiesEnvironmentPostProcessor.java @@ -61,6 +61,8 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro defaultProperties.put("spring.mvc.format.date-time", DatePattern.NORM_DATETIME_PATTERN); defaultProperties.put("spring.jackson.date-format", DatePattern.NORM_DATETIME_PATTERN); defaultProperties.put("spring.jackson.time-zone", TimeZone.getDefault().getID()); + // kafka + defaultProperties.put("spring.kafka.consumer.group-id", ApplicationName.get(environment)); // others defaultProperties.put("spring.mandatory-file-encoding", StandardCharsets.UTF_8.name()); defaultProperties.put("spring.web.resources.add-mappings", false);