Commit 07c629ca authored by liaozan's avatar liaozan 🏀

Improve kafka integration

parent 9b1c8308
...@@ -14,19 +14,20 @@ public class CustomKafkaListenerEndpointRegistry extends KafkaListenerEndpointRe ...@@ -14,19 +14,20 @@ public class CustomKafkaListenerEndpointRegistry extends KafkaListenerEndpointRe
private static final String CONSUMER_ENABLED_KEY = "spring.kafka.consumer.enabled"; private static final String CONSUMER_ENABLED_KEY = "spring.kafka.consumer.enabled";
private final KafkaProperties kafkaProperties; private final boolean shouldRegisterConsumer;
public CustomKafkaListenerEndpointRegistry(KafkaProperties kafkaProperties) { public CustomKafkaListenerEndpointRegistry(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties; this.shouldRegisterConsumer = EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled();
if (!shouldRegisterConsumer) {
log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY);
log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY);
}
} }
@Override @Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) { public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) {
if (EnvUtils.runningOnCloudPlatform() || kafkaProperties.getConsumer().isEnabled()) { if (shouldRegisterConsumer) {
super.registerListenerContainer(endpoint, factory, startImmediately); super.registerListenerContainer(endpoint, factory, startImmediately);
} else {
log.warn("Not running on CloudPlatform or {} is set to false, will not listen to messages from brokers", CONSUMER_ENABLED_KEY);
log.warn("If you want force to register with Kafka Brokers, set {} = true", CONSUMER_ENABLED_KEY);
} }
} }
......
...@@ -18,12 +18,20 @@ public class ConvertUtils { ...@@ -18,12 +18,20 @@ public class ConvertUtils {
private static final ObjectMapper DESERIALIZER = JacksonUtils.getObjectMapper().copy().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); private static final ObjectMapper DESERIALIZER = JacksonUtils.getObjectMapper().copy().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
public static <Source, Target> Target convertTo(ConsumerRecord<String, String> record, Class<Source> sourceType, Class<Target> targetType) { public static <Target> Target convertTo(ConsumerRecord<String, String> record, Class<Target> targetType) {
CanalChangedEvent event = JacksonUtils.getObjectFromJson(record.value(), CanalChangedEvent.class); CanalChangedEvent event = JacksonUtils.getObjectFromJson(record.value(), CanalChangedEvent.class);
if (event == null) { if (event == null) {
throw new BaseException("CanalChangedEvent is null"); throw new BaseException("CanalChangedEvent is null");
} }
Source source = DESERIALIZER.convertValue(event.getAfter(), sourceType); return DESERIALIZER.convertValue(event.getAfter(), targetType);
}
public static <Target> List<Target> convertToList(ConsumerRecords<String, String> records, Class<Target> targetType) {
return StreamUtils.toList(records, record -> convertTo(record, targetType));
}
public static <Source, Target> Target convertTo(ConsumerRecord<String, String> record, Class<Source> sourceType, Class<Target> targetType) {
Source source = convertTo(record, sourceType);
return BeanCopyUtils.copy(source, targetType); return BeanCopyUtils.copy(source, targetType);
} }
......
...@@ -61,6 +61,8 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro ...@@ -61,6 +61,8 @@ public class DefaultPropertiesEnvironmentPostProcessor extends LoggerAwareEnviro
defaultProperties.put("spring.mvc.format.date-time", DatePattern.NORM_DATETIME_PATTERN); defaultProperties.put("spring.mvc.format.date-time", DatePattern.NORM_DATETIME_PATTERN);
defaultProperties.put("spring.jackson.date-format", DatePattern.NORM_DATETIME_PATTERN); defaultProperties.put("spring.jackson.date-format", DatePattern.NORM_DATETIME_PATTERN);
defaultProperties.put("spring.jackson.time-zone", TimeZone.getDefault().getID()); defaultProperties.put("spring.jackson.time-zone", TimeZone.getDefault().getID());
// kafka
defaultProperties.put("spring.kafka.consumer.group-id", ApplicationName.get(environment));
// others // others
defaultProperties.put("spring.mandatory-file-encoding", StandardCharsets.UTF_8.name()); defaultProperties.put("spring.mandatory-file-encoding", StandardCharsets.UTF_8.name());
defaultProperties.put("spring.web.resources.add-mappings", false); defaultProperties.put("spring.web.resources.add-mappings", false);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment