Commit f234110f authored by liaozan's avatar liaozan 🏀

Polish kafka support

parent 794edf56
...@@ -17,7 +17,9 @@ import java.util.List; ...@@ -17,7 +17,9 @@ import java.util.List;
/** /**
* 注意!!!此类是基于 cglib 实现的 * 注意!!!此类是基于 cglib 实现的
* <p>
* 默认的 cglib 只支持同名,同类型的属性转换,本类对此场景进行了增强,具体逻辑见 {@link DefaultConverter} * 默认的 cglib 只支持同名,同类型的属性转换,本类对此场景进行了增强,具体逻辑见 {@link DefaultConverter}
* <p>
* 另外最重要的, cglib 是浅拷贝,意味着如果是引用类型,修改源对象会导致目标对象的值也被修改,使用时请注意!!! * 另外最重要的, cglib 是浅拷贝,意味着如果是引用类型,修改源对象会导致目标对象的值也被修改,使用时请注意!!!
* *
* @author liaozan * @author liaozan
......
...@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean ...@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.ProducerListener;
/** /**
* @author liaozan * @author liaozan
...@@ -22,9 +22,9 @@ public class KafkaAutoConfiguration { ...@@ -22,9 +22,9 @@ public class KafkaAutoConfiguration {
} }
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean(ProducerListener.class)
public KafkaMessageProducer defaultKafkaMessageProducer(KafkaTemplate<String, String> kafkaTemplate) { public KafkaLoggingProducerListener kafkaLoggingProducerListener() {
return new KafkaMessageProducer(kafkaTemplate); return new KafkaLoggingProducerListener();
} }
} }
package com.schbrain.framework.autoconfigure.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.LoggingProducerListener;
/**
* @author liaozan
* @since 2023/7/26
*/
@Slf4j
public class KafkaLoggingProducerListener extends LoggingProducerListener<Object, Object> {
public KafkaLoggingProducerListener() {
this.setIncludeContents(false);
}
@Override
public void onSuccess(ProducerRecord<Object, Object> record, RecordMetadata metadata) {
log.debug("[{}] 消息发送成功, content: {}", record.topic(), record.value());
}
@Override
public void onError(ProducerRecord<Object, Object> record, RecordMetadata metadata, Exception exception) {
log.warn("[{}] 消息发送失败, content: {}", record.topic(), record.value());
super.onError(record, metadata, exception);
}
}
package com.schbrain.framework.autoconfigure.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author liaozan
* @since 2023/7/17
*/
@Slf4j
public class KafkaMessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaMessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* producer 异步方式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
* @param description 消息描述
*/
public void sendMessageAsync(String topic, String message, String description) {
sendMessageAsync(topic, message, new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, String> sendResult) {
log.debug("{} 消息发送成功, message: {}", description, message);
}
@Override
public void onFailure(Throwable exception) {
log.error("{} 消息发送失败, {}", description, exception.getMessage(), exception);
}
});
}
/**
* producer 异步方式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageAsync(String topic, String message, ListenableFutureCallback<SendResult<String, String>> callback) {
try {
kafkaTemplate.send(topic, message).addCallback(callback);
} catch (Exception exception) {
log.error("消息发送失败, {}", exception.getMessage(), exception);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment