Commit ac5fdf02 authored by zhuyunfeng's avatar zhuyunfeng

注解测试

parent 9f76e74e
This diff is collapsed.
...@@ -23,7 +23,7 @@ public @interface InsertListenPoint { ...@@ -23,7 +23,7 @@ public @interface InsertListenPoint {
* default for all * default for all
* @return canal destination * @return canal destination
*/ */
@AliasFor(annotation = ListenPoint.class) @AliasFor(attribute = "destination",annotation = ListenPoint.class)
String destination() default ""; String destination() default "";
/** /**
...@@ -31,7 +31,7 @@ public @interface InsertListenPoint { ...@@ -31,7 +31,7 @@ public @interface InsertListenPoint {
* default for all * default for all
* @return canal destination * @return canal destination
*/ */
@AliasFor(annotation = ListenPoint.class) @AliasFor(attribute = "schema",annotation = ListenPoint.class)
String[] schema() default {}; String[] schema() default {};
/** /**
...@@ -39,7 +39,7 @@ public @interface InsertListenPoint { ...@@ -39,7 +39,7 @@ public @interface InsertListenPoint {
* default for all * default for all
* @return canal destination * @return canal destination
*/ */
@AliasFor(annotation = ListenPoint.class) @AliasFor(attribute = "table",annotation = ListenPoint.class)
String[] table() default {}; String[] table() default {};
} }
...@@ -17,12 +17,12 @@ public @interface TableFilter { ...@@ -17,12 +17,12 @@ public @interface TableFilter {
* 表名称 * 表名称
* @return * @return
*/ */
String table(); String table() default "";
/** /**
* 数据库名称 * 数据库名称
* @return * @return
*/ */
String schame(); String schame() default "";
} }
package com.schbrain.canal.client.core;
import com.schbrain.canal.client.event.CanalEvent;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/20
*/
@Getter
@Accessors(fluent = true)
public class HandlerConf {
/**
* 所有执行器
*/
private final List<CanalEvent> listeners;
/**
* 表过滤器
*/
private final Map<String,List<CanalEvent>> tableCanalEventMap;
/**
* 切面执行器
*/
private final List<ListenerPoint> annoListeners;
public HandlerConf(List<CanalEvent> listeners, Map<String, List<CanalEvent>> tableCanalEventMap, List<ListenerPoint> annoListeners) {
this.listeners = listeners;
this.tableCanalEventMap = tableCanalEventMap;
this.annoListeners = annoListeners;
}
}
...@@ -10,7 +10,12 @@ import com.schbrain.canal.client.event.CanalEvent; ...@@ -10,7 +10,12 @@ import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.transfer.TransponderFactory; import com.schbrain.canal.client.transfer.TransponderFactory;
import com.schbrain.canal.client.utils.BeanUtil; import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.annotation.AnnotationUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
...@@ -27,9 +32,17 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -27,9 +32,17 @@ public class SimpleCanalClient extends AbstractCanalClient {
* executor * executor
*/ */
private ThreadPoolExecutor executor; private ThreadPoolExecutor executor;
/**
* 所有执行器
*/
private final List<CanalEvent> listeners = new ArrayList<>(); private final List<CanalEvent> listeners = new ArrayList<>();
/**
* 表过滤器
*/
private Map<String,List<CanalEvent>> tableCanalEventMap = new HashMap<>();
/**
* 切面执行器
*/
private final List<ListenerPoint> annoListeners = new ArrayList<>(); private final List<ListenerPoint> annoListeners = new ArrayList<>();
public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) { public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) {
...@@ -42,14 +55,20 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -42,14 +55,20 @@ public class SimpleCanalClient extends AbstractCanalClient {
log.info("{}: initializing the listeners....", Thread.currentThread().getName()); log.info("{}: initializing the listeners....", Thread.currentThread().getName());
List<CanalEvent> list = BeanUtil.getBeansOfType(CanalEvent.class); List<CanalEvent> list = BeanUtil.getBeansOfType(CanalEvent.class);
if(list!=null && list.size() > 0){ if(list!=null && list.size() > 0){
List<CanalEvent> unFilters = new ArrayList<>();
for (CanalEvent canalEvent : list) { for (CanalEvent canalEvent : list) {
TableFilter table = canalEvent.getClass().getAnnotation(TableFilter.class); TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class);
if(table!=null){ if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
System.out.println(table.schame()); String key = filter.schame()+":"+filter.table();
System.out.println(table.table()); List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>());
filterList.add(canalEvent);
continue;
} }
unFilters.add(canalEvent);
}
if(unFilters!=null && unFilters.size()>0){
listeners.addAll(unFilters);
} }
listeners.addAll(list);
} }
Map<String, Object> listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class); Map<String, Object> listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class);
if (listenerMap != null) { if (listenerMap != null) {
...@@ -57,7 +76,7 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -57,7 +76,7 @@ public class SimpleCanalClient extends AbstractCanalClient {
Method[] methods = target.getClass().getDeclaredMethods(); Method[] methods = target.getClass().getDeclaredMethods();
if (methods != null && methods.length > 0) { if (methods != null && methods.length > 0) {
for (Method method : methods) { for (Method method : methods) {
ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class); ListenPoint l = AnnotatedElementUtils.findMergedAnnotation(method,ListenPoint.class);
if (l != null) { if (l != null) {
annoListeners.add(new ListenerPoint(target, method, l)); annoListeners.add(new ListenerPoint(target, method, l));
} }
...@@ -73,8 +92,8 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -73,8 +92,8 @@ public class SimpleCanalClient extends AbstractCanalClient {
@Override @Override
protected void process(CanalConnector connector, Map.Entry<String, CanalClientConfig> config) { protected void process(CanalConnector connector, Map.Entry<String, CanalClientConfig> config) {
executor.submit(factory.newTransponder(connector, config, listeners, annoListeners)); HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners);
//factory.newTransponder(connector, config, listeners, annoListeners).run(); executor.submit(factory.newTransponder(connector, config,handlerConf));
} }
@Override @Override
......
...@@ -5,11 +5,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; ...@@ -5,11 +5,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.Message;
import com.schbrain.canal.client.annotation.ListenPoint; import com.schbrain.canal.client.annotation.ListenPoint;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.core.ListenerPoint; import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent; import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.exception.CanalClientException; import com.schbrain.canal.client.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
...@@ -23,8 +23,8 @@ import java.util.function.Predicate; ...@@ -23,8 +23,8 @@ import java.util.function.Predicate;
@Slf4j @Slf4j
public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder{ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder{
public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, List<CanalEvent> listeners, List<ListenerPoint> annoListeners) { public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, HandlerConf handlerConf) {
super(connector, config, listeners, annoListeners); super(connector, config,handlerConf);
} }
@Override @Override
...@@ -53,8 +53,7 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra ...@@ -53,8 +53,7 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
distributeByImpl(header,rowChange.getEventType(), rowData); distributeByImpl(header,rowChange.getEventType(), rowData);
//distribute to annotation listener interfaces //distribute to annotation listener interfaces
distributeByAnnotation(destination, distributeByAnnotation(destination,
entry.getHeader().getSchemaName(), header,
entry.getHeader().getTableName(),
rowChange.getEventType(), rowChange.getEventType(),
rowData); rowData);
} }
...@@ -68,38 +67,64 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra ...@@ -68,38 +67,64 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
* @param rowData rowData * @param rowData rowData
*/ */
protected void distributeByImpl(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { protected void distributeByImpl(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
if (listeners != null) { if(handlerConf == null){
for (CanalEvent listener : listeners) { return;
listener.onEvent(header,eventType, rowData); }
//带过滤器的执行
Map<String,List<CanalEvent>> filterEvent = handlerConf.tableCanalEventMap();
if(filterEvent!=null){
String key = header.getSchemaName()+":"+header.getTableName();
List<CanalEvent> events = filterEvent.get(key);
if(events!=null && events.size()>0){
for (CanalEvent event : events) {
doEvent(event,header,eventType, rowData);
}
}
}
//全部执行
List<CanalEvent> allEvents = handlerConf.listeners();
if(allEvents!=null && allEvents.size()>0){
for (CanalEvent event : allEvents) {
doEvent(event,header,eventType, rowData);
} }
} }
} }
private void doEvent(CanalEvent event,CanalEntry.Header header,CanalEntry.EventType eventType,CanalEntry.RowData rowData){
try {
event.onEvent(header,eventType, rowData);
}catch (Exception e){
log.warn("event handel error",e);
}
}
/** /**
* distribute to annotation listener interfaces * distribute to annotation listener interfaces
* *
* @param destination destination * @param destination destination
* @param schemaName schema * @param header header
* @param tableName table name
* @param eventType event type * @param eventType event type
* @param rowData row data * @param rowData row data
*/ */
protected void distributeByAnnotation(String destination, protected void distributeByAnnotation(String destination,
String schemaName, CanalEntry.Header header,
String tableName,
CanalEntry.EventType eventType, CanalEntry.EventType eventType,
CanalEntry.RowData rowData) { CanalEntry.RowData rowData) {
List<ListenerPoint> points = handlerConf.annoListeners();
if(points==null){
return;
}
//invoke the listeners //invoke the listeners
annoListeners.forEach(point -> point points.forEach(point -> point
.getInvokeMap() .getInvokeMap()
.entrySet() .entrySet()
.stream() .stream()
.filter(getAnnotationFilter(destination, schemaName, tableName, eventType)) .filter(getAnnotationFilter(destination, header, eventType))
.forEach(entry -> { .forEach(entry -> {
Method method = entry.getKey(); Method method = entry.getKey();
method.setAccessible(true); method.setAccessible(true);
try { try {
Object[] args = getInvokeArgs(method, eventType, rowData); Object[] args = getInvokeArgs(method,header,eventType, rowData);
method.invoke(point.getTarget(), args); method.invoke(point.getTarget(), args);
} catch (Exception e) { } catch (Exception e) {
log.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}", log.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}",
...@@ -112,14 +137,12 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra ...@@ -112,14 +137,12 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
/** /**
* get the filters predicate * get the filters predicate
* @param destination destination * @param destination destination
* @param schemaName schema * @param header
* @param tableName table name
* @param eventType event type * @param eventType event type
* @return predicate * @return predicate
*/ */
protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination, protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,
String schemaName, CanalEntry.Header header,
String tableName,
CanalEntry.EventType eventType); CanalEntry.EventType eventType);
/** /**
...@@ -129,7 +152,7 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra ...@@ -129,7 +152,7 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
* @param rowData row data * @param rowData row data
* @return args which will be used by invoking the annotation methods * @return args which will be used by invoking the annotation methods
*/ */
protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType,CanalEntry.RowData rowData); protected abstract Object[] getInvokeArgs(Method method,CanalEntry.Header header,CanalEntry.EventType eventType,CanalEntry.RowData rowData);
/** /**
* ddl事件 * ddl事件
......
...@@ -4,6 +4,7 @@ import com.alibaba.otter.canal.client.CanalConnector; ...@@ -4,6 +4,7 @@ import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.core.ListenerPoint; import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent; import com.schbrain.canal.client.event.CanalEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -37,16 +38,7 @@ public abstract class AbstractMessageTransponder implements MessageTransponder { ...@@ -37,16 +38,7 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
*/ */
protected final String destination; protected final String destination;
/** protected final HandlerConf handlerConf;
* listeners which are used by implementing the Interface
*/
protected final List<CanalEvent> listeners = new ArrayList<>();
/**
* listeners which are used by annotation
*/
protected final List<ListenerPoint> annoListeners = new ArrayList<>();
/** /**
* running flag * running flag
*/ */
...@@ -54,19 +46,13 @@ public abstract class AbstractMessageTransponder implements MessageTransponder { ...@@ -54,19 +46,13 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
public AbstractMessageTransponder(CanalConnector connector, public AbstractMessageTransponder(CanalConnector connector,
Map.Entry<String,CanalClientConfig> config, Map.Entry<String,CanalClientConfig> config,
List<CanalEvent> listeners, HandlerConf handlerConf) {
List<ListenerPoint> annoListeners) {
Objects.requireNonNull(connector, "connector can not be null!"); Objects.requireNonNull(connector, "connector can not be null!");
Objects.requireNonNull(config, "config can not be null!"); Objects.requireNonNull(config, "config can not be null!");
this.connector = connector; this.connector = connector;
this.destination = config.getKey(); this.destination = config.getKey();
this.config = config.getValue(); this.config = config.getValue();
if (listeners != null) { this.handlerConf = handlerConf;
this.listeners.addAll(listeners);
}
if (annoListeners != null){
this.annoListeners.addAll(annoListeners);
}
} }
@Override @Override
...@@ -80,12 +66,12 @@ public abstract class AbstractMessageTransponder implements MessageTransponder { ...@@ -80,12 +66,12 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
long batchId = message.getId(); long batchId = message.getId();
int size = message.getEntries().size(); int size = message.getEntries().size();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("{}: Get message from canal server >>>>> size:{}", threadName, size); //log.debug("{}: Get message from canal server >>>>> size:{}", threadName, size);
} }
//empty message //empty message
if (batchId == -1 || size == 0) { if (batchId == -1 || size == 0) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("{}: Empty message... sleep for {} millis", threadName, interval); //log.debug("{}: Empty message... sleep for {} millis", threadName, interval);
} }
Thread.sleep(interval); Thread.sleep(interval);
} else { } else {
...@@ -93,7 +79,7 @@ public abstract class AbstractMessageTransponder implements MessageTransponder { ...@@ -93,7 +79,7 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
} }
connector.ack(batchId); connector.ack(batchId);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("{}: Ack message. batchId:{}", threadName, batchId); //log.debug("{}: Ack message. batchId:{}", threadName, batchId);
} }
} catch (CanalClientException e) { } catch (CanalClientException e) {
errorCount--; errorCount--;
......
...@@ -4,9 +4,8 @@ import com.alibaba.otter.canal.client.CanalConnector; ...@@ -4,9 +4,8 @@ import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.ListenPoint; import com.schbrain.canal.client.annotation.ListenPoint;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.ListenerPoint; import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.event.CanalEvent; import org.apache.commons.lang3.StringUtils;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Arrays;
...@@ -21,33 +20,73 @@ import java.util.function.Predicate; ...@@ -21,33 +20,73 @@ import java.util.function.Predicate;
public class DefaultMessageTransponder extends AbstractBasicMessageTransponder{ public class DefaultMessageTransponder extends AbstractBasicMessageTransponder{
public DefaultMessageTransponder(CanalConnector connector, public DefaultMessageTransponder(CanalConnector connector,
Map.Entry<String, CanalClientConfig> config, Map.Entry<String, CanalClientConfig> config, HandlerConf handlerConf) {
List<CanalEvent> listeners, super(connector, config,handlerConf);
List<ListenerPoint> annoListeners) {
super(connector, config, listeners, annoListeners);
} }
@Override @Override
protected Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination, String schemaName, String tableName, CanalEntry.EventType eventType) { protected Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,CanalEntry.Header header, CanalEntry.EventType eventType) {
Predicate<Map.Entry<Method, ListenPoint>> df = e -> StringUtils.isEmpty(e.getValue().destination()) Predicate<Map.Entry<Method, ListenPoint>> df = (e->{
|| e.getValue().destination().equals(destination); if(StringUtils.isBlank(e.getValue().destination())){
Predicate<Map.Entry<Method, ListenPoint>> sf = e -> e.getValue().schema().length == 0 return true;
|| Arrays.stream(e.getValue().schema()).anyMatch(s -> s.equals(schemaName)); }
Predicate<Map.Entry<Method, ListenPoint>> tf = e -> e.getValue().table().length == 0 return e.getValue().destination().equals(destination);
|| Arrays.stream(e.getValue().table()).anyMatch(t -> t.equals(tableName)); });
Predicate<Map.Entry<Method, ListenPoint>> ef = e -> e.getValue().eventType().length == 0 Predicate<Map.Entry<Method, ListenPoint>> sf = (e->{
|| Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType); if(e.getValue().schema() == null || e.getValue().schema().length ==0){
return true;
}
String schemaName = header.getSchemaName();
for (String s : e.getValue().schema()) {
if(schemaName.equals(s)){
return true;
}
}
return false;
});
Predicate<Map.Entry<Method, ListenPoint>> tf = (e->{
if(e.getValue().table() == null || e.getValue().table().length ==0){
return true;
}
String tableName = header.getTableName();
for (String s : e.getValue().table()) {
if(tableName.equals(s)){
return true;
}
}
return false;
});
Predicate<Map.Entry<Method, ListenPoint>> ef = (e->{
if(e.getValue().eventType() == null || e.getValue().eventType().length ==0){
return true;
}
for (CanalEntry.EventType eventType1 : e.getValue().eventType()) {
if(eventType1 == eventType){
return true;
}
}
return false;
});
return df.and(sf).and(tf).and(ef); return df.and(sf).and(tf).and(ef);
} }
@Override @Override
protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { protected Object[] getInvokeArgs(Method method,CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
return Arrays.stream(method.getParameterTypes()) Class<?>[] classes = method.getParameterTypes();
.map(p -> p == CanalEntry.EventType.class Object[] args = new Object[classes.length];
? eventType for (int i = 0; i < classes.length; i++) {
: p == CanalEntry.RowData.class Class<?> p = classes[i];
? rowData : null) Object arg = null;
.toArray(); if(p == CanalEntry.EventType.class){
arg = eventType;
}else if(p == CanalEntry.Header.class){
arg = header;
}else if(p == CanalEntry.RowData.class){
arg = rowData;
}
args[i] = arg;
}
return args;
} }
......
...@@ -2,6 +2,7 @@ package com.schbrain.canal.client.transfer; ...@@ -2,6 +2,7 @@ package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnector;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.core.ListenerPoint; import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent; import com.schbrain.canal.client.event.CanalEvent;
...@@ -15,7 +16,7 @@ import java.util.Map; ...@@ -15,7 +16,7 @@ import java.util.Map;
public class DefaultTransponderFactory implements TransponderFactory{ public class DefaultTransponderFactory implements TransponderFactory{
@Override @Override
public MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, List<CanalEvent> listeners, List<ListenerPoint> annoListeners) { public MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, HandlerConf handlerConf) {
return new DefaultMessageTransponder(connector, config, listeners, annoListeners); return new DefaultMessageTransponder(connector, config,handlerConf);
} }
} }
...@@ -2,6 +2,7 @@ package com.schbrain.canal.client.transfer; ...@@ -2,6 +2,7 @@ package com.schbrain.canal.client.transfer;
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnector;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.core.ListenerPoint; import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent; import com.schbrain.canal.client.event.CanalEvent;
...@@ -17,9 +18,7 @@ public interface TransponderFactory { ...@@ -17,9 +18,7 @@ public interface TransponderFactory {
/** /**
* @param connector connector * @param connector connector
* @param config config * @param config config
* @param listeners listeners
* @param annoListeners annoListeners
* @return MessageTransponder * @return MessageTransponder
*/ */
MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config, List<CanalEvent> listeners, List<ListenerPoint> annoListeners); MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalClientConfig> config,HandlerConf handlerConf);
} }
...@@ -2,43 +2,29 @@ package com.schbrain.web; ...@@ -2,43 +2,29 @@ package com.schbrain.web;
import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.TableFilter; import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.CanalEvent; import com.schbrain.canal.client.event.DefCanalEvent;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List;
/** /**
* @author zhuyf * @author zhuyf
* @date 2022/6/16 * @date 2022/6/16
*/ */
@Service("myCanalEvent") @Service("myCanalEvent")
@TableFilter(table = "uc_user_info",schame = "kp_user") //暂时未实现 @TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent implements CanalEvent { public class MyCanalEvent implements DefCanalEvent {
@Override @Override
public void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { public void onInsert(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+eventType.name()); System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+"insert");
//如果是删除语句 }
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
@Override
public void onUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+"update");
} }
private static void printColumn(List<CanalEntry.Column> columns) { @Override
for (CanalEntry.Column column : columns) { public void onDelete(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+"delete");
}
} }
} }
package com.schbrain.web;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.DefCanalEvent;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent2")
@TableFilter(table = "ding_talk_user",schame = "kp_user")
public class MyCanalEvent2 implements DefCanalEvent {
@Override
public void onInsert(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+"insert");
}
@Override
public void onUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+"update");
}
@Override
public void onDelete(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println("======"+header.getSchemaName()+":"+header.getTableName()+":"+"delete");
}
}
package com.schbrain.web;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.CanalEventListener;
import com.schbrain.canal.client.annotation.InsertListenPoint;
import com.schbrain.canal.client.annotation.UpdateListenPoint;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author zhuyf
* @date 2022/6/16
*/
@CanalEventListener
public class MyCanalEvent3 {
@Autowired
private UserService userService;
@InsertListenPoint(destination = "kp_user",schema = {"kp_user"},table={"wechat_user"})
public void onInsert(CanalEntry.Header header, CanalEntry.RowData rowData) {
String user = userService.getUser();
System.out.println("MyCanalEvent3======"+header.getSchemaName()+":"+header.getTableName()+":"+"onInsert,user:+"+user);
}
@UpdateListenPoint(destination = "kp_user",schema = {"kp_user"},table={"wechat_user"})
public void onUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) {
System.out.println("MyCanalEvent3======"+header.getSchemaName()+":"+header.getTableName()+":"+"onUpdate");
}
}
package com.schbrain.web;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.CanalEventListener;
import com.schbrain.canal.client.annotation.InsertListenPoint;
import com.schbrain.canal.client.annotation.UpdateListenPoint;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Service
public class UserService {
String getUser(){
return "1";
}
}
#canal.client.instances.example.host=192.168.0.59
canal.client.instances.kp_user.addresses=192.168.36.66:11111 canal.client.instances.kp_user.addresses=192.168.36.66:11111
canal.client.instances.kp_user.username= canal.client.instances.kp_user.username=
canal.client.instances.kp_user.password= canal.client.instances.kp_user.password=
canal.client.instances.kp_user.retryCount=10 canal.client.instances.kp_user.retryCount=10
#canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
canal.client.instances.screenschbrain.addresses=192.168.36.66:11111 #canal.client.instances.screenschbrain.addresses=192.168.36.66:11111
canal.client.instances.screenschbrain.username= #canal.client.instances.screenschbrain.username=
canal.client.instances.screenschbrain.password= #canal.client.instances.screenschbrain.password=
canal.client.instances.screenschbrain.retryCount=10 #canal.client.instances.screenschbrain.retryCount=10
#canal.client.instances.screenschbrain.subscribe=.*\\\\..* #canal.client.instances.screenschbrain.subscribe=kp_weekly.comment_student
#canal.client.instances.screenschbrain.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 #canal.client.instances.screenschbrain.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
canal.client.instances.qicheng_czzs.addresses=192.168.36.66:11111
canal.client.instances.qicheng_czzs.username=
canal.client.instances.qicheng_czzs.password=
canal.client.instances.qicheng_czzs.retryCount=10
canal.client.instances.qicheng_czzs.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</root> </root>
<logger name="com.schbrain" level="INFO" additivity="false"> <logger name="com.schbrain" level="debug" additivity="false">
<appender-ref ref="STDOUT" /> <appender-ref ref="STDOUT" />
<appender-ref ref="FILE" /> <appender-ref ref="FILE" />
</logger> </logger>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment