diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..92322c4e0fd1244aaff671fa70310b69bd5b738d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/ +target/ diff --git a/README.md b/README.md index 18ede829a074ccb1a433d7dbca61a8e06074e49f..cbaaa8e19b305d36230f7def5dbe15d324c47ec7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ -# schbrain-canal +# canal-client-springboot-starter +canal客户端工具类 \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..963e03dcb61c53b983049277e62c76637dc126a8 --- /dev/null +++ b/pom.xml @@ -0,0 +1,25 @@ + + + 4.0.0 + + + com.schbrain.framework + schbrain-parent + 1.0.0-SNAPSHOT + + pom + + com.schbrain.framework + schbrain-canal + 1.0.0-SNAPSHOT + schbrain-canal + + + schbrain-canal-client + schbrain-canal-web + + + + \ No newline at end of file diff --git a/schbrain-canal-client/pom.xml b/schbrain-canal-client/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..81c3b5153d1914c112763bc44d8a983d01892f98 --- /dev/null +++ b/schbrain-canal-client/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + + schbrain-canal + com.schbrain.framework + 1.0.0-SNAPSHOT + + + schbrain-canal-client + + schbrain-canal-client + + + + + + + com.alibaba.otter + canal.client + 1.1.5 + + + com.alibaba.otter + canal.protocol + 1.1.5 + + + com.schbrain.framework + schbrain-spring-support + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.springframework.boot + spring-boot-starter-test + test + + + + diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java new file mode 100644 index 0000000000000000000000000000000000000000..959123f0030ffbcce6263ae7165ae6588f2f69c4 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java @@ -0,0 +1,18 @@ +package com.schbrain.canal.client.annotation; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.stereotype.Component; + +import java.lang.annotation.*; + + +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Component +public @interface CanalEventListener { + + @AliasFor(annotation = Component.class) + String value() default ""; + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..4e367d51233572e742fafed445ad9a75f897f2e4 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java @@ -0,0 +1,45 @@ +package com.schbrain.canal.client.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.*; + +/** + * ListenPoint for delete + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ListenPoint(eventType = CanalEntry.EventType.DELETE) +public @interface DeleteListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] table() default {}; + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/EnableCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/EnableCanalClient.java new file mode 100644 index 0000000000000000000000000000000000000000..b972af6ecce8869b45925da29946bb125e8dc506 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/EnableCanalClient.java @@ -0,0 +1,16 @@ +package com.schbrain.canal.client.annotation; + +import com.schbrain.canal.client.conf.CanalClientConfiguration; +import com.schbrain.canal.client.conf.SchbrainCanalConfig; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +@Import({SchbrainCanalConfig.class, CanalClientConfiguration.class}) +public @interface EnableCanalClient { + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..6f454f22292e1a827b45b95db8f56fbc7498f175 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java @@ -0,0 +1,45 @@ +package com.schbrain.canal.client.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.*; + +/** + * ListenPoint for insert + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ListenPoint(eventType = CanalEntry.EventType.INSERT) +public @interface InsertListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] table() default {}; + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..1cfb7f5ed02e2442185aa5b15b6443eaccd6a4ce --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java @@ -0,0 +1,48 @@ +package com.schbrain.canal.client.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +import java.lang.annotation.*; + +/** + * used to indicate that method(or methods) is(are) the candidate of the + * canal event distributor + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD, ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface ListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + String[] table() default {}; + + /** + * canal event type + * default for all + * @return canal event type + */ + CanalEntry.EventType[] eventType() default {}; + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..7bebdc454cb6c383d74c030160e1cb9a2aeea3c2 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java @@ -0,0 +1,28 @@ +package com.schbrain.canal.client.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Table + * @author zhuyf + * @date 2022/6/16 + */ +@Target(ElementType.TYPE) +@Retention(value = RetentionPolicy.RUNTIME) +public @interface TableFilter { + /** + * 表名称 + * @return + */ + String table(); + + /** + * 数据库名称 + * @return + */ + String schame(); + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..f779c3fa97958dab56f0e9b06c39dfdee9e44cd3 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java @@ -0,0 +1,45 @@ +package com.schbrain.canal.client.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.*; + +/** + * ListenPoint for update + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ListenPoint(eventType = CanalEntry.EventType.UPDATE) +public @interface UpdateListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] table() default {}; + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..46e73c1aeceabe98011c34dc3b449c80bc98cb8b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java @@ -0,0 +1,30 @@ +package com.schbrain.canal.client.conf; + +import lombok.Data; + +/** + * CANAL Config + * + * @author zhuyf + * @date 2022/6/16 + */ +@Data +public class CanalClientConfig { + + private String addresses; + + private String subscribe; + + private String zkHosts; + + private String username = ""; + + private String password = ""; + + private int retryCount; + + private long acquireInterval = 1000; + + private int batchSize = 1000; +} + diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..1747973c28a28e20e538367344a9e3af8c052dbd --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java @@ -0,0 +1,40 @@ +package com.schbrain.canal.client.conf; + +import com.schbrain.canal.client.core.CanalClient; +import com.schbrain.canal.client.core.SimpleCanalClient; +import com.schbrain.canal.client.transfer.MessageTransponders; +import com.schbrain.canal.client.transfer.TransponderFactory; +import com.schbrain.canal.client.utils.BeanUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; + +/** + * 启动配置类 + * + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +public class CanalClientConfiguration { + + @Autowired + private SchbrainCanalConfig schbrainCanalConfig; + + @Bean + @Order(Ordered.HIGHEST_PRECEDENCE) + public BeanUtil beanUtil() { + return new BeanUtil(); + } + + @Bean + private CanalClient canalClient() { + log.info("starting canal client...."); + TransponderFactory factory = MessageTransponders.defaultMessageTransponder(); + CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory); + client.start(); + return client; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/SchbrainCanalConfig.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/SchbrainCanalConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..b6e5ca26e0b2c76f25e4cfd2b4e4cefe682adeb4 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/SchbrainCanalConfig.java @@ -0,0 +1,37 @@ +package com.schbrain.canal.client.conf; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Canal配置 + * @author zhuyf + * @date 2022/6/16 + */ +@Component +@ConfigurationProperties(prefix = "canal.client") +public class SchbrainCanalConfig { + + /** + * instance config + */ + private Map instances = new LinkedHashMap<>(); + + /** + * 获取实例列表 + * @return + */ + public Map getInstances() { + return instances; + } + + /** + * 设置实例列表 + * @param instances + */ + public void setInstances(Map instances) { + this.instances = instances; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/AbstractCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/AbstractCanalClient.java new file mode 100644 index 0000000000000000000000000000000000000000..8786b1ab9ac5b122e60115f75ef5ea89a839f23f --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/AbstractCanalClient.java @@ -0,0 +1,95 @@ +package com.schbrain.canal.client.core; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.conf.SchbrainCanalConfig; +import com.schbrain.canal.client.exception.CanalClientException; +import com.schbrain.canal.client.transfer.TransponderFactory; +import org.apache.commons.lang3.StringUtils; +import java.util.Map; +import java.util.Objects; + +public abstract class AbstractCanalClient implements CanalClient { + /** + * running flag + */ + private volatile boolean running; + + /** + * customer config + */ + private SchbrainCanalConfig canalConfig; + + /** + * TransponderFactory + */ + protected final TransponderFactory factory; + + + AbstractCanalClient(SchbrainCanalConfig canalConfig,TransponderFactory factory) { + Objects.requireNonNull(canalConfig, "canalConfig can not be null!"); + Objects.requireNonNull(canalConfig, "transponderFactory can not be null!"); + this.canalConfig = canalConfig; + this.factory = factory; + } + + @Override + public void start() { + Map instanceMap = getConfig(); + for (Map.Entry instanceEntry : instanceMap.entrySet()) { + CanalConnector connector = processInstanceEntry(instanceEntry); + process(connector, instanceEntry); + } + + } + + /** + * To initialize the canal connector + * @param connector CanalConnector + * @param config config + */ + protected abstract void process(CanalConnector connector,Map.Entry config); + + /** + * 连接 + * @param instanceEntry + * @return + */ + private CanalConnector processInstanceEntry(Map.Entry instanceEntry) { + CanalClientConfig instance = instanceEntry.getValue(); + String destination = instanceEntry.getKey(); + CanalConnector connector = ConnectionFactory.create(instance,destination); + connector.connect(); + if (!StringUtils.isEmpty(instance.getSubscribe())) { + connector.subscribe(instance.getSubscribe()); + } else { + connector.subscribe(); + } + connector.rollback(); + return connector; + } + + protected Map getConfig() { + SchbrainCanalConfig config = canalConfig; + Map instanceMap; + if (config != null && (instanceMap = config.getInstances()) != null && !instanceMap.isEmpty()) { + return config.getInstances(); + } else { + throw new CanalClientException("can not get the configuration of canal client!"); + } + } + + @Override + public void stop() { + setRunning(false); + } + + @Override + public boolean isRunning() { + return running; + } + + private void setRunning(boolean running) { + this.running = running; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/CanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/CanalClient.java new file mode 100644 index 0000000000000000000000000000000000000000..da6dc9bf7a3b1185ec02ca02c6eb7b67b327a367 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/CanalClient.java @@ -0,0 +1,24 @@ +package com.schbrain.canal.client.core; + +/** + * CanalClient + * @author zhuyf + * @date 2022/6/16 + */ +public interface CanalClient { + /** + * start + */ + void start(); + + /** + * stop + */ + void stop(); + + /** + * is running + * @return yes or no + */ + boolean isRunning(); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..be0d7ab337b55918a3f5db56393af43ebc108c17 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java @@ -0,0 +1,58 @@ +package com.schbrain.canal.client.core; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.client.impl.ClusterCanalConnector; +import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy; +import com.alibaba.otter.canal.common.zookeeper.ZkClientx; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.exception.CanalClientException; +import org.apache.commons.lang.StringUtils; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + + +public class ConnectionFactory { + + /** + * 创建连接 + * @param config + * @param destination + * @return + */ + public static CanalConnector create(CanalClientConfig config, String destination){ + if(StringUtils.isNotBlank(config.getAddresses())){ + List inetSocketAddressList=new ArrayList<>(); + String[] hosts=config.getAddresses().split(","); + for (String hostInfo: hosts) { + String[] hostAndPort= hostInfo.split(":"); + inetSocketAddressList.add(new InetSocketAddress(hostAndPort[0], Integer.valueOf(hostAndPort[1]))); + } + return CanalConnectors.newClusterConnector(inetSocketAddressList,destination, config.getUsername(),config.getPassword()); + }else if(StringUtils.isNotBlank(config.getZkHosts())){ + return newClusterConnector(config.getZkHosts(),destination, config.getUsername(), config.getPassword()); + }else{ + new CanalClientException("zkHosts and addresses cannot all empty"); + return null; + } + } + + /** + * 创建带cluster模式的客户端链接,自动完成failover切换,服务器列表自动扫描 + * + * @param zkServers + * @param destination + * @param username + * @param password + * @return + */ + public static CanalConnector newClusterConnector(String zkServers, String destination, String username, String password) { + ClusterCanalConnector canalConnector = new ClusterCanalConnector(username, password, destination, new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers))); + canalConnector.setSoTimeout(60 * 1000); + canalConnector.setIdleTimeout(60 * 60 * 1000); + return canalConnector; + } + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ListenerPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ListenerPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..4f32e83f17afbf070a51358d6c9276319bac2850 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ListenerPoint.java @@ -0,0 +1,30 @@ +package com.schbrain.canal.client.core; + +import com.schbrain.canal.client.annotation.ListenPoint; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public class ListenerPoint { + + private Object target; + private Map invokeMap = new HashMap<>(); + + ListenerPoint(Object target, Method method, ListenPoint anno) { + this.target = target; + this.invokeMap.put(method, anno); + } + + public Object getTarget() { + return target; + } + + public Map getInvokeMap() { + return invokeMap; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java new file mode 100644 index 0000000000000000000000000000000000000000..9c0885919eee5b8112680ad21a69b77393a6fa73 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -0,0 +1,85 @@ +package com.schbrain.canal.client.core; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.schbrain.canal.client.annotation.CanalEventListener; +import com.schbrain.canal.client.annotation.ListenPoint; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.conf.SchbrainCanalConfig; +import com.schbrain.canal.client.event.CanalEvent; +import com.schbrain.canal.client.transfer.TransponderFactory; +import com.schbrain.canal.client.utils.BeanUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.annotation.AnnotationUtils; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class SimpleCanalClient extends AbstractCanalClient { + /** + * executor + */ + private ThreadPoolExecutor executor; + + private final List listeners = new ArrayList<>(); + + private final List annoListeners = new ArrayList<>(); + + public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) { + super(canalConfig,factory); + executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory()); + initListeners(); + } + + private void initListeners() { + log.info("{}: initializing the listeners....", Thread.currentThread().getName()); + List list = BeanUtil.getBeansOfType(CanalEvent.class); + if(list!=null && list.size() > 0){ + for (CanalEvent canalEvent : list) { + TableFilter table = canalEvent.getClass().getAnnotation(TableFilter.class); + if(table!=null){ + System.out.println(table.schame()); + System.out.println(table.table()); + } + } + listeners.addAll(list); + } + Map listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class); + if (listenerMap != null) { + for (Object target : listenerMap.values()) { + Method[] methods = target.getClass().getDeclaredMethods(); + if (methods != null && methods.length > 0) { + for (Method method : methods) { + ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class); + if (l != null) { + annoListeners.add(new ListenerPoint(target, method, l)); + } + } + } + } + } + log.info("{}: initializing the listeners end.", Thread.currentThread().getName()); + if (log.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) { + log.warn("{}: No listener found in context! ", Thread.currentThread().getName()); + } + } + + @Override + protected void process(CanalConnector connector, Map.Entry config) { + executor.submit(factory.newTransponder(connector, config, listeners, annoListeners)); + //factory.newTransponder(connector, config, listeners, annoListeners).run(); + } + + @Override + public void stop() { + super.stop(); + executor.shutdown(); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/CanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/CanalEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..bd54f63cfff7a1069f629f85450ce634eb2fd933 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/CanalEvent.java @@ -0,0 +1,12 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +public interface CanalEvent { + /** + * on event + * @param eventType + * @param rowData + */ + void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/DefCanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/DefCanalEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..b0298bd5fdf5438d908229c13f18a5919bc45912 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/DefCanalEvent.java @@ -0,0 +1,41 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +public interface DefCanalEvent extends CanalEvent{ + + @Override + default void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData){ + switch (eventType) { + case INSERT: + onInsert(header,rowData); + break; + case UPDATE: + onUpdate(header,rowData); + break; + case DELETE: + onDelete(header,rowData); + break; + default: + break; + } + } + + /** + * onInsert + * @param rowData + */ + void onInsert(CanalEntry.Header header,CanalEntry.RowData rowData); + + /** + * onUpdate + * @param rowData + */ + void onUpdate(CanalEntry.Header header,CanalEntry.RowData rowData); + + /** + * onDelete + * @param rowData + */ + void onDelete(CanalEntry.Header header,CanalEntry.RowData rowData); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/CanalClientException.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/CanalClientException.java new file mode 100644 index 0000000000000000000000000000000000000000..fe91563f1d8aea2f8589b191ce4b327331a1e132 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/CanalClientException.java @@ -0,0 +1,27 @@ +package com.schbrain.canal.client.exception; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public class CanalClientException extends RuntimeException{ + + public CanalClientException() { + } + + public CanalClientException(String message) { + super(message); + } + + public CanalClientException(String message, Throwable cause) { + super(message, cause); + } + + public CanalClientException(Throwable cause) { + super(cause); + } + + public CanalClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java new file mode 100644 index 0000000000000000000000000000000000000000..a97d75a55950123317c831ae92e1f4c9f35f83e8 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java @@ -0,0 +1,143 @@ +package com.schbrain.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import com.schbrain.canal.client.annotation.ListenPoint; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.core.ListenerPoint; +import com.schbrain.canal.client.event.CanalEvent; +import com.schbrain.canal.client.exception.CanalClientException; +import lombok.extern.slf4j.Slf4j; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder{ + + public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry config, List listeners, List annoListeners) { + super(connector, config, listeners, annoListeners); + } + + @Override + protected void distributeEvent(Message message) { + List entries = message.getEntries(); + for (CanalEntry.Entry entry : entries) { + //忽略不处理的 + List ignoreEntryTypes = getIgnoreEntryTypes(); + if (ignoreEntryTypes != null && ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) { + continue; + } + CanalEntry.RowChange rowChange; + try { + rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry.toString(),e); + } + //ignore the ddl operation + if (rowChange.hasIsDdl() && rowChange.getIsDdl()) { + processDdl(rowChange); + continue; + } + CanalEntry.Header header = entry.getHeader(); + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + //distribute to listener interfaces + distributeByImpl(header,rowChange.getEventType(), rowData); + //distribute to annotation listener interfaces + distributeByAnnotation(destination, + entry.getHeader().getSchemaName(), + entry.getHeader().getTableName(), + rowChange.getEventType(), + rowData); + } + } + } + + /** + * distribute to listener interfaces + * + * @param eventType eventType + * @param rowData rowData + */ + protected void distributeByImpl(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + if (listeners != null) { + for (CanalEvent listener : listeners) { + listener.onEvent(header,eventType, rowData); + } + } + } + + /** + * distribute to annotation listener interfaces + * + * @param destination destination + * @param schemaName schema + * @param tableName table name + * @param eventType event type + * @param rowData row data + */ + protected void distributeByAnnotation(String destination, + String schemaName, + String tableName, + CanalEntry.EventType eventType, + CanalEntry.RowData rowData) { + //invoke the listeners + annoListeners.forEach(point -> point + .getInvokeMap() + .entrySet() + .stream() + .filter(getAnnotationFilter(destination, schemaName, tableName, eventType)) + .forEach(entry -> { + Method method = entry.getKey(); + method.setAccessible(true); + try { + Object[] args = getInvokeArgs(method, eventType, rowData); + method.invoke(point.getTarget(), args); + } catch (Exception e) { + log.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}", + Thread.currentThread().getName(), + point.getTarget().getClass().getName(), method.getName()); + } + })); + } + + /** + * get the filters predicate + * @param destination destination + * @param schemaName schema + * @param tableName table name + * @param eventType event type + * @return predicate + */ + protected abstract Predicate> getAnnotationFilter(String destination, + String schemaName, + String tableName, + CanalEntry.EventType eventType); + + /** + * get the args + * @param method method + * @param eventType event type + * @param rowData row data + * @return args which will be used by invoking the annotation methods + */ + protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType,CanalEntry.RowData rowData); + + /** + * ddl事件 + * @param rowChange + */ + protected void processDdl(CanalEntry.RowChange rowChange) {} + + protected List getIgnoreEntryTypes() { + return Collections.emptyList(); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java new file mode 100644 index 0000000000000000000000000000000000000000..2a6397c5097c805dc14ea19dfadb76e91527b682 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java @@ -0,0 +1,134 @@ +package com.schbrain.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.core.ListenerPoint; +import com.schbrain.canal.client.event.CanalEvent; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +public abstract class AbstractMessageTransponder implements MessageTransponder { + + /** + * canal connector + */ + private final CanalConnector connector; + + /** + * custom config + */ + protected final CanalClientConfig config; + + /** + * destination of canal server + */ + protected final String destination; + + /** + * listeners which are used by implementing the Interface + */ + protected final List listeners = new ArrayList<>(); + + /** + * listeners which are used by annotation + */ + protected final List annoListeners = new ArrayList<>(); + + /** + * running flag + */ + private volatile boolean running = true; + + public AbstractMessageTransponder(CanalConnector connector, + Map.Entry config, + List listeners, + List annoListeners) { + Objects.requireNonNull(connector, "connector can not be null!"); + Objects.requireNonNull(config, "config can not be null!"); + this.connector = connector; + this.destination = config.getKey(); + this.config = config.getValue(); + if (listeners != null) { + this.listeners.addAll(listeners); + } + if (annoListeners != null){ + this.annoListeners.addAll(annoListeners); + } + } + + @Override + public void run() { + int errorCount = config.getRetryCount(); + final long interval = config.getAcquireInterval(); + final String threadName = Thread.currentThread().getName(); + while (running && !Thread.currentThread().isInterrupted()) { + try { + Message message = connector.getWithoutAck(config.getBatchSize()); + long batchId = message.getId(); + int size = message.getEntries().size(); + if (log.isDebugEnabled()) { + log.debug("{}: Get message from canal server >>>>> size:{}", threadName, size); + } + //empty message + if (batchId == -1 || size == 0) { + if (log.isDebugEnabled()) { + log.debug("{}: Empty message... sleep for {} millis", threadName, interval); + } + Thread.sleep(interval); + } else { + distributeEvent(message); + } + connector.ack(batchId); + if (log.isDebugEnabled()) { + log.debug("{}: Ack message. batchId:{}", threadName, batchId); + } + } catch (CanalClientException e) { + errorCount--; + log.error(threadName + ": Error occurred!! ", e); + try { + Thread.sleep(interval); + } catch (InterruptedException e1) { + errorCount = 0; + } + } catch (InterruptedException e) { + errorCount = 0; + connector.rollback(); + } finally { + if (errorCount <= 0) { + stop(); + log.info("{}: Stopping the client.. ", Thread.currentThread().getName()); + } + } + } + stop(); + log.info("{}: client stopped. ", Thread.currentThread().getName()); + } + + /** + * to distribute the message to special event and let the event listeners to handle it + * + * @param message canal message + */ + protected abstract void distributeEvent(Message message); + + /** + * stop running + */ + void stop() { + running = false; + } + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/DefaultMessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/DefaultMessageTransponder.java new file mode 100644 index 0000000000000000000000000000000000000000..aa0d9ff6ca9757dfd8bde1df8be16cebaaffc43b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/DefaultMessageTransponder.java @@ -0,0 +1,58 @@ +package com.schbrain.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.canal.client.annotation.ListenPoint; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.core.ListenerPoint; +import com.schbrain.canal.client.event.CanalEvent; +import org.springframework.util.StringUtils; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public class DefaultMessageTransponder extends AbstractBasicMessageTransponder{ + + public DefaultMessageTransponder(CanalConnector connector, + Map.Entry config, + List listeners, + List annoListeners) { + super(connector, config, listeners, annoListeners); + } + + @Override + protected Predicate> getAnnotationFilter(String destination, String schemaName, String tableName, CanalEntry.EventType eventType) { + Predicate> df = e -> StringUtils.isEmpty(e.getValue().destination()) + || e.getValue().destination().equals(destination); + Predicate> sf = e -> e.getValue().schema().length == 0 + || Arrays.stream(e.getValue().schema()).anyMatch(s -> s.equals(schemaName)); + Predicate> tf = e -> e.getValue().table().length == 0 + || Arrays.stream(e.getValue().table()).anyMatch(t -> t.equals(tableName)); + Predicate> ef = e -> e.getValue().eventType().length == 0 + || Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType); + return df.and(sf).and(tf).and(ef); + } + + @Override + protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + return Arrays.stream(method.getParameterTypes()) + .map(p -> p == CanalEntry.EventType.class + ? eventType + : p == CanalEntry.RowData.class + ? rowData : null) + .toArray(); + } + + + @Override + protected List getIgnoreEntryTypes() { + return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, CanalEntry.EntryType.HEARTBEAT); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/DefaultTransponderFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/DefaultTransponderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..6166bcc3db986f552afd1a568c83295ebb3908e1 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/DefaultTransponderFactory.java @@ -0,0 +1,21 @@ +package com.schbrain.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.core.ListenerPoint; +import com.schbrain.canal.client.event.CanalEvent; + +import java.util.List; +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public class DefaultTransponderFactory implements TransponderFactory{ + + @Override + public MessageTransponder newTransponder(CanalConnector connector, Map.Entry config, List listeners, List annoListeners) { + return new DefaultMessageTransponder(connector, config, listeners, annoListeners); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/MessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/MessageTransponder.java new file mode 100644 index 0000000000000000000000000000000000000000..2acaca2ea95f1907ff5ca02c336bff0ded421f44 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/MessageTransponder.java @@ -0,0 +1,8 @@ +package com.schbrain.canal.client.transfer; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public interface MessageTransponder extends Runnable{ +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/MessageTransponders.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/MessageTransponders.java new file mode 100644 index 0000000000000000000000000000000000000000..fa2cb3f4aeb89de8333a98a9cc84e21039b25d3c --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/MessageTransponders.java @@ -0,0 +1,12 @@ +package com.schbrain.canal.client.transfer; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public class MessageTransponders { + + public static TransponderFactory defaultMessageTransponder() { + return new DefaultTransponderFactory(); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/TransponderFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/TransponderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..5669a5bb7f7aed71a7ea95470cb84d945308fe7b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/TransponderFactory.java @@ -0,0 +1,25 @@ +package com.schbrain.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.core.ListenerPoint; +import com.schbrain.canal.client.event.CanalEvent; + +import java.util.List; +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +public interface TransponderFactory { + + /** + * @param connector connector + * @param config config + * @param listeners listeners + * @param annoListeners annoListeners + * @return MessageTransponder + */ + MessageTransponder newTransponder(CanalConnector connector, Map.Entry config, List listeners, List annoListeners); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/BeanUtil.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/BeanUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..016cd10f2058d9730efb3fa8cebf60a810edd67c --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/BeanUtil.java @@ -0,0 +1,73 @@ +package com.schbrain.canal.client.utils; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +import java.lang.annotation.Annotation; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Component +public class BeanUtil implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + BeanUtil.applicationContext = applicationContext; + } + + /** + * get bean + * @param clazz + * @param + * @return + */ + public static T getBean(Class clazz) { + T obj; + try { + obj = applicationContext.getBean(clazz); + } catch (Exception e) { + obj = null; + } + return obj; + } + + /** + * get bean list + * @param clazz + * @param + * @return + */ + public static List getBeansOfType(Class clazz) { + Map map; + try { + map = applicationContext.getBeansOfType(clazz); + } catch (Exception e) { + map = null; + } + return map == null ? null : new ArrayList<>(map.values()); + } + + /** + * get with annotation + * @param anno + * @return + */ + public static Map getBeansWithAnnotation(Class anno) { + Map map; + try { + map = applicationContext.getBeansWithAnnotation(anno); + } catch (Exception e) { + map = null; + } + return map; + } +} diff --git a/schbrain-canal-client/src/main/resources/META-INF/spring-configuration-metadata.json b/schbrain-canal-client/src/main/resources/META-INF/spring-configuration-metadata.json new file mode 100644 index 0000000000000000000000000000000000000000..6fdb085239fd0e5c39f46e8cbe653a820146eadd --- /dev/null +++ b/schbrain-canal-client/src/main/resources/META-INF/spring-configuration-metadata.json @@ -0,0 +1,18 @@ +{ + "groups": [ + { + "name": "canal.client", + "type": "com.schbrain.canal.client.conf.SchbrainCanalConfig", + "sourceType": "com.schbrain.canal.client.conf.SchbrainCanalConfig" + } + ], + "properties": [ + { + "name": "canal.client.instances", + "type": "java.util.Map", + "description": "instance config", + "sourceType": "com.schbrain.canal.client.conf.SchbrainCanalConfig" + } + ], + "hints": [] +} \ No newline at end of file diff --git a/schbrain-canal-web/pom.xml b/schbrain-canal-web/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..5cb19193a47e88791ba2f3fddf55eadd0e186f71 --- /dev/null +++ b/schbrain-canal-web/pom.xml @@ -0,0 +1,58 @@ + + + + + schbrain-canal + com.schbrain.framework + 1.0.0-SNAPSHOT + + 4.0.0 + + schbrain-canal-web + + + true + + + + + + com.schbrain.framework + schbrain-canal-client + 1.0.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-logging + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-test + test + + + org.projectlombok + lombok + + + + + ${artifactId}-${version} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/ApiSchoolController.java b/schbrain-canal-web/src/main/java/com/schbrain/web/ApiSchoolController.java new file mode 100644 index 0000000000000000000000000000000000000000..df5876733a86f70d6f0f8ee72e50ce4056284a0e --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/ApiSchoolController.java @@ -0,0 +1,15 @@ +package com.schbrain.web; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + + +@RestController +public class ApiSchoolController { + + @RequestMapping("/health") + public String health(){ + return "OK"; + } + +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..6125046cc62ef6cabe764a2637daf2ea0ced7bfb --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent.java @@ -0,0 +1,44 @@ +package com.schbrain.web; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.CanalEvent; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Service("myCanalEvent") +@TableFilter(table = "uc_user_info",schame = "kp_user") //暂时未实现 +public class MyCanalEvent implements CanalEvent { + + @Override + public void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+eventType.name()); + //如果是删除语句 + if (eventType == CanalEntry.EventType.DELETE) { + printColumn(rowData.getBeforeColumnsList()); + //如果是新增语句 + } else if (eventType == CanalEntry.EventType.INSERT) { + printColumn(rowData.getAfterColumnsList()); + //如果是更新的语句 + } else { + //变更前的数据 + System.out.println("------->; before"); + printColumn(rowData.getBeforeColumnsList()); + //变更后的数据 + System.out.println("------->; after"); + printColumn(rowData.getAfterColumnsList()); + } + + } + + private static void printColumn(List columns) { + for (CanalEntry.Column column : columns) { + System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); + } + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/SchbrainMainApplication.java b/schbrain-canal-web/src/main/java/com/schbrain/web/SchbrainMainApplication.java new file mode 100644 index 0000000000000000000000000000000000000000..1718dce1c460b0ae8f006525916b1806e107d36a --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/SchbrainMainApplication.java @@ -0,0 +1,18 @@ +package com.schbrain.web; + +import com.schbrain.canal.client.annotation.EnableCanalClient; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Hello world! + */ +@SpringBootApplication +@EnableCanalClient +public class SchbrainMainApplication +{ + public static void main( String[] args ) + { + SpringApplication.run(SchbrainMainApplication.class, args); + } +} diff --git a/schbrain-canal-web/src/main/resources/application.properties b/schbrain-canal-web/src/main/resources/application.properties new file mode 100644 index 0000000000000000000000000000000000000000..5f1d2f695f98979a1ba0af6893cd9ddbf785c84a --- /dev/null +++ b/schbrain-canal-web/src/main/resources/application.properties @@ -0,0 +1,17 @@ +#canal.client.instances.example.host=192.168.0.59 +canal.client.instances.kp_user.addresses=192.168.36.66:11111 +canal.client.instances.kp_user.username= +canal.client.instances.kp_user.password= +canal.client.instances.kp_user.retryCount=10 +#canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 + +canal.client.instances.screenschbrain.addresses=192.168.36.66:11111 +canal.client.instances.screenschbrain.username= +canal.client.instances.screenschbrain.password= +canal.client.instances.screenschbrain.retryCount=10 +#canal.client.instances.screenschbrain.subscribe=.*\\\\..* +#canal.client.instances.screenschbrain.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 + + + + diff --git a/schbrain-canal-web/src/main/resources/logback.xml b/schbrain-canal-web/src/main/resources/logback.xml new file mode 100644 index 0000000000000000000000000000000000000000..9aa44cf2cba36d61de074f7e7137a52e4b44834a --- /dev/null +++ b/schbrain-canal-web/src/main/resources/logback.xml @@ -0,0 +1,46 @@ + + + + ${LOG_PATH}/kp-user-hub/kp-user-hub.log + + ${LOG_PATH}/kp-user-hub/kp-user-hub.%d{yyyy-MM-dd}.%i.log + + 100MB + + 30 + + + + [%d{yyyy-MM-dd HH:mm:ss} %-5level %t] %logger{50} - %msg%n + + + + + + + [%d{yyyy-MM-dd HH:mm:ss} %-5level %t] %logger{50} - %msg%n + + utf-8 + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file