Commit 9f76e74e authored by zhuyunfeng's avatar zhuyunfeng

代码初始提交

parent 823d2af0
Pipeline #1006 failed with stages
in 0 seconds
.idea/
target/
# schbrain-canal # canal-client-springboot-starter
canal客户端工具类
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<packaging>pom</packaging>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-canal</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>schbrain-canal</name>
<modules>
<module>schbrain-canal-client</module>
<module>schbrain-canal-web</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>schbrain-canal</artifactId>
<groupId>com.schbrain.framework</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>schbrain-canal-client</artifactId>
<name>schbrain-canal-client</name>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-spring-support</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.schbrain.canal.client.annotation;
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface CanalEventListener {
@AliasFor(annotation = Component.class)
String value() default "";
}
package com.schbrain.canal.client.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* ListenPoint for delete
*
* @author chen.qian
* @date 2018/3/19
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ListenPoint(eventType = CanalEntry.EventType.DELETE)
public @interface DeleteListenPoint {
/**
* canal destination
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String destination() default "";
/**
* database schema which you are concentrate on
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String[] schema() default {};
/**
* tables which you are concentrate on
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String[] table() default {};
}
package com.schbrain.canal.client.annotation;
import com.schbrain.canal.client.conf.CanalClientConfiguration;
import com.schbrain.canal.client.conf.SchbrainCanalConfig;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({SchbrainCanalConfig.class, CanalClientConfiguration.class})
public @interface EnableCanalClient {
}
package com.schbrain.canal.client.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* ListenPoint for insert
*
* @author chen.qian
* @date 2018/3/19
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ListenPoint(eventType = CanalEntry.EventType.INSERT)
public @interface InsertListenPoint {
/**
* canal destination
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String destination() default "";
/**
* database schema which you are concentrate on
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String[] schema() default {};
/**
* tables which you are concentrate on
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String[] table() default {};
}
package com.schbrain.canal.client.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.lang.annotation.*;
/**
* used to indicate that method(or methods) is(are) the candidate of the
* canal event distributor
*
* @author chen.qian
* @date 2018/3/19
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ListenPoint {
/**
* canal destination
* default for all
* @return canal destination
*/
String destination() default "";
/**
* database schema which you are concentrate on
* default for all
* @return canal destination
*/
String[] schema() default {};
/**
* tables which you are concentrate on
* default for all
* @return canal destination
*/
String[] table() default {};
/**
* canal event type
* default for all
* @return canal event type
*/
CanalEntry.EventType[] eventType() default {};
}
package com.schbrain.canal.client.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Table
* @author zhuyf
* @date 2022/6/16
*/
@Target(ElementType.TYPE)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface TableFilter {
/**
* 表名称
* @return
*/
String table();
/**
* 数据库名称
* @return
*/
String schame();
}
package com.schbrain.canal.client.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* ListenPoint for update
*
* @author chen.qian
* @date 2018/3/19
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ListenPoint(eventType = CanalEntry.EventType.UPDATE)
public @interface UpdateListenPoint {
/**
* canal destination
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String destination() default "";
/**
* database schema which you are concentrate on
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String[] schema() default {};
/**
* tables which you are concentrate on
* default for all
* @return canal destination
*/
@AliasFor(annotation = ListenPoint.class)
String[] table() default {};
}
package com.schbrain.canal.client.conf;
import lombok.Data;
/**
* CANAL Config
*
* @author zhuyf
* @date 2022/6/16
*/
@Data
public class CanalClientConfig {
private String addresses;
private String subscribe;
private String zkHosts;
private String username = "";
private String password = "";
private int retryCount;
private long acquireInterval = 1000;
private int batchSize = 1000;
}
package com.schbrain.canal.client.conf;
import com.schbrain.canal.client.core.CanalClient;
import com.schbrain.canal.client.core.SimpleCanalClient;
import com.schbrain.canal.client.transfer.MessageTransponders;
import com.schbrain.canal.client.transfer.TransponderFactory;
import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
/**
* 启动配置类
*
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
public class CanalClientConfiguration {
@Autowired
private SchbrainCanalConfig schbrainCanalConfig;
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public BeanUtil beanUtil() {
return new BeanUtil();
}
@Bean
private CanalClient canalClient() {
log.info("starting canal client....");
TransponderFactory factory = MessageTransponders.defaultMessageTransponder();
CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory);
client.start();
return client;
}
}
package com.schbrain.canal.client.conf;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* Canal配置
* @author zhuyf
* @date 2022/6/16
*/
@Component
@ConfigurationProperties(prefix = "canal.client")
public class SchbrainCanalConfig {
/**
* instance config
*/
private Map<String, CanalClientConfig> instances = new LinkedHashMap<>();
/**
* 获取实例列表
* @return
*/
public Map<String, CanalClientConfig> getInstances() {
return instances;
}
/**
* 设置实例列表
* @param instances
*/
public void setInstances(Map<String, CanalClientConfig> instances) {
this.instances = instances;
}
}
package com.schbrain.canal.client.core;
import com.alibaba.otter.canal.client.CanalConnector;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.conf.SchbrainCanalConfig;
import com.schbrain.canal.client.exception.CanalClientException;
import com.schbrain.canal.client.transfer.TransponderFactory;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.Objects;
public abstract class AbstractCanalClient implements CanalClient {
/**
* running flag
*/
private volatile boolean running;
/**
* customer config
*/
private SchbrainCanalConfig canalConfig;
/**
* TransponderFactory
*/
protected final TransponderFactory factory;
AbstractCanalClient(SchbrainCanalConfig canalConfig,TransponderFactory factory) {
Objects.requireNonNull(canalConfig, "canalConfig can not be null!");
Objects.requireNonNull(canalConfig, "transponderFactory can not be null!");
this.canalConfig = canalConfig;
this.factory = factory;
}
@Override
public void start() {
Map<String, CanalClientConfig> instanceMap = getConfig();
for (Map.Entry<String,CanalClientConfig> instanceEntry : instanceMap.entrySet()) {
CanalConnector connector = processInstanceEntry(instanceEntry);
process(connector, instanceEntry);
}
}
/**
* To initialize the canal connector
* @param connector CanalConnector
* @param config config
*/
protected abstract void process(CanalConnector connector,Map.Entry<String,CanalClientConfig> config);
/**
* 连接
* @param instanceEntry
* @return
*/
private CanalConnector processInstanceEntry(Map.Entry<String,CanalClientConfig> instanceEntry) {
CanalClientConfig instance = instanceEntry.getValue();
String destination = instanceEntry.getKey();
CanalConnector connector = ConnectionFactory.create(instance,destination);
connector.connect();
if (!StringUtils.isEmpty(instance.getSubscribe())) {
connector.subscribe(instance.getSubscribe());
} else {
connector.subscribe();
}
connector.rollback();
return connector;
}
protected Map<String,CanalClientConfig> getConfig() {
SchbrainCanalConfig config = canalConfig;
Map<String,CanalClientConfig> instanceMap;
if (config != null && (instanceMap = config.getInstances()) != null && !instanceMap.isEmpty()) {
return config.getInstances();
} else {
throw new CanalClientException("can not get the configuration of canal client!");
}
}
@Override
public void stop() {
setRunning(false);
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
}
package com.schbrain.canal.client.core;
/**
* CanalClient
* @author zhuyf
* @date 2022/6/16
*/
public interface CanalClient {
/**
* start
*/
void start();
/**
* stop
*/
void stop();
/**
* is running
* @return yes or no
*/
boolean isRunning();
}
package com.schbrain.canal.client.core;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.exception.CanalClientException;
import org.apache.commons.lang.StringUtils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
public class ConnectionFactory {
/**
* 创建连接
* @param config
* @param destination
* @return
*/
public static CanalConnector create(CanalClientConfig config, String destination){
if(StringUtils.isNotBlank(config.getAddresses())){
List<InetSocketAddress> inetSocketAddressList=new ArrayList<>();
String[] hosts=config.getAddresses().split(",");
for (String hostInfo: hosts) {
String[] hostAndPort= hostInfo.split(":");
inetSocketAddressList.add(new InetSocketAddress(hostAndPort[0], Integer.valueOf(hostAndPort[1])));
}
return CanalConnectors.newClusterConnector(inetSocketAddressList,destination, config.getUsername(),config.getPassword());
}else if(StringUtils.isNotBlank(config.getZkHosts())){
return newClusterConnector(config.getZkHosts(),destination, config.getUsername(), config.getPassword());
}else{
new CanalClientException("zkHosts and addresses cannot all empty");
return null;
}
}
/**
* 创建带cluster模式的客户端链接,自动完成failover切换,服务器列表自动扫描
*
* @param zkServers
* @param destination
* @param username
* @param password
* @return
*/
public static CanalConnector newClusterConnector(String zkServers, String destination, String username, String password) {
ClusterCanalConnector canalConnector = new ClusterCanalConnector(username, password, destination, new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
canalConnector.setSoTimeout(60 * 1000);
canalConnector.setIdleTimeout(60 * 60 * 1000);
return canalConnector;
}
}
package com.schbrain.canal.client.core;
import com.schbrain.canal.client.annotation.ListenPoint;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/16
*/
public class ListenerPoint {
private Object target;
private Map<Method, ListenPoint> invokeMap = new HashMap<>();
ListenerPoint(Object target, Method method, ListenPoint anno) {
this.target = target;
this.invokeMap.put(method, anno);
}
public Object getTarget() {
return target;
}
public Map<Method, ListenPoint> getInvokeMap() {
return invokeMap;
}
}
package com.schbrain.canal.client.core;
import com.alibaba.otter.canal.client.CanalConnector;
import com.schbrain.canal.client.annotation.CanalEventListener;
import com.schbrain.canal.client.annotation.ListenPoint;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.conf.SchbrainCanalConfig;
import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.transfer.TransponderFactory;
import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.AnnotationUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class SimpleCanalClient extends AbstractCanalClient {
/**
* executor
*/
private ThreadPoolExecutor executor;
private final List<CanalEvent> listeners = new ArrayList<>();
private final List<ListenerPoint> annoListeners = new ArrayList<>();
public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) {
super(canalConfig,factory);
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory());
initListeners();
}
private void initListeners() {
log.info("{}: initializing the listeners....", Thread.currentThread().getName());
List<CanalEvent> list = BeanUtil.getBeansOfType(CanalEvent.class);
if(list!=null && list.size() > 0){
for (CanalEvent canalEvent : list) {
TableFilter table = canalEvent.getClass().getAnnotation(TableFilter.class);
if(table!=null){
System.out.println(table.schame());
System.out.println(table.table());
}
}
listeners.addAll(list);
}
Map<String, Object> listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class);
if (listenerMap != null) {
for (Object target : listenerMap.values()) {
Method[] methods = target.getClass().getDeclaredMethods();
if (methods != null && methods.length > 0) {
for (Method method : methods) {
ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class);
if (l != null) {
annoListeners.add(new ListenerPoint(target, method, l));
}
}
}
}
}
log.info("{}: initializing the listeners end.", Thread.currentThread().getName());
if (log.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) {
log.warn("{}: No listener found in context! ", Thread.currentThread().getName());
}
}
@Override
protected void process(CanalConnector connector, Map.Entry<String, CanalClientConfig> config) {
executor.submit(factory.newTransponder(connector, config, listeners, annoListeners));
//factory.newTransponder(connector, config, listeners, annoListeners).run();
}
@Override
public void stop() {
super.stop();
executor.shutdown();
}
}
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
public interface CanalEvent {
/**
* on event
* @param eventType
* @param rowData
*/
void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData);
}
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
public interface DefCanalEvent extends CanalEvent{
@Override
default void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData){
switch (eventType) {
case INSERT:
onInsert(header,rowData);
break;
case UPDATE:
onUpdate(header,rowData);
break;
case DELETE:
onDelete(header,rowData);
break;
default:
break;
}
}
/**
* onInsert
* @param rowData
*/
void onInsert(CanalEntry.Header header,CanalEntry.RowData rowData);
/**
* onUpdate
* @param rowData
*/
void onUpdate(CanalEntry.Header header,CanalEntry.RowData rowData);
/**
* onDelete
* @param rowData
*/
void onDelete(CanalEntry.Header header,CanalEntry.RowData rowData);
}
package com.schbrain.canal.client.exception;
/**
* @author zhuyf
* @date 2022/6/16
*/
public class CanalClientException extends RuntimeException{
public CanalClientException() {
}
public CanalClientException(String message) {
super(message);
}
public CanalClientException(String message, Throwable cause) {
super(message, cause);
}
public CanalClientException(Throwable cause) {
super(cause);
}
public CanalClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.schbrain.canal.client.annotation.ListenPoint;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder{
public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, List<CanalEvent> listeners, List<ListenerPoint> annoListeners) {
super(connector, config, listeners, annoListeners);
}
@Override
protected void distributeEvent(Message message) {
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
//忽略不处理的
List<CanalEntry.EntryType> ignoreEntryTypes = getIgnoreEntryTypes();
if (ignoreEntryTypes != null && ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry.toString(),e);
}
//ignore the ddl operation
if (rowChange.hasIsDdl() && rowChange.getIsDdl()) {
processDdl(rowChange);
continue;
}
CanalEntry.Header header = entry.getHeader();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
//distribute to listener interfaces
distributeByImpl(header,rowChange.getEventType(), rowData);
//distribute to annotation listener interfaces
distributeByAnnotation(destination,
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
rowChange.getEventType(),
rowData);
}
}
}
/**
* distribute to listener interfaces
*
* @param eventType eventType
* @param rowData rowData
*/
protected void distributeByImpl(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
if (listeners != null) {
for (CanalEvent listener : listeners) {
listener.onEvent(header,eventType, rowData);
}
}
}
/**
* distribute to annotation listener interfaces
*
* @param destination destination
* @param schemaName schema
* @param tableName table name
* @param eventType event type
* @param rowData row data
*/
protected void distributeByAnnotation(String destination,
String schemaName,
String tableName,
CanalEntry.EventType eventType,
CanalEntry.RowData rowData) {
//invoke the listeners
annoListeners.forEach(point -> point
.getInvokeMap()
.entrySet()
.stream()
.filter(getAnnotationFilter(destination, schemaName, tableName, eventType))
.forEach(entry -> {
Method method = entry.getKey();
method.setAccessible(true);
try {
Object[] args = getInvokeArgs(method, eventType, rowData);
method.invoke(point.getTarget(), args);
} catch (Exception e) {
log.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}",
Thread.currentThread().getName(),
point.getTarget().getClass().getName(), method.getName());
}
}));
}
/**
* get the filters predicate
* @param destination destination
* @param schemaName schema
* @param tableName table name
* @param eventType event type
* @return predicate
*/
protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,
String schemaName,
String tableName,
CanalEntry.EventType eventType);
/**
* get the args
* @param method method
* @param eventType event type
* @param rowData row data
* @return args which will be used by invoking the annotation methods
*/
protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType,CanalEntry.RowData rowData);
/**
* ddl事件
* @param rowChange
*/
protected void processDdl(CanalEntry.RowChange rowChange) {}
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
return Collections.emptyList();
}
}
package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
public abstract class AbstractMessageTransponder implements MessageTransponder {
/**
* canal connector
*/
private final CanalConnector connector;
/**
* custom config
*/
protected final CanalClientConfig config;
/**
* destination of canal server
*/
protected final String destination;
/**
* listeners which are used by implementing the Interface
*/
protected final List<CanalEvent> listeners = new ArrayList<>();
/**
* listeners which are used by annotation
*/
protected final List<ListenerPoint> annoListeners = new ArrayList<>();
/**
* running flag
*/
private volatile boolean running = true;
public AbstractMessageTransponder(CanalConnector connector,
Map.Entry<String,CanalClientConfig> config,
List<CanalEvent> listeners,
List<ListenerPoint> annoListeners) {
Objects.requireNonNull(connector, "connector can not be null!");
Objects.requireNonNull(config, "config can not be null!");
this.connector = connector;
this.destination = config.getKey();
this.config = config.getValue();
if (listeners != null) {
this.listeners.addAll(listeners);
}
if (annoListeners != null){
this.annoListeners.addAll(annoListeners);
}
}
@Override
public void run() {
int errorCount = config.getRetryCount();
final long interval = config.getAcquireInterval();
final String threadName = Thread.currentThread().getName();
while (running && !Thread.currentThread().isInterrupted()) {
try {
Message message = connector.getWithoutAck(config.getBatchSize());
long batchId = message.getId();
int size = message.getEntries().size();
if (log.isDebugEnabled()) {
log.debug("{}: Get message from canal server >>>>> size:{}", threadName, size);
}
//empty message
if (batchId == -1 || size == 0) {
if (log.isDebugEnabled()) {
log.debug("{}: Empty message... sleep for {} millis", threadName, interval);
}
Thread.sleep(interval);
} else {
distributeEvent(message);
}
connector.ack(batchId);
if (log.isDebugEnabled()) {
log.debug("{}: Ack message. batchId:{}", threadName, batchId);
}
} catch (CanalClientException e) {
errorCount--;
log.error(threadName + ": Error occurred!! ", e);
try {
Thread.sleep(interval);
} catch (InterruptedException e1) {
errorCount = 0;
}
} catch (InterruptedException e) {
errorCount = 0;
connector.rollback();
} finally {
if (errorCount <= 0) {
stop();
log.info("{}: Stopping the client.. ", Thread.currentThread().getName());
}
}
}
stop();
log.info("{}: client stopped. ", Thread.currentThread().getName());
}
/**
* to distribute the message to special event and let the event listeners to handle it
*
* @param message canal message
*/
protected abstract void distributeEvent(Message message);
/**
* stop running
*/
void stop() {
running = false;
}
}
package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.ListenPoint;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
/**
* @author zhuyf
* @date 2022/6/16
*/
public class DefaultMessageTransponder extends AbstractBasicMessageTransponder{
public DefaultMessageTransponder(CanalConnector connector,
Map.Entry<String, CanalClientConfig> config,
List<CanalEvent> listeners,
List<ListenerPoint> annoListeners) {
super(connector, config, listeners, annoListeners);
}
@Override
protected Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination, String schemaName, String tableName, CanalEntry.EventType eventType) {
Predicate<Map.Entry<Method, ListenPoint>> df = e -> StringUtils.isEmpty(e.getValue().destination())
|| e.getValue().destination().equals(destination);
Predicate<Map.Entry<Method, ListenPoint>> sf = e -> e.getValue().schema().length == 0
|| Arrays.stream(e.getValue().schema()).anyMatch(s -> s.equals(schemaName));
Predicate<Map.Entry<Method, ListenPoint>> tf = e -> e.getValue().table().length == 0
|| Arrays.stream(e.getValue().table()).anyMatch(t -> t.equals(tableName));
Predicate<Map.Entry<Method, ListenPoint>> ef = e -> e.getValue().eventType().length == 0
|| Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType);
return df.and(sf).and(tf).and(ef);
}
@Override
protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
return Arrays.stream(method.getParameterTypes())
.map(p -> p == CanalEntry.EventType.class
? eventType
: p == CanalEntry.RowData.class
? rowData : null)
.toArray();
}
@Override
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, CanalEntry.EntryType.HEARTBEAT);
}
}
package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import java.util.List;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/16
*/
public class DefaultTransponderFactory implements TransponderFactory{
@Override
public MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, List<CanalEvent> listeners, List<ListenerPoint> annoListeners) {
return new DefaultMessageTransponder(connector, config, listeners, annoListeners);
}
}
package com.schbrain.canal.client.transfer;
/**
* @author zhuyf
* @date 2022/6/16
*/
public interface MessageTransponder extends Runnable{
}
package com.schbrain.canal.client.transfer;
/**
* @author zhuyf
* @date 2022/6/16
*/
public class MessageTransponders {
public static TransponderFactory defaultMessageTransponder() {
return new DefaultTransponderFactory();
}
}
package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import java.util.List;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/16
*/
public interface TransponderFactory {
/**
* @param connector connector
* @param config config
* @param listeners listeners
* @param annoListeners annoListeners
* @return MessageTransponder
*/
MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, List<CanalEvent> listeners, List<ListenerPoint> annoListeners);
}
package com.schbrain.canal.client.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Component
public class BeanUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
BeanUtil.applicationContext = applicationContext;
}
/**
* get bean
* @param clazz
* @param <T>
* @return
*/
public static <T> T getBean(Class<T> clazz) {
T obj;
try {
obj = applicationContext.getBean(clazz);
} catch (Exception e) {
obj = null;
}
return obj;
}
/**
* get bean list
* @param clazz
* @param <T>
* @return
*/
public static <T> List<T> getBeansOfType(Class<T> clazz) {
Map<String, T> map;
try {
map = applicationContext.getBeansOfType(clazz);
} catch (Exception e) {
map = null;
}
return map == null ? null : new ArrayList<>(map.values());
}
/**
* get with annotation
* @param anno
* @return
*/
public static Map<String, Object> getBeansWithAnnotation(Class<? extends Annotation> anno) {
Map<String, Object> map;
try {
map = applicationContext.getBeansWithAnnotation(anno);
} catch (Exception e) {
map = null;
}
return map;
}
}
{
"groups": [
{
"name": "canal.client",
"type": "com.schbrain.canal.client.conf.SchbrainCanalConfig",
"sourceType": "com.schbrain.canal.client.conf.SchbrainCanalConfig"
}
],
"properties": [
{
"name": "canal.client.instances",
"type": "java.util.Map<java.lang.String,com.schbrain.canal.client.conf.CanalClientConfig>",
"description": "instance config",
"sourceType": "com.schbrain.canal.client.conf.SchbrainCanalConfig"
}
],
"hints": []
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>schbrain-canal</artifactId>
<groupId>com.schbrain.framework</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>schbrain-canal-web</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-canal-client</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}-${version}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.schbrain.web;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ApiSchoolController {
@RequestMapping("/health")
public String health(){
return "OK";
}
}
package com.schbrain.web;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.CanalEvent;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent")
@TableFilter(table = "uc_user_info",schame = "kp_user") //暂时未实现
public class MyCanalEvent implements CanalEvent {
@Override
public void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+eventType.name());
//如果是删除语句
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
package com.schbrain.web;
import com.schbrain.canal.client.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Hello world!
*/
@SpringBootApplication
@EnableCanalClient
public class SchbrainMainApplication
{
public static void main( String[] args )
{
SpringApplication.run(SchbrainMainApplication.class, args);
}
}
#canal.client.instances.example.host=192.168.0.59
canal.client.instances.kp_user.addresses=192.168.36.66:11111
canal.client.instances.kp_user.username=
canal.client.instances.kp_user.password=
canal.client.instances.kp_user.retryCount=10
#canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
canal.client.instances.screenschbrain.addresses=192.168.36.66:11111
canal.client.instances.screenschbrain.username=
canal.client.instances.screenschbrain.password=
canal.client.instances.screenschbrain.retryCount=10
#canal.client.instances.screenschbrain.subscribe=.*\\\\..*
#canal.client.instances.screenschbrain.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/kp-user-hub/kp-user-hub.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/kp-user-hub/kp-user-hub.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>[%d{yyyy-MM-dd HH:mm:ss} %-5level %t] %logger{50} - %msg%n
</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%d{yyyy-MM-dd HH:mm:ss} %-5level %t] %logger{50} - %msg%n
</pattern>
<charset>utf-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>
<logger name="com.schbrain" level="INFO" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</logger>
<logger name="org.springframework.web.servlet.PageNotFound" level="off" additivity="false">
<appender-ref ref="FILE" />
</logger>
<logger name="org.springframework.web.HttpRequestMethodNotSupportedException" level="off" additivity="false">
<appender-ref ref="FILE" />
</logger>
</configuration>
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment