From dd744c909b06bfa2418042a0a0ba453031584546 Mon Sep 17 00:00:00 2001 From: zhuyf Date: Fri, 21 Apr 2023 14:11:37 +0800 Subject: [PATCH] no message --- .../canal/client/conf/CanalClientConfig.java | 11 ++++++++++- .../canal/client/core/ConnectionFactory.java | 4 ++++ .../canal/client/core/SimpleCanalClient.java | 15 +++++++++------ schbrain-canal-web/pom.xml | 1 - 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java index 4e1ff87..b0bda58 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java @@ -1,6 +1,9 @@ package com.schbrain.canal.client.conf; import lombok.Data; +import lombok.ToString; + +import java.io.Serializable; /** * CANAL Config @@ -9,7 +12,11 @@ import lombok.Data; * @date 2022/6/16 */ @Data -public class CanalClientConfig { +@ToString +public class CanalClientConfig implements Serializable { + + private static final long serialVersionUID = -2850256804111728271L; + /** * 连接地址 */ @@ -43,5 +50,7 @@ public class CanalClientConfig { * 批量获取数据条数 */ private int batchSize = 1000; + + } diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java index be0d7ab..0d8695d 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/ConnectionFactory.java @@ -7,6 +7,7 @@ import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy; import com.alibaba.otter.canal.common.zookeeper.ZkClientx; import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.exception.CanalClientException; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import java.net.InetSocketAddress; @@ -14,6 +15,7 @@ import java.util.ArrayList; import java.util.List; +@Slf4j public class ConnectionFactory { /** @@ -24,6 +26,7 @@ public class ConnectionFactory { */ public static CanalConnector create(CanalClientConfig config, String destination){ if(StringUtils.isNotBlank(config.getAddresses())){ + log.info("canal create address destination {} connector by config : {}",destination,config); List inetSocketAddressList=new ArrayList<>(); String[] hosts=config.getAddresses().split(","); for (String hostInfo: hosts) { @@ -32,6 +35,7 @@ public class ConnectionFactory { } return CanalConnectors.newClusterConnector(inetSocketAddressList,destination, config.getUsername(),config.getPassword()); }else if(StringUtils.isNotBlank(config.getZkHosts())){ + log.info("canal create zookeeper destination {} connector by config : {}",destination,config); return newClusterConnector(config.getZkHosts(),destination, config.getUsername(), config.getPassword()); }else{ new CanalClientException("zkHosts and addresses cannot all empty"); diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java index 81d5f5e..6bcc683 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -98,9 +98,7 @@ public class SimpleCanalClient extends AbstractCanalClient { TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ //拼接key - String schame = parse(filter.schame()); - String table = parse(filter.table()); - String key = schame+":"+table; + String key = tableFilterKey(filter); List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); filterList.add(canalEvent); tableCanalEventMap.put(key,filterList); @@ -147,9 +145,7 @@ public class SimpleCanalClient extends AbstractCanalClient { for (ResolverCanalEvent event : list) { TableFilter filter = event.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String schame = parse(filter.schame()); - String table = parse(filter.table()); - String key = schame+":"+table; + String key = tableFilterKey(filter); List> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); filterList.add(event); resolverCanalEvents.put(key,filterList); @@ -159,6 +155,13 @@ public class SimpleCanalClient extends AbstractCanalClient { } } + + private String tableFilterKey(TableFilter filter){ + String schame = parse(filter.schame()); + String table = parse(filter.table()); + return String.format("%s:%s",schame,table); + } + /** * 解析参数值 * @param val diff --git a/schbrain-canal-web/pom.xml b/schbrain-canal-web/pom.xml index 6203df0..3186052 100644 --- a/schbrain-canal-web/pom.xml +++ b/schbrain-canal-web/pom.xml @@ -46,7 +46,6 @@ - ${artifactId}-${version} org.springframework.boot -- GitLab