Commit 9b1c8308 authored by liaozan's avatar liaozan 🏀

Add utils for canalEvent convert

parent 9018c33d
......@@ -18,5 +18,10 @@
<groupId>com.schbrain.framework</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
package com.schbrain.framework.autoconfigure.starrocks.helper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.schbrain.common.entity.CanalChangedEvent;
import com.schbrain.common.exception.BaseException;
import com.schbrain.common.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.List;
/**
* @author liaozan
* @since 2023/12/22
*/
public class ConvertUtils {
private static final ObjectMapper DESERIALIZER = JacksonUtils.getObjectMapper().copy().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
public static <Source, Target> Target convertTo(ConsumerRecord<String, String> record, Class<Source> sourceType, Class<Target> targetType) {
CanalChangedEvent event = JacksonUtils.getObjectFromJson(record.value(), CanalChangedEvent.class);
if (event == null) {
throw new BaseException("CanalChangedEvent is null");
}
Source source = DESERIALIZER.convertValue(event.getAfter(), sourceType);
return BeanCopyUtils.copy(source, targetType);
}
public static <Source, Target> List<Target> convertToList(ConsumerRecords<String, String> records, Class<Source> sourceType, Class<Target> targetType) {
return StreamUtils.toList(records, record -> convertTo(record, sourceType, targetType));
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment