Commit d4661932 authored by liaozan's avatar liaozan 🏀

Introduce MessageProducer in kafka-starter

parent 4904fa60
...@@ -3,13 +3,15 @@ package com.schbrain.framework.autoconfigure.kafka; ...@@ -3,13 +3,15 @@ package com.schbrain.framework.autoconfigure.kafka;
import com.schbrain.framework.autoconfigure.kafka.properties.KafkaProperties; import com.schbrain.framework.autoconfigure.kafka.properties.KafkaProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Import;
/** /**
* @author liaozan * @author liaozan
* @since 2023-04-29 * @since 2023-04-29
*/ */
@AutoConfiguration @AutoConfiguration
@Import(MessageProducer.class)
@EnableConfigurationProperties(KafkaProperties.class) @EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfiguration { public class KafkaAutoConfiguration {
} }
\ No newline at end of file
package com.schbrain.framework.autoconfigure.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author liaozan
* @since 2023/7/17
*/
@Slf4j
public class MessageProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* producer 异步方式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
* @param description 消息描述
*/
public void sendMessageAsync(String topic, String message, String description) {
sendMessageAsync(topic, message, new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, String> sendResult) {
log.debug("{} 消息发送成功, message: {}", description, message);
}
@Override
public void onFailure(Throwable exception) {
log.error("{} 消息发送失败, {}", description, exception.getMessage(), exception);
}
});
}
/**
* producer 异步方式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageAsync(String topic, String message, ListenableFutureCallback<SendResult<String, String>> callback) {
try {
kafkaTemplate.send(topic, message).addCallback(callback);
} catch (Exception exception) {
log.error("消息发送失败, {}", exception.getMessage(), exception);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment