diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java index 4e1ff872ac275f91acd97a0f8b9e1f4d0b7e3234..b0bda58c0af76d70a2bac027f9012d8d91b24127 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java @@ -1,6 +1,9 @@ package com.schbrain.canal.client.conf; import lombok.Data; +import lombok.ToString; + +import java.io.Serializable; /** * CANAL Config @@ -9,7 +12,11 @@ import lombok.Data; * @date 2022/6/16 */ @Data -public class CanalClientConfig { +@ToString +public class CanalClientConfig implements Serializable { + + private static final long serialVersionUID = -2850256804111728271L; + /** * 连接地址 */ @@ -43,5 +50,7 @@ public class CanalClientConfig { * 批量获取数据条数 */ private int batchSize = 1000; + + } diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java index be0d7ab337b55918a3f5db56393af43ebc108c17..0d8695db0ee0a6737d21fe3849929f3709b9cbaf 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java @@ -7,6 +7,7 @@ import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy; import com.alibaba.otter.canal.common.zookeeper.ZkClientx; import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.exception.CanalClientException; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import java.net.InetSocketAddress; @@ -14,6 +15,7 @@ import java.util.ArrayList; import java.util.List; +@Slf4j public class ConnectionFactory { /** @@ -24,6 +26,7 @@ public class ConnectionFactory { */ public static CanalConnector create(CanalClientConfig config, String destination){ if(StringUtils.isNotBlank(config.getAddresses())){ + log.info("canal create address destination {} connector by config : {}",destination,config); List inetSocketAddressList=new ArrayList<>(); String[] hosts=config.getAddresses().split(","); for (String hostInfo: hosts) { @@ -32,6 +35,7 @@ public class ConnectionFactory { } return CanalConnectors.newClusterConnector(inetSocketAddressList,destination, config.getUsername(),config.getPassword()); }else if(StringUtils.isNotBlank(config.getZkHosts())){ + log.info("canal create zookeeper destination {} connector by config : {}",destination,config); return newClusterConnector(config.getZkHosts(),destination, config.getUsername(), config.getPassword()); }else{ new CanalClientException("zkHosts and addresses cannot all empty"); diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java index 81d5f5e06060cd686a9c6aaa9d9e18dcb6aa653d..6bcc68352c0056ba41c8cc843362fef620dc1070 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -98,9 +98,7 @@ public class SimpleCanalClient extends AbstractCanalClient { TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ //拼接key - String schame = parse(filter.schame()); - String table = parse(filter.table()); - String key = schame+":"+table; + String key = tableFilterKey(filter); List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); filterList.add(canalEvent); tableCanalEventMap.put(key,filterList); @@ -147,9 +145,7 @@ public class SimpleCanalClient extends AbstractCanalClient { for (ResolverCanalEvent event : list) { TableFilter filter = event.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String schame = parse(filter.schame()); - String table = parse(filter.table()); - String key = schame+":"+table; + String key = tableFilterKey(filter); List> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); filterList.add(event); resolverCanalEvents.put(key,filterList); @@ -159,6 +155,13 @@ public class SimpleCanalClient extends AbstractCanalClient { } } + + private String tableFilterKey(TableFilter filter){ + String schame = parse(filter.schame()); + String table = parse(filter.table()); + return String.format("%s:%s",schame,table); + } + /** * 解析参数值 * @param val diff --git a/schbrain-canal-web/pom.xml b/schbrain-canal-web/pom.xml index 6203df0a01f31b529fef36bf4164a1fdaeb95f32..31860520a25483c23ddd41d8387d594ee5205c49 100644 --- a/schbrain-canal-web/pom.xml +++ b/schbrain-canal-web/pom.xml @@ -46,7 +46,6 @@ - ${artifactId}-${version} org.springframework.boot