diff --git a/starters/starrocks-spring-boot-starter/pom.xml b/starters/starrocks-spring-boot-starter/pom.xml
index b1550dadbc52dbaefeb8ac7417f6b141ec8acced..e2d99104b8e2315c3f752ae28c2fd31a83edd11e 100644
--- a/starters/starrocks-spring-boot-starter/pom.xml
+++ b/starters/starrocks-spring-boot-starter/pom.xml
@@ -18,5 +18,10 @@
com.schbrain.framework
mybatis-spring-boot-starter
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..1a3db669238e4fb0f770439c3f935a57670bb141
--- /dev/null
+++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/helper/ConvertUtils.java
@@ -0,0 +1,34 @@
+package com.schbrain.framework.autoconfigure.starrocks.helper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.schbrain.common.entity.CanalChangedEvent;
+import com.schbrain.common.exception.BaseException;
+import com.schbrain.common.util.*;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.util.List;
+
+/**
+ * @author liaozan
+ * @since 2023/12/22
+ */
+public class ConvertUtils {
+
+ private static final ObjectMapper DESERIALIZER = JacksonUtils.getObjectMapper().copy().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
+
+ public static Target convertTo(ConsumerRecord record, Class sourceType, Class targetType) {
+ CanalChangedEvent event = JacksonUtils.getObjectFromJson(record.value(), CanalChangedEvent.class);
+ if (event == null) {
+ throw new BaseException("CanalChangedEvent is null");
+ }
+ Source source = DESERIALIZER.convertValue(event.getAfter(), sourceType);
+ return BeanCopyUtils.copy(source, targetType);
+ }
+
+ public static List convertToList(ConsumerRecords records, Class sourceType, Class targetType) {
+ return StreamUtils.toList(records, record -> convertTo(record, sourceType, targetType));
+ }
+
+}