From f234110f3e8208c8ee27099536c8eb25d0b5934c Mon Sep 17 00:00:00 2001 From: liaozan <378024053@qq.com> Date: Thu, 27 Jul 2023 00:06:03 +0800 Subject: [PATCH] Polish kafka support --- .../schbrain/common/util/BeanCopyUtils.java | 2 + .../kafka/KafkaAutoConfiguration.java | 8 +-- .../kafka/KafkaLoggingProducerListener.java | 30 ++++++++++ .../kafka/KafkaMessageProducer.java | 56 ------------------- 4 files changed, 36 insertions(+), 60 deletions(-) create mode 100644 starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaLoggingProducerListener.java delete mode 100644 starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaMessageProducer.java diff --git a/commons/common-util/src/main/java/com/schbrain/common/util/BeanCopyUtils.java b/commons/common-util/src/main/java/com/schbrain/common/util/BeanCopyUtils.java index 404edb6..5d291af 100644 --- a/commons/common-util/src/main/java/com/schbrain/common/util/BeanCopyUtils.java +++ b/commons/common-util/src/main/java/com/schbrain/common/util/BeanCopyUtils.java @@ -17,7 +17,9 @@ import java.util.List; /** * 注意!!!此类是基于 cglib 实现的 + *

* 默认的 cglib 只支持同名,同类型的属性转换,本类对此场景进行了增强,具体逻辑见 {@link DefaultConverter} + *

* 另外最重要的, cglib 是浅拷贝,意味着如果是引用类型,修改源对象会导致目标对象的值也被修改,使用时请注意!!! * * @author liaozan diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java index 7fe4e82..f6a2a1d 100644 --- a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.KafkaListenerConfigUtils; -import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.ProducerListener; /** * @author liaozan @@ -22,9 +22,9 @@ public class KafkaAutoConfiguration { } @Bean - @ConditionalOnMissingBean - public KafkaMessageProducer defaultKafkaMessageProducer(KafkaTemplate kafkaTemplate) { - return new KafkaMessageProducer(kafkaTemplate); + @ConditionalOnMissingBean(ProducerListener.class) + public KafkaLoggingProducerListener kafkaLoggingProducerListener() { + return new KafkaLoggingProducerListener(); } } diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaLoggingProducerListener.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaLoggingProducerListener.java new file mode 100644 index 0000000..37b0e51 --- /dev/null +++ b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaLoggingProducerListener.java @@ -0,0 +1,30 @@ +package com.schbrain.framework.autoconfigure.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.support.LoggingProducerListener; + +/** + * @author liaozan + * @since 2023/7/26 + */ +@Slf4j +public class KafkaLoggingProducerListener extends LoggingProducerListener { + + public KafkaLoggingProducerListener() { + this.setIncludeContents(false); + } + + @Override + public void onSuccess(ProducerRecord record, RecordMetadata metadata) { + log.debug("[{}] 消息发送成功, content: {}", record.topic(), record.value()); + } + + @Override + public void onError(ProducerRecord record, RecordMetadata metadata, Exception exception) { + log.warn("[{}] 消息发送失败, content: {}", record.topic(), record.value()); + super.onError(record, metadata, exception); + } + +} diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaMessageProducer.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaMessageProducer.java deleted file mode 100644 index 536669b..0000000 --- a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaMessageProducer.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.schbrain.framework.autoconfigure.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFutureCallback; - -/** - * @author liaozan - * @since 2023/7/17 - */ -@Slf4j -public class KafkaMessageProducer { - - private final KafkaTemplate kafkaTemplate; - - public KafkaMessageProducer(KafkaTemplate kafkaTemplate) { - this.kafkaTemplate = kafkaTemplate; - } - - /** - * producer 异步方式发送数据 - * - * @param topic topic名称 - * @param message producer发送的数据 - * @param description 消息描述 - */ - public void sendMessageAsync(String topic, String message, String description) { - sendMessageAsync(topic, message, new ListenableFutureCallback<>() { - @Override - public void onSuccess(SendResult sendResult) { - log.debug("{} 消息发送成功, message: {}", description, message); - } - - @Override - public void onFailure(Throwable exception) { - log.error("{} 消息发送失败, {}", description, exception.getMessage(), exception); - } - }); - } - - /** - * producer 异步方式发送数据 - * - * @param topic topic名称 - * @param message producer发送的数据 - */ - public void sendMessageAsync(String topic, String message, ListenableFutureCallback> callback) { - try { - kafkaTemplate.send(topic, message).addCallback(callback); - } catch (Exception exception) { - log.error("消息发送失败, {}", exception.getMessage(), exception); - } - } - -} -- GitLab