Commit dd744c90 authored by zhuyunfeng's avatar zhuyunfeng

no message

parent ca014a8c
package com.schbrain.canal.client.conf; package com.schbrain.canal.client.conf;
import lombok.Data; import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/** /**
* CANAL Config * CANAL Config
...@@ -9,7 +12,11 @@ import lombok.Data; ...@@ -9,7 +12,11 @@ import lombok.Data;
* @date 2022/6/16 * @date 2022/6/16
*/ */
@Data @Data
public class CanalClientConfig { @ToString
public class CanalClientConfig implements Serializable {
private static final long serialVersionUID = -2850256804111728271L;
/** /**
* 连接地址 * 连接地址
*/ */
...@@ -43,5 +50,7 @@ public class CanalClientConfig { ...@@ -43,5 +50,7 @@ public class CanalClientConfig {
* 批量获取数据条数 * 批量获取数据条数
*/ */
private int batchSize = 1000; private int batchSize = 1000;
} }
...@@ -7,6 +7,7 @@ import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy; ...@@ -7,6 +7,7 @@ import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx; import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.exception.CanalClientException; import com.schbrain.canal.client.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -14,6 +15,7 @@ import java.util.ArrayList; ...@@ -14,6 +15,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
@Slf4j
public class ConnectionFactory { public class ConnectionFactory {
/** /**
...@@ -24,6 +26,7 @@ public class ConnectionFactory { ...@@ -24,6 +26,7 @@ public class ConnectionFactory {
*/ */
public static CanalConnector create(CanalClientConfig config, String destination){ public static CanalConnector create(CanalClientConfig config, String destination){
if(StringUtils.isNotBlank(config.getAddresses())){ if(StringUtils.isNotBlank(config.getAddresses())){
log.info("canal create address destination {} connector by config : {}",destination,config);
List<InetSocketAddress> inetSocketAddressList=new ArrayList<>(); List<InetSocketAddress> inetSocketAddressList=new ArrayList<>();
String[] hosts=config.getAddresses().split(","); String[] hosts=config.getAddresses().split(",");
for (String hostInfo: hosts) { for (String hostInfo: hosts) {
...@@ -32,6 +35,7 @@ public class ConnectionFactory { ...@@ -32,6 +35,7 @@ public class ConnectionFactory {
} }
return CanalConnectors.newClusterConnector(inetSocketAddressList,destination, config.getUsername(),config.getPassword()); return CanalConnectors.newClusterConnector(inetSocketAddressList,destination, config.getUsername(),config.getPassword());
}else if(StringUtils.isNotBlank(config.getZkHosts())){ }else if(StringUtils.isNotBlank(config.getZkHosts())){
log.info("canal create zookeeper destination {} connector by config : {}",destination,config);
return newClusterConnector(config.getZkHosts(),destination, config.getUsername(), config.getPassword()); return newClusterConnector(config.getZkHosts(),destination, config.getUsername(), config.getPassword());
}else{ }else{
new CanalClientException("zkHosts and addresses cannot all empty"); new CanalClientException("zkHosts and addresses cannot all empty");
......
...@@ -98,9 +98,7 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -98,9 +98,7 @@ public class SimpleCanalClient extends AbstractCanalClient {
TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
//拼接key //拼接key
String schame = parse(filter.schame()); String key = tableFilterKey(filter);
String table = parse(filter.table());
String key = schame+":"+table;
List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>());
filterList.add(canalEvent); filterList.add(canalEvent);
tableCanalEventMap.put(key,filterList); tableCanalEventMap.put(key,filterList);
...@@ -147,9 +145,7 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -147,9 +145,7 @@ public class SimpleCanalClient extends AbstractCanalClient {
for (ResolverCanalEvent event : list) { for (ResolverCanalEvent event : list) {
TableFilter filter = event.getClass().getAnnotation(TableFilter.class); TableFilter filter = event.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
String schame = parse(filter.schame()); String key = tableFilterKey(filter);
String table = parse(filter.table());
String key = schame+":"+table;
List<ResolverCanalEvent<?>> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); List<ResolverCanalEvent<?>> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>());
filterList.add(event); filterList.add(event);
resolverCanalEvents.put(key,filterList); resolverCanalEvents.put(key,filterList);
...@@ -159,6 +155,13 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -159,6 +155,13 @@ public class SimpleCanalClient extends AbstractCanalClient {
} }
} }
private String tableFilterKey(TableFilter filter){
String schame = parse(filter.schame());
String table = parse(filter.table());
return String.format("%s:%s",schame,table);
}
/** /**
* 解析参数值 * 解析参数值
* @param val * @param val
......
...@@ -46,7 +46,6 @@ ...@@ -46,7 +46,6 @@
</dependencies> </dependencies>
<build> <build>
<finalName>${artifactId}-${version}</finalName>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment