diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java index 7e8e10e5c26ea467f84b2e06d2789df5e7a4de82..7b9dfe1e59023f3500ae05496af794e4644187d4 100644 --- a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -3,13 +3,15 @@ package com.schbrain.framework.autoconfigure.kafka; import com.schbrain.framework.autoconfigure.kafka.properties.KafkaProperties; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Import; /** * @author liaozan * @since 2023-04-29 */ @AutoConfiguration +@Import(MessageProducer.class) @EnableConfigurationProperties(KafkaProperties.class) public class KafkaAutoConfiguration { -} \ No newline at end of file +} diff --git a/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/MessageProducer.java b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/MessageProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..b9b9fdb657251e6fb7d8303ca6c939993cf03411 --- /dev/null +++ b/starters/kafka-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/kafka/MessageProducer.java @@ -0,0 +1,54 @@ +package com.schbrain.framework.autoconfigure.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * @author liaozan + * @since 2023/7/17 + */ +@Slf4j +public class MessageProducer { + + @Autowired + private KafkaTemplate kafkaTemplate; + + /** + * producer 异步方式发送数据 + * + * @param topic topic名称 + * @param message producer发送的数据 + * @param description 消息描述 + */ + public void sendMessageAsync(String topic, String message, String description) { + sendMessageAsync(topic, message, new ListenableFutureCallback<>() { + @Override + public void onSuccess(SendResult sendResult) { + log.debug("{} 消息发送成功, message: {}", description, message); + } + + @Override + public void onFailure(Throwable exception) { + log.error("{} 消息发送失败, {}", description, exception.getMessage(), exception); + } + }); + } + + /** + * producer 异步方式发送数据 + * + * @param topic topic名称 + * @param message producer发送的数据 + */ + public void sendMessageAsync(String topic, String message, ListenableFutureCallback> callback) { + try { + kafkaTemplate.send(topic, message).addCallback(callback); + } catch (Exception exception) { + log.error("消息发送失败, {}", exception.getMessage(), exception); + } + } + +}