diff --git a/pom.xml b/pom.xml index 5c39704c33e61a14d6511df0ac7fae5999f8f8aa..aaa2e70c5750a0132d906ba6cda57871c5ddd5a3 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ schbrain-canal - 1.0.0-RELEASE + 1.1.0-SNAPSHOT diff --git a/schbrain-canal-client/pom.xml b/schbrain-canal-client/pom.xml index 965c387ab9367b676c5c70a4111b53222b85ff0f..64075e4d2b1f3e384d88e05127702fc63f0096cf 100644 --- a/schbrain-canal-client/pom.xml +++ b/schbrain-canal-client/pom.xml @@ -23,6 +23,11 @@ canal.protocol 1.1.5 + + joda-time + joda-time + 2.9.4 + com.schbrain.framework schbrain-spring-support diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java index 959123f0030ffbcce6263ae7165ae6588f2f69c4..61b69247875d053d83e5a383bbd8925f4c8b7ae7 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/CanalEventListener.java @@ -12,6 +12,10 @@ import java.lang.annotation.*; @Component public @interface CanalEventListener { + /** + * bean name + * @return + */ @AliasFor(annotation = Component.class) String value() default ""; diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java index 4e367d51233572e742fafed445ad9a75f897f2e4..96e9afe5eacfe1c65abdeecf53e4745e1cdea7f6 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/DeleteListenPoint.java @@ -7,11 +7,9 @@ import java.lang.annotation.*; /** * ListenPoint for delete - * - * @author chen.qian - * @date 2018/3/19 + * @author zhuyf + * @date 2022/06/19 */ - @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java index 6f8f6b04a6c10938e1424e5b096ebb9385055a07..4e9d284ca873c1635f74e7cb16689a0176ce98a6 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/InsertListenPoint.java @@ -6,12 +6,10 @@ import org.springframework.core.annotation.AliasFor; import java.lang.annotation.*; /** - * ListenPoint for insert - * - * @author chen.qian - * @date 2018/3/19 + * ListenPoint for inster + * @author zhuyf + * @date 2022/06/19 */ - @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java index 1cfb7f5ed02e2442185aa5b15b6443eaccd6a4ce..ca45ed72f1e79feb354dd8d8dad15ca3fccfd69f 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/ListenPoint.java @@ -6,12 +6,9 @@ import java.lang.annotation.*; /** * used to indicate that method(or methods) is(are) the candidate of the - * canal event distributor - * - * @author chen.qian - * @date 2018/3/19 + * @author zhuyf + * @date 2022/06/19 */ - @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java index 82409a1626486adaa1081fd6c301a637c3062bf4..e3d2f4daa1b613524a427d5b966a1be25e317eab 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java @@ -6,7 +6,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Table + * table filter * @author zhuyf * @date 2022/6/16 */ diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java index f779c3fa97958dab56f0e9b06c39dfdee9e44cd3..4ad56b4290586be147a0db3dac571f5c8e200ec6 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/UpdateListenPoint.java @@ -7,11 +7,9 @@ import java.lang.annotation.*; /** * ListenPoint for update - * - * @author chen.qian - * @date 2018/3/19 + * @author zhuyf + * @date 2022/06/19 */ - @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/MethodArgumentConfig.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/MethodArgumentConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..1d5faad50922967d28e18c19809b527e254660d5 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/MethodArgumentConfig.java @@ -0,0 +1,12 @@ +package com.schbrain.canal.client.conf; + +import com.schbrain.canal.client.core.MethodArgumentResolver; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public class MethodArgumentConfig { + + public static MethodArgumentResolver LISTENERMETHODARGUMENTRESOLVER = new MethodArgumentResolver(); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/EditMetaInfo.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/EditMetaInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..f80c910ebe41fe3b70f03ae97dc667f55ae25c0d --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/EditMetaInfo.java @@ -0,0 +1,20 @@ +package com.schbrain.canal.client.core; + +import lombok.Data; + +/** + * 用于封装修改数据的 + * @author zhuyf + * @date 2022/6/22 + */ +@Data +public class EditMetaInfo { + /** + * 修改后 + */ + private Object after; + /** + * 修改前 + */ + private Object before; +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/HandlerConf.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/HandlerConf.java index ad01bdd943369133202be01c7fb66554438b4c2b..4ea6f1a72a723399cfb35fbff250c2a177bace24 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/HandlerConf.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/HandlerConf.java @@ -1,6 +1,7 @@ package com.schbrain.canal.client.core; import com.schbrain.canal.client.event.CanalEvent; +import com.schbrain.canal.client.event.ResolverCanalEvent; import lombok.Data; import lombok.Getter; import lombok.experimental.Accessors; @@ -29,9 +30,12 @@ public class HandlerConf { */ private final List annoListeners; - public HandlerConf(List listeners, Map> tableCanalEventMap, List annoListeners) { + private final Map>> resolverCanalEvents; + + public HandlerConf(List listeners, Map> tableCanalEventMap, List annoListeners,Map>> resolverCanalEvents) { this.listeners = listeners; this.tableCanalEventMap = tableCanalEventMap; this.annoListeners = annoListeners; + this.resolverCanalEvents = resolverCanalEvents; } } diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/MethodArgumentResolver.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/MethodArgumentResolver.java new file mode 100644 index 0000000000000000000000000000000000000000..7f8174a831633b1b34d62a8884a64fdb2d180b0c --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/MethodArgumentResolver.java @@ -0,0 +1,145 @@ +package com.schbrain.canal.client.core; + +import com.schbrain.canal.client.event.ResolverCanalEvent; +import com.schbrain.canal.client.exception.CanalClientException; +import com.schbrain.canal.client.exception.ReflectionException; +import com.schbrain.canal.client.reflector.DefaultReflectorFactory; +import com.schbrain.canal.client.reflector.Reflector; +import com.schbrain.canal.client.reflector.ReflectorFactory; +import com.schbrain.canal.client.reflector.DefaultObjectFactory; +import com.schbrain.canal.client.transfer.ObjectFactory; +import com.schbrain.canal.client.type.TypeHandler; +import com.schbrain.canal.client.type.TypeHandlerRegister; +import com.schbrain.canal.client.utils.Dml; +import com.schbrain.canal.client.utils.MapUnderscoreToCamelCase; +import lombok.extern.slf4j.Slf4j; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.text.ParseException; +import java.util.*; + +/** + * @author zhuyf + * @date 2022/6/21 + */ +@Slf4j +public class MethodArgumentResolver { + + public static Map classNameMappingClass = new HashMap<>(); + + private ObjectFactory objectFactory = new DefaultObjectFactory(); + + private ReflectorFactory reflectorFactory = new DefaultReflectorFactory(); + + /** + * 获取参数类 + * @param event + * @return + */ + public Class getArgumentClass(ResolverCanalEvent event) { + if (!classNameMappingClass.containsKey(event)) { + //接口泛型缓存 + boolean cache = interfaceGeneric(event); + if(!cache){ + //继承类泛型缓存 + cache = extendGeneric(event); + } + if(!cache){ + CanalClientException e = new CanalClientException(event.getClass() + "generic get exception"); + log.error("generic get exception",e); + throw e; + } + } + return classNameMappingClass.get(event); + } + + + /** + * 接口泛型缓存 + * @param event + * @return + */ + private boolean interfaceGeneric(ResolverCanalEvent event){ + Type[] types = event.getClass().getGenericInterfaces(); + if(types == null || types.length == 0){ + return false; + } + for (Type type : types) { + if(!(type instanceof ParameterizedType)){ + continue; + } + ParameterizedType parameterized = (ParameterizedType)type; + Type rawType = parameterized.getRawType(); + if(rawType.equals(ResolverCanalEvent.class)){ + Class clazz = (Class)parameterized.getActualTypeArguments()[0]; + classNameMappingClass.put(event, clazz); + return true; + } + } + return false; + } + + private boolean extendGeneric(ResolverCanalEvent event){ + Class c = event.getClass(); + while(true){ + Type type = c.getGenericSuperclass(); + if(type instanceof ParameterizedType){ + ParameterizedType parameterizedType = (ParameterizedType)type; + Type[] types = parameterizedType.getActualTypeArguments(); + if(types!= null && types.length>0){ + classNameMappingClass.put(event, (Class) types[0]); + return true; + } + } + c = c.getSuperclass(); + if(type.equals(Object.class)){ + break; + } + } + return false; + } + + public List resolver(ResolverCanalEvent event, Dml dml) throws InvocationTargetException, IllegalAccessException, ParseException,ReflectionException { + EditMetaInfo metaInfo = new EditMetaInfo(); + Class c = getArgumentClass(event); + List editMetaInfos=new ArrayList<>(); + List datas = dml.getData(); + for (Dml.Row row : datas) { + Object after = columnsConvertObject(c,row.getData()); + metaInfo.setAfter(after); + Object before = columnsConvertObject(c,row.getOld()); + metaInfo.setBefore(before); + editMetaInfos.add(metaInfo); + } + return editMetaInfos; + } + + + public Object columnsConvertObject(Class c,Map columns) throws ReflectionException, InvocationTargetException, IllegalAccessException, ParseException { + if(columns==null||columns.isEmpty()){ + return null; + } + Reflector classesReflector= reflectorFactory.findForClass(c); + Object o = objectFactory.create(c); + for (String columnName: columns.keySet()) { + String filedName= MapUnderscoreToCamelCase.convertByCache(columnName); + Object value=columns.get(columnName); + if(value==null){ + continue; + } + if (!classesReflector.hasGetter(filedName)) { + continue; + } + Type setterType= classesReflector.getSetterType(filedName); + TypeHandler typeHandler= TypeHandlerRegister.getTypeHandler(setterType); + if(typeHandler==null){ + log.error("未适配到typeHandle{},name:{},value:{},",setterType,filedName,value); + } + classesReflector.getSetInvoker(filedName).invoke(o,new Object[]{typeHandler.convert(value)}); + } + return o; + } + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java index 44419802fcf908a9e7239118f2641d83b111b794..a56abcb4da490696406f631e71b7d293478aa949 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -5,17 +5,16 @@ import com.schbrain.canal.client.annotation.CanalEventListener; import com.schbrain.canal.client.annotation.ListenPoint; import com.schbrain.canal.client.annotation.TableFilter; import com.schbrain.canal.client.conf.CanalClientConfig; +import com.schbrain.canal.client.conf.MethodArgumentConfig; import com.schbrain.canal.client.conf.SchbrainCanalConfig; import com.schbrain.canal.client.event.CanalEvent; +import com.schbrain.canal.client.event.ResolverCanalEvent; import com.schbrain.canal.client.transfer.TransponderFactory; import com.schbrain.canal.client.utils.BeanUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang.StringUtils; import org.springframework.core.annotation.AnnotatedElementUtils; -import org.springframework.core.annotation.AnnotationUtils; - -import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -36,6 +35,9 @@ public class SimpleCanalClient extends AbstractCanalClient { * 所有执行器 */ private final List listeners = new ArrayList<>(); + + //解析器事件 + private final Map>> resolverCanalEvents = new HashMap<>(); /** * 表过滤器 */ @@ -53,46 +55,96 @@ public class SimpleCanalClient extends AbstractCanalClient { private void initListeners() { log.info("{}: initializing the listeners....", Thread.currentThread().getName()); + //初始接口监听器 + initEventList(); + //初始注解监听器 + initAnnotionList(); + //初始解析器 + initResolverList(); + log.info("{}: initializing the listeners end.", Thread.currentThread().getName()); + if (log.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) { + log.warn("{}: No listener found in context! ", Thread.currentThread().getName()); + } + } + + /** + * 初始事件监听器 + */ + private void initEventList(){ List list = BeanUtil.getBeansOfType(CanalEvent.class); - if(list!=null && list.size() > 0){ - List unFilters = new ArrayList<>(); - for (CanalEvent canalEvent : list) { - TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); - if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String key = filter.schame()+":"+filter.table(); - List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); - filterList.add(canalEvent); + if(list==null || list.size()<=0){ + return; + } + List unFilters = new ArrayList<>(); + for (CanalEvent canalEvent : list) { + Class[] classes = canalEvent.getClass().getInterfaces(); + for (Class aClass : classes) { + if(ResolverCanalEvent.class.equals(aClass)){ continue; } - unFilters.add(canalEvent); } - if(unFilters!=null && unFilters.size()>0){ - listeners.addAll(unFilters); + TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); + if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ + String key = filter.schame()+":"+filter.table(); + List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); + filterList.add(canalEvent); + continue; } + unFilters.add(canalEvent); } + if(unFilters!=null && unFilters.size()>0){ + listeners.addAll(unFilters); + } + } + + /** + * 初始注解监听器 + */ + private void initAnnotionList(){ Map listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class); - if (listenerMap != null) { - for (Object target : listenerMap.values()) { - Method[] methods = target.getClass().getDeclaredMethods(); - if (methods != null && methods.length > 0) { - for (Method method : methods) { - ListenPoint l = AnnotatedElementUtils.findMergedAnnotation(method,ListenPoint.class); - if (l != null) { - annoListeners.add(new ListenerPoint(target, method, l)); - } + if(listenerMap==null){ + return; + } + for (Object target : listenerMap.values()) { + Method[] methods = target.getClass().getDeclaredMethods(); + if (methods != null && methods.length > 0) { + for (Method method : methods) { + ListenPoint l = AnnotatedElementUtils.findMergedAnnotation(method,ListenPoint.class); + if (l != null) { + annoListeners.add(new ListenerPoint(target, method, l)); } } } } - log.info("{}: initializing the listeners end.", Thread.currentThread().getName()); - if (log.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) { - log.warn("{}: No listener found in context! ", Thread.currentThread().getName()); + } + + /** + * 初始解析器事件处理器 + */ + private void initResolverList(){ + List list = BeanUtil.getBeansOfType(ResolverCanalEvent.class); + if(list==null || list.size()<=0){ + return; + } + + MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER; + + for (ResolverCanalEvent event : list) { + TableFilter filter = event.getClass().getAnnotation(TableFilter.class); + if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ + String key = filter.schame()+":"+filter.table(); + List> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); + filterList.add(event); + resolverCanalEvents.put(key,filterList); + resolver.getArgumentClass(event); + continue; + } } } @Override protected void process(CanalConnector connector, Map.Entry config) { - HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners); + HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners,resolverCanalEvents); executor.submit(factory.newTransponder(connector, config,handlerConf)); } diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/ResolverCanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/ResolverCanalEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..a7f11a08721f356fdcdfdc1795aff3692dbdbf9f --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/ResolverCanalEvent.java @@ -0,0 +1,62 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.canal.client.conf.MethodArgumentConfig; +import com.schbrain.canal.client.core.EditMetaInfo; +import com.schbrain.canal.client.core.MethodArgumentResolver; +import com.schbrain.canal.client.exception.ReflectionException; +import com.schbrain.canal.client.utils.Dml; +import com.schbrain.canal.client.utils.MessageUtil; +import java.lang.reflect.InvocationTargetException; +import java.text.ParseException; +import java.util.List; + +public interface ResolverCanalEvent { + + + default void onEvent(CanalEntry.Entry entry) throws InvocationTargetException, IllegalAccessException, ParseException, ReflectionException { + CanalEntry.Header header = entry.getHeader(); + Dml dml = MessageUtil.parse4Dml(entry); + MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER; + List metaInfos = resolver.resolver(this,dml); + CanalEntry.EventType eventType = header.getEventType(); + switch (eventType) { + case INSERT: + for (EditMetaInfo metaInfo : metaInfos) { + onInsert(header, (T) metaInfo.getAfter()); + } + break; + case UPDATE: + for (EditMetaInfo metaInfo : metaInfos) { + onUpdate(header,(T) metaInfo.getBefore(),(T) metaInfo.getAfter()); + } + break; + case DELETE: + for (EditMetaInfo metaInfo : metaInfos) { + onDelete(header,(T) metaInfo.getAfter()); + } + break; + default: + break; + } + } + + /** + * onInsert + * @param t + */ + void onInsert(CanalEntry.Header header, T t); + + /** + * onUpdate + * @param before + * @param after + */ + void onUpdate(CanalEntry.Header header, T before,T after); + + /** + * onDelete + * @param t + */ + void onDelete(CanalEntry.Header header, T t); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleDefCanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleDefCanalEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..e5dc4f71700dffb5c6be2511d439bbc90361dbdf --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleDefCanalEvent.java @@ -0,0 +1,18 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +public class SimpleDefCanalEvent implements DefCanalEvent{ + + @Override + public void onInsert(CanalEntry.Header header, CanalEntry.RowData rowData) { + } + + @Override + public void onUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) { + } + + @Override + public void onDelete(CanalEntry.Header header, CanalEntry.RowData rowData) { + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleResolverCanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleResolverCanalEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..e561af6fe126daf6335bfaf7f8682dfc41eec49e --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleResolverCanalEvent.java @@ -0,0 +1,23 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public class SimpleResolverCanalEvent implements ResolverCanalEvent{ + + @Override + public void onInsert(CanalEntry.Header header, T o) { + } + + @Override + public void onUpdate(CanalEntry.Header header, T before, T after) { + } + + @Override + public void onDelete(CanalEntry.Header header, T o) { + } + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/PersistenceException.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/PersistenceException.java new file mode 100644 index 0000000000000000000000000000000000000000..5b829f46c3047fd5ce8921bbb27465740ba967b9 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/PersistenceException.java @@ -0,0 +1,20 @@ +package com.schbrain.canal.client.exception; + +public class PersistenceException extends CanalClientException { + private static final long serialVersionUID = -7537395265357977271L; + + public PersistenceException() { + } + + public PersistenceException(String message) { + super(message); + } + + public PersistenceException(String message, Throwable cause) { + super(message, cause); + } + + public PersistenceException(Throwable cause) { + super(cause); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/ReflectionException.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/ReflectionException.java new file mode 100644 index 0000000000000000000000000000000000000000..a826534cc546e5e46a94a36cdc2a28a9aa12ec82 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/ReflectionException.java @@ -0,0 +1,21 @@ +package com.schbrain.canal.client.exception; + + +public class ReflectionException extends Exception { + private static final long serialVersionUID = 7642570221267566591L; + + public ReflectionException() { + } + + public ReflectionException(String message) { + super(message); + } + + public ReflectionException(String message, Throwable cause) { + super(message, cause); + } + + public ReflectionException(Throwable cause) { + super(cause); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/TypeException.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/TypeException.java new file mode 100644 index 0000000000000000000000000000000000000000..15c96d5a150ba628e1420c09eb661685fd8797cd --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/exception/TypeException.java @@ -0,0 +1,20 @@ +package com.schbrain.canal.client.exception; + +public class TypeException extends PersistenceException { + private static final long serialVersionUID = 8614420898975117130L; + + public TypeException() { + } + + public TypeException(String message) { + super(message); + } + + public TypeException(String message, Throwable cause) { + super(message, cause); + } + + public TypeException(Throwable cause) { + super(cause); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/DefaultObjectFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/DefaultObjectFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..0e4fb1fb2d549dd7700bdd9f627157d4b765245e --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/DefaultObjectFactory.java @@ -0,0 +1,131 @@ +package com.schbrain.canal.client.reflector; + +import com.schbrain.canal.client.exception.ReflectionException; +import com.schbrain.canal.client.transfer.ObjectFactory; + +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.*; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public class DefaultObjectFactory implements ObjectFactory, Serializable { + private static final long serialVersionUID = -8855120656740914948L; + + public DefaultObjectFactory() { + } + + /** + * 创建指定类型的对象 不使用构造函数创建 + * @param type 类型 + * @param + * @return + */ + @Override + public T create(Class type) throws ReflectionException { + return (T) this.create(type, (List)null, (List)null); + } + + /** + * 创建指定类型的对象 + * @param type 类型 + * @param constructorArgTypes 构造函数参数类型列表 + * @param constructorArgs 构造函数参数列表 + * @param + * @return + */ + @Override + public T create(Class type, List> constructorArgTypes, List constructorArgs) throws ReflectionException { + Class classToCreate = this.resolveInterface(type); + return (T) this.instantiateClass(classToCreate, constructorArgTypes, constructorArgs); + } + + @Override + public void setProperties(Properties properties) { + } + + + public T instantiateClass(Class type, List> constructorArgTypes, List constructorArgs) throws ReflectionException { + try { + Constructor constructor; + //判断是否指定了构造函数初始化 + if (constructorArgTypes != null && constructorArgs != null) { + //获得private或public指定参数类型列表的构造函数 注:getConstructor和getDeclaredConstructor的区别是只能获得public + constructor = type.getDeclaredConstructor((Class[])constructorArgTypes.toArray(new Class[constructorArgTypes.size()])); + //如果是私有的 设置可以访问 + if (!constructor.isAccessible()) { + constructor.setAccessible(true); + } + + return (T) constructor.newInstance(constructorArgs.toArray(new Object[constructorArgs.size()])); + } else { + constructor = type.getDeclaredConstructor(); + if (!constructor.isAccessible()) { + constructor.setAccessible(true); + } + + return (T) constructor.newInstance(); + } + } catch (Exception var9) { + StringBuilder argTypes = new StringBuilder(); + if (constructorArgTypes != null && !constructorArgTypes.isEmpty()) { + Iterator i$ = constructorArgTypes.iterator(); + + while(i$.hasNext()) { + Class argType = (Class)i$.next(); + argTypes.append(argType.getSimpleName()); + argTypes.append(","); + } + + argTypes.deleteCharAt(argTypes.length() - 1); + } + + StringBuilder argValues = new StringBuilder(); + if (constructorArgs != null && !constructorArgs.isEmpty()) { + Iterator i$ = constructorArgs.iterator(); + + while(i$.hasNext()) { + Object argValue = i$.next(); + argValues.append(String.valueOf(argValue)); + argValues.append(","); + } + + argValues.deleteCharAt(argValues.length() - 1); + } + throw new ReflectionException("Error instantiating " + type + " with invalid types (" + argTypes + ") or values (" + argValues + "). Cause: " + var9,var9); + } + } + + /** + * 如果是定义结合类型 类型改为实现类 + * @param type + * @return + */ + protected Class resolveInterface(Class type) { + Class classToCreate; + if (type != List.class && type != Collection.class && type != Iterable.class) { + if (type == Map.class) { + classToCreate = HashMap.class; + } else if (type == SortedSet.class) { + classToCreate = TreeSet.class; + } else if (type == Set.class) { + classToCreate = HashSet.class; + } else { + classToCreate = type; + } + } else { + classToCreate = ArrayList.class; + } + + return classToCreate; + } + + @Override + public boolean isCollection(Class type) { + return Collection.class.isAssignableFrom(type); + } + +} + diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/DefaultReflectorFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/DefaultReflectorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..08710058fb6ac1a6341e58ae6b77938380458758 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/DefaultReflectorFactory.java @@ -0,0 +1,48 @@ +package com.schbrain.canal.client.reflector; + + +import com.schbrain.canal.client.exception.ReflectionException; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class DefaultReflectorFactory implements ReflectorFactory { + private boolean classCacheEnabled = true; + + /** + * 将反射的元数据信息封装保存到Reflector,优化性能 + */ + private final ConcurrentMap, Reflector> reflectorMap = new ConcurrentHashMap(); + + public DefaultReflectorFactory() { + } + + @Override + public boolean isClassCacheEnabled() { + return this.classCacheEnabled; + } + + @Override + public void setClassCacheEnabled(boolean classCacheEnabled) { + this.classCacheEnabled = classCacheEnabled; + } + + /** + * 获得指定类型反射元数据信息 + * @param type + * @return + */ + @Override + public Reflector findForClass(Class type) throws ReflectionException { + if (this.classCacheEnabled) { + Reflector cached = (Reflector)this.reflectorMap.get(type); + if (cached == null) { + cached = new Reflector(type); + this.reflectorMap.put(type, cached); + } + return cached; + } else { + return new Reflector(type); + } + } +} \ No newline at end of file diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/GetFieldInvoker.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/GetFieldInvoker.java new file mode 100644 index 0000000000000000000000000000000000000000..f81fb9ab772d8046d4b7bc4d77ff76c4c484200b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/GetFieldInvoker.java @@ -0,0 +1,23 @@ +package com.schbrain.canal.client.reflector; + + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; + +public class GetFieldInvoker implements Invoker { + private Field field; + + public GetFieldInvoker(Field field) { + this.field = field; + } + + @Override + public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { + return this.field.get(target); + } + + @Override + public Class getType() { + return this.field.getType(); + } +} \ No newline at end of file diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/Invoker.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/Invoker.java new file mode 100644 index 0000000000000000000000000000000000000000..0d279c19c290e69f5aa1d2796164d451016fe3e0 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/Invoker.java @@ -0,0 +1,22 @@ +package com.schbrain.canal.client.reflector; + +import java.lang.reflect.InvocationTargetException; + + +public interface Invoker { + /** + * 给指定对象的当前属性设置值 + * @param var1 + * @param var2 + * @return + * @throws IllegalAccessException + * @throws InvocationTargetException + */ + Object invoke(Object var1, Object[] var2) throws IllegalAccessException, InvocationTargetException; + + /** + * 类型 + * @return + */ + Class getType(); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/MethodInvoker.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/MethodInvoker.java new file mode 100644 index 0000000000000000000000000000000000000000..6106f963bd170f5c17c9b80591f05f4d0f4eb562 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/MethodInvoker.java @@ -0,0 +1,33 @@ +package com.schbrain.canal.client.reflector; + + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * 方法元数据封装 以及提供调用的方法 + */ +public class MethodInvoker implements Invoker { + private Class type; + private Method method; + + public MethodInvoker(Method method) { + this.method = method; + if (method.getParameterTypes().length == 1) { + this.type = method.getParameterTypes()[0]; + } else { + this.type = method.getReturnType(); + } + + } + + @Override + public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { + return this.method.invoke(target, args); + } + + @Override + public Class getType() { + return this.type; + } +} \ No newline at end of file diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/PropertyNamer.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/PropertyNamer.java new file mode 100644 index 0000000000000000000000000000000000000000..8b5bc036f55bfde326a4f70526a1ec2b1ed7e9b6 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/PropertyNamer.java @@ -0,0 +1,40 @@ +package com.schbrain.canal.client.reflector; + + +import com.schbrain.canal.client.exception.ReflectionException; + +import java.util.Locale; + +public final class PropertyNamer { + private PropertyNamer() { + } + + public static String methodToProperty(String name) throws ReflectionException { + if (name.startsWith("is")) { + name = name.substring(2); + } else { + if (!name.startsWith("get") && !name.startsWith("set")) { + throw new ReflectionException("Error parsing property name '" + name + "'. Didn't start with 'is', 'get' or 'set'."); + } + name = name.substring(3); + } + + if (name.length() == 1 || name.length() > 1 && !Character.isUpperCase(name.charAt(1))) { + name = name.substring(0, 1).toLowerCase(Locale.ENGLISH) + name.substring(1); + } + + return name; + } + + public static boolean isProperty(String name) { + return isGetter(name) || isSetter(name); + } + + public static boolean isGetter(String name) { + return name.startsWith("get") && name.length() > 3 || name.startsWith("is") && name.length() > 2; + } + + public static boolean isSetter(String name) { + return name.startsWith("set") && name.length() > 3; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/Reflector.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/Reflector.java new file mode 100644 index 0000000000000000000000000000000000000000..7a8eb62624a5edebc5f3ac3d9ee11d79d0e81ffd --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/Reflector.java @@ -0,0 +1,466 @@ +package com.schbrain.canal.client.reflector; + +import com.schbrain.canal.client.exception.ReflectionException; + +import java.lang.reflect.*; +import java.util.*; + +public class Reflector { + private static final String[] EMPTY_STRING_ARRAY = new String[0]; + private Class type; + private String[] readablePropertyNames; + private String[] writeablePropertyNames; + private Map setMethods; + private Map getMethods; + private Map> setTypes; + private Map> getTypes; + private Constructor defaultConstructor; + private Map caseInsensitivePropertyMap; + + /** + * 初始化并将对应的元数据信息封装起来 + * @param clazz + */ + public Reflector(Class clazz) throws ReflectionException { + this.readablePropertyNames = EMPTY_STRING_ARRAY; + this.writeablePropertyNames = EMPTY_STRING_ARRAY; + //初始化几个map + this.setMethods = new HashMap(); + this.getMethods = new HashMap(); + this.setTypes = new HashMap(); + this.getTypes = new HashMap(); + this.caseInsensitivePropertyMap = new HashMap(); + this.type = clazz; + //反射查找默认构造函数到defaultConstructor + this.addDefaultConstructor(clazz); + //反射获得所有的get方法元数据保存到以Invoker保存getMethods + this.addGetMethods(clazz); + //反射获得所有的set方法元数据以invokersetMethods + this.addSetMethods(clazz); + //反射获得所有的Fields元数据 + this.addFields(clazz); + this.readablePropertyNames = (String[])this.getMethods.keySet().toArray(new String[this.getMethods.keySet().size()]); + this.writeablePropertyNames = (String[])this.setMethods.keySet().toArray(new String[this.setMethods.keySet().size()]); + String[] arr$ = this.readablePropertyNames; + int len$ = arr$.length; + + int i$; + String propName; + for(i$ = 0; i$ < len$; ++i$) { + propName = arr$[i$]; + this.caseInsensitivePropertyMap.put(propName.toUpperCase(Locale.ENGLISH), propName); + } + + arr$ = this.writeablePropertyNames; + len$ = arr$.length; + + for(i$ = 0; i$ < len$; ++i$) { + propName = arr$[i$]; + this.caseInsensitivePropertyMap.put(propName.toUpperCase(Locale.ENGLISH), propName); + } + + } + + private void addDefaultConstructor(Class clazz) { + Constructor[] consts = clazz.getDeclaredConstructors(); + Constructor[] arr$ = consts; + int len$ = consts.length; + + for(int i$ = 0; i$ < len$; ++i$) { + Constructor constructor = arr$[i$]; + if (constructor.getParameterTypes().length == 0) { + if (canAccessPrivateMethods()) { + try { + constructor.setAccessible(true); + } catch (Exception var8) { + ; + } + } + + if (constructor.isAccessible()) { + this.defaultConstructor = constructor; + } + } + } + + } + + private void addGetMethods(Class cls) throws ReflectionException { + Map> conflictingGetters = new HashMap(); + Method[] methods = this.getClassMethods(cls); + Method[] arr$ = methods; + int len$ = methods.length; + + for(int i$ = 0; i$ < len$; ++i$) { + Method method = arr$[i$]; + String name = method.getName(); + if (name.startsWith("get") && name.length() > 3) { + if (method.getParameterTypes().length == 0) { + name = PropertyNamer.methodToProperty(name); + this.addMethodConflict(conflictingGetters, name, method); + } + } else if (name.startsWith("is") && name.length() > 2 && method.getParameterTypes().length == 0) { + name = PropertyNamer.methodToProperty(name); + this.addMethodConflict(conflictingGetters, name, method); + } + } + + this.resolveGetterConflicts(conflictingGetters); + } + + private void resolveGetterConflicts(Map> conflictingGetters) throws ReflectionException { + Iterator i$ = conflictingGetters.keySet().iterator(); + + while(true) { + while(i$.hasNext()) { + String propName = (String)i$.next(); + List getters = (List)conflictingGetters.get(propName); + Iterator iterator = getters.iterator(); + Method firstMethod = (Method)iterator.next(); + if (getters.size() == 1) { + this.addGetMethod(propName, firstMethod); + } else { + Method getter = firstMethod; + Class getterType = firstMethod.getReturnType(); + + while(iterator.hasNext()) { + Method method = (Method)iterator.next(); + Class methodType = method.getReturnType(); + if (methodType.equals(getterType)) { + throw new ReflectionException("Illegal overloaded getter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results."); + } + + if (!methodType.isAssignableFrom(getterType)) { + if (!getterType.isAssignableFrom(methodType)) { + throw new ReflectionException("Illegal overloaded getter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results."); + } + + getter = method; + getterType = methodType; + } + } + + this.addGetMethod(propName, getter); + } + } + + return; + } + } + + private void addGetMethod(String name, Method method) { + if (this.isValidPropertyName(name)) { + this.getMethods.put(name, new MethodInvoker(method)); + Type returnType = TypeParameterResolver.resolveReturnType(method, this.type); + this.getTypes.put(name, this.typeToClass(returnType)); + } + + } + + private void addSetMethods(Class cls) throws ReflectionException { + Map> conflictingSetters = new HashMap(); + Method[] methods = this.getClassMethods(cls); + Method[] arr$ = methods; + int len$ = methods.length; + + for(int i$ = 0; i$ < len$; ++i$) { + Method method = arr$[i$]; + String name = method.getName(); + if (name.startsWith("set") && name.length() > 3 && method.getParameterTypes().length == 1) { + name = PropertyNamer.methodToProperty(name); + this.addMethodConflict(conflictingSetters, name, method); + } + } + + this.resolveSetterConflicts(conflictingSetters); + } + + private void addMethodConflict(Map> conflictingMethods, String name, Method method) { + List list = (List)conflictingMethods.get(name); + if (list == null) { + list = new ArrayList(); + conflictingMethods.put(name, list); + } + + ((List)list).add(method); + } + + private void resolveSetterConflicts(Map> conflictingSetters) throws ReflectionException { + Iterator i$ = conflictingSetters.keySet().iterator(); + + while(true) { + while(i$.hasNext()) { + String propName = (String)i$.next(); + List setters = (List)conflictingSetters.get(propName); + Method firstMethod = (Method)setters.get(0); + if (setters.size() == 1) { + this.addSetMethod(propName, firstMethod); + } else { + Class expectedType = (Class)this.getTypes.get(propName); + if (expectedType == null) { + throw new ReflectionException("Illegal overloaded setter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results."); + } + + Iterator methods = setters.iterator(); + Method setter = null; + + while(methods.hasNext()) { + Method method = (Method)methods.next(); + if (method.getParameterTypes().length == 1 && expectedType.equals(method.getParameterTypes()[0])) { + setter = method; + break; + } + } + + if (setter == null) { + throw new ReflectionException("Illegal overloaded setter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results."); + } + + this.addSetMethod(propName, setter); + } + } + + return; + } + } + + private void addSetMethod(String name, Method method) { + if (this.isValidPropertyName(name)) { + this.setMethods.put(name, new MethodInvoker(method)); + Type[] paramTypes = TypeParameterResolver.resolveParamTypes(method, this.type); + this.setTypes.put(name, this.typeToClass(paramTypes[0])); + } + + } + + private Class typeToClass(Type src) { + Class result = null; + if (src instanceof Class) { + result = (Class)src; + } else if (src instanceof ParameterizedType) { + result = (Class)((ParameterizedType)src).getRawType(); + } else if (src instanceof GenericArrayType) { + Type componentType = ((GenericArrayType)src).getGenericComponentType(); + if (componentType instanceof Class) { + result = Array.newInstance((Class)componentType, 0).getClass(); + } else { + Class componentClass = this.typeToClass(componentType); + result = Array.newInstance(componentClass, 0).getClass(); + } + } + + if (result == null) { + result = Object.class; + } + + return result; + } + + private void addFields(Class clazz) { + Field[] fields = clazz.getDeclaredFields(); + Field[] arr$ = fields; + int len$ = fields.length; + + for(int i$ = 0; i$ < len$; ++i$) { + Field field = arr$[i$]; + if (canAccessPrivateMethods()) { + try { + field.setAccessible(true); + } catch (Exception var8) { + ; + } + } + + if (field.isAccessible()) { + if (!this.setMethods.containsKey(field.getName())) { + int modifiers = field.getModifiers(); + if (!Modifier.isFinal(modifiers) || !Modifier.isStatic(modifiers)) { + this.addSetField(field); + } + } + + if (!this.getMethods.containsKey(field.getName())) { + this.addGetField(field); + } + } + } + + if (clazz.getSuperclass() != null) { + this.addFields(clazz.getSuperclass()); + } + + } + + private void addSetField(Field field) { + if (this.isValidPropertyName(field.getName())) { + this.setMethods.put(field.getName(), new SetFieldInvoker(field)); + Type fieldType = TypeParameterResolver.resolveFieldType(field, this.type); + this.setTypes.put(field.getName(), this.typeToClass(fieldType)); + } + + } + + private void addGetField(Field field) { + if (this.isValidPropertyName(field.getName())) { + this.getMethods.put(field.getName(), new GetFieldInvoker(field)); + Type fieldType = TypeParameterResolver.resolveFieldType(field, this.type); + this.getTypes.put(field.getName(), this.typeToClass(fieldType)); + } + + } + + private boolean isValidPropertyName(String name) { + return !name.startsWith("$") && !"serialVersionUID".equals(name) && !"class".equals(name); + } + + private Method[] getClassMethods(Class cls) { + Map uniqueMethods = new HashMap(); + + for(Class currentClass = cls; currentClass != null; currentClass = currentClass.getSuperclass()) { + this.addUniqueMethods(uniqueMethods, currentClass.getDeclaredMethods()); + Class[] interfaces = currentClass.getInterfaces(); + Class[] arr$ = interfaces; + int len$ = interfaces.length; + + for(int i$ = 0; i$ < len$; ++i$) { + Class anInterface = arr$[i$]; + this.addUniqueMethods(uniqueMethods, anInterface.getMethods()); + } + } + + Collection methods = uniqueMethods.values(); + return (Method[])methods.toArray(new Method[methods.size()]); + } + + private void addUniqueMethods(Map uniqueMethods, Method[] methods) { + Method[] arr$ = methods; + int len$ = methods.length; + + for(int i$ = 0; i$ < len$; ++i$) { + Method currentMethod = arr$[i$]; + if (!currentMethod.isBridge()) { + String signature = this.getSignature(currentMethod); + if (!uniqueMethods.containsKey(signature)) { + if (canAccessPrivateMethods()) { + try { + currentMethod.setAccessible(true); + } catch (Exception var9) { + ; + } + } + + uniqueMethods.put(signature, currentMethod); + } + } + } + + } + + private String getSignature(Method method) { + StringBuilder sb = new StringBuilder(); + Class returnType = method.getReturnType(); + if (returnType != null) { + sb.append(returnType.getName()).append('#'); + } + + sb.append(method.getName()); + Class[] parameters = method.getParameterTypes(); + + for(int i = 0; i < parameters.length; ++i) { + if (i == 0) { + sb.append(':'); + } else { + sb.append(','); + } + + sb.append(parameters[i].getName()); + } + + return sb.toString(); + } + + private static boolean canAccessPrivateMethods() { + try { + SecurityManager securityManager = System.getSecurityManager(); + if (null != securityManager) { + securityManager.checkPermission(new ReflectPermission("suppressAccessChecks")); + } + + return true; + } catch (SecurityException var1) { + return false; + } + } + + public Class getType() { + return this.type; + } + + public Constructor getDefaultConstructor() throws ReflectionException { + if (this.defaultConstructor != null) { + return this.defaultConstructor; + } else { + throw new ReflectionException("There is no default constructor for " + this.type); + } + } + + public boolean hasDefaultConstructor() { + return this.defaultConstructor != null; + } + + public Invoker getSetInvoker(String propertyName) throws ReflectionException { + Invoker method = (Invoker)this.setMethods.get(propertyName); + if (method == null) { + throw new ReflectionException("There is no setter for property named '" + propertyName + "' in '" + this.type + "'"); + } else { + return method; + } + } + + public Invoker getGetInvoker(String propertyName) throws ReflectionException { + Invoker method = (Invoker)this.getMethods.get(propertyName); + if (method == null) { + throw new ReflectionException("There is no getter for property named '" + propertyName + "' in '" + this.type + "'"); + } else { + return method; + } + } + + public Class getSetterType(String propertyName) throws ReflectionException { + Class clazz = (Class)this.setTypes.get(propertyName); + if (clazz == null) { + throw new ReflectionException("There is no setter for property named '" + propertyName + "' in '" + this.type + "'"); + } else { + return clazz; + } + } + + public Class getGetterType(String propertyName) throws ReflectionException { + Class clazz = (Class)this.getTypes.get(propertyName); + if (clazz == null) { + throw new ReflectionException("There is no getter for property named '" + propertyName + "' in '" + this.type + "'"); + } else { + return clazz; + } + } + + public String[] getGetablePropertyNames() { + return this.readablePropertyNames; + } + + public String[] getSetablePropertyNames() { + return this.writeablePropertyNames; + } + + public boolean hasSetter(String propertyName) { + return this.setMethods.keySet().contains(propertyName); + } + + public boolean hasGetter(String propertyName) { + return this.getMethods.keySet().contains(propertyName); + } + + public String findPropertyName(String name) { + return (String)this.caseInsensitivePropertyMap.get(name.toUpperCase(Locale.ENGLISH)); + } +} \ No newline at end of file diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/ReflectorFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/ReflectorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..20bec2d22122e5ef3f1a254e7f08e28562960671 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/ReflectorFactory.java @@ -0,0 +1,14 @@ +package com.schbrain.canal.client.reflector; + + +import com.schbrain.canal.client.exception.ReflectionException; + + +public interface ReflectorFactory { + + boolean isClassCacheEnabled(); + + void setClassCacheEnabled(boolean var1); + + Reflector findForClass(Class var1) throws ReflectionException; +} \ No newline at end of file diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/SetFieldInvoker.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/SetFieldInvoker.java new file mode 100644 index 0000000000000000000000000000000000000000..a213dadedd72352b43564b0be2dac7e8ef63dbb1 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/SetFieldInvoker.java @@ -0,0 +1,40 @@ +package com.schbrain.canal.client.reflector; + + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; + +/** + * 封装filed元数据信息 + */ +public class SetFieldInvoker implements Invoker { + + private Field field; + + public SetFieldInvoker(Field field) { + this.field = field; + } + + /** + * 给指定对象的当前属性设置值 + * @param target + * @param args + * @return + * @throws IllegalAccessException + * @throws InvocationTargetException + */ + @Override + public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { + this.field.set(target, args[0]); + return null; + } + + /** + * 类型 + * @return + */ + @Override + public Class getType() { + return this.field.getType(); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/TypeParameterResolver.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/TypeParameterResolver.java new file mode 100644 index 0000000000000000000000000000000000000000..ff25886d011cd44533f970930b700f37f143540e --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/reflector/TypeParameterResolver.java @@ -0,0 +1,256 @@ +package com.schbrain.canal.client.reflector; + + +import java.lang.reflect.*; +import java.util.Arrays; + +public class TypeParameterResolver { + public static Type resolveFieldType(Field field, Type srcType) { + Type fieldType = field.getGenericType(); + Class declaringClass = field.getDeclaringClass(); + return resolveType(fieldType, srcType, declaringClass); + } + + public static Type resolveReturnType(Method method, Type srcType) { + Type returnType = method.getGenericReturnType(); + Class declaringClass = method.getDeclaringClass(); + return resolveType(returnType, srcType, declaringClass); + } + + public static Type[] resolveParamTypes(Method method, Type srcType) { + Type[] paramTypes = method.getGenericParameterTypes(); + Class declaringClass = method.getDeclaringClass(); + Type[] result = new Type[paramTypes.length]; + + for(int i = 0; i < paramTypes.length; ++i) { + result[i] = resolveType(paramTypes[i], srcType, declaringClass); + } + + return result; + } + + private static Type resolveType(Type type, Type srcType, Class declaringClass) { + if (type instanceof TypeVariable) { + return resolveTypeVar((TypeVariable)type, srcType, declaringClass); + } else if (type instanceof ParameterizedType) { + return resolveParameterizedType((ParameterizedType)type, srcType, declaringClass); + } else { + return type instanceof GenericArrayType ? resolveGenericArrayType((GenericArrayType)type, srcType, declaringClass) : type; + } + } + + private static Type resolveGenericArrayType(GenericArrayType genericArrayType, Type srcType, Class declaringClass) { + Type componentType = genericArrayType.getGenericComponentType(); + Type resolvedComponentType = null; + if (componentType instanceof TypeVariable) { + resolvedComponentType = resolveTypeVar((TypeVariable)componentType, srcType, declaringClass); + } else if (componentType instanceof GenericArrayType) { + resolvedComponentType = resolveGenericArrayType((GenericArrayType)componentType, srcType, declaringClass); + } else if (componentType instanceof ParameterizedType) { + resolvedComponentType = resolveParameterizedType((ParameterizedType)componentType, srcType, declaringClass); + } + + return (Type)(resolvedComponentType instanceof Class ? Array.newInstance((Class)resolvedComponentType, 0).getClass() : new GenericArrayTypeImpl((Type)resolvedComponentType)); + } + + private static ParameterizedType resolveParameterizedType(ParameterizedType parameterizedType, Type srcType, Class declaringClass) { + Class rawType = (Class)parameterizedType.getRawType(); + Type[] typeArgs = parameterizedType.getActualTypeArguments(); + Type[] args = new Type[typeArgs.length]; + + for(int i = 0; i < typeArgs.length; ++i) { + if (typeArgs[i] instanceof TypeVariable) { + args[i] = resolveTypeVar((TypeVariable)typeArgs[i], srcType, declaringClass); + } else if (typeArgs[i] instanceof ParameterizedType) { + args[i] = resolveParameterizedType((ParameterizedType)typeArgs[i], srcType, declaringClass); + } else if (typeArgs[i] instanceof WildcardType) { + args[i] = resolveWildcardType((WildcardType)typeArgs[i], srcType, declaringClass); + } else { + args[i] = typeArgs[i]; + } + } + + return new ParameterizedTypeImpl(rawType, (Type)null, args); + } + + private static Type resolveWildcardType(WildcardType wildcardType, Type srcType, Class declaringClass) { + Type[] lowerBounds = resolveWildcardTypeBounds(wildcardType.getLowerBounds(), srcType, declaringClass); + Type[] upperBounds = resolveWildcardTypeBounds(wildcardType.getUpperBounds(), srcType, declaringClass); + return new WildcardTypeImpl(lowerBounds, upperBounds); + } + + private static Type[] resolveWildcardTypeBounds(Type[] bounds, Type srcType, Class declaringClass) { + Type[] result = new Type[bounds.length]; + + for(int i = 0; i < bounds.length; ++i) { + if (bounds[i] instanceof TypeVariable) { + result[i] = resolveTypeVar((TypeVariable)bounds[i], srcType, declaringClass); + } else if (bounds[i] instanceof ParameterizedType) { + result[i] = resolveParameterizedType((ParameterizedType)bounds[i], srcType, declaringClass); + } else if (bounds[i] instanceof WildcardType) { + result[i] = resolveWildcardType((WildcardType)bounds[i], srcType, declaringClass); + } else { + result[i] = bounds[i]; + } + } + + return result; + } + + private static Type resolveTypeVar(TypeVariable typeVar, Type srcType, Class declaringClass) { + Class clazz; + if (srcType instanceof Class) { + clazz = (Class)srcType; + } else { + if (!(srcType instanceof ParameterizedType)) { + throw new IllegalArgumentException("The 2nd arg must be Class or ParameterizedType, but was: " + srcType.getClass()); + } + + ParameterizedType parameterizedType = (ParameterizedType)srcType; + clazz = (Class)parameterizedType.getRawType(); + } + + if (clazz == declaringClass) { + Type[] bounds = typeVar.getBounds(); + return (Type)(bounds.length > 0 ? bounds[0] : Object.class); + } else { + Type superclass = clazz.getGenericSuperclass(); + Type result = scanSuperTypes(typeVar, srcType, declaringClass, clazz, superclass); + if (result != null) { + return result; + } else { + Type[] superInterfaces = clazz.getGenericInterfaces(); + Type[] var7 = superInterfaces; + int var8 = superInterfaces.length; + + for(int var9 = 0; var9 < var8; ++var9) { + Type superInterface = var7[var9]; + result = scanSuperTypes(typeVar, srcType, declaringClass, clazz, superInterface); + if (result != null) { + return result; + } + } + + return Object.class; + } + } + } + + private static Type scanSuperTypes(TypeVariable typeVar, Type srcType, Class declaringClass, Class clazz, Type superclass) { + if (superclass instanceof ParameterizedType) { + ParameterizedType parentAsType = (ParameterizedType)superclass; + Class parentAsClass = (Class)parentAsType.getRawType(); + TypeVariable[] parentTypeVars = parentAsClass.getTypeParameters(); + if (srcType instanceof ParameterizedType) { + parentAsType = translateParentTypeVars((ParameterizedType)srcType, clazz, parentAsType); + } + + if (declaringClass == parentAsClass) { + for(int i = 0; i < parentTypeVars.length; ++i) { + if (typeVar == parentTypeVars[i]) { + return parentAsType.getActualTypeArguments()[i]; + } + } + } + + if (declaringClass.isAssignableFrom(parentAsClass)) { + return resolveTypeVar(typeVar, parentAsType, declaringClass); + } + } else if (superclass instanceof Class && declaringClass.isAssignableFrom((Class)superclass)) { + return resolveTypeVar(typeVar, superclass, declaringClass); + } + + return null; + } + + private static ParameterizedType translateParentTypeVars(ParameterizedType srcType, Class srcClass, ParameterizedType parentType) { + Type[] parentTypeArgs = parentType.getActualTypeArguments(); + Type[] srcTypeArgs = srcType.getActualTypeArguments(); + TypeVariable[] srcTypeVars = srcClass.getTypeParameters(); + Type[] newParentArgs = new Type[parentTypeArgs.length]; + boolean noChange = true; + + for(int i = 0; i < parentTypeArgs.length; ++i) { + if (parentTypeArgs[i] instanceof TypeVariable) { + for(int j = 0; j < srcTypeVars.length; ++j) { + if (srcTypeVars[j] == parentTypeArgs[i]) { + noChange = false; + newParentArgs[i] = srcTypeArgs[j]; + } + } + } else { + newParentArgs[i] = parentTypeArgs[i]; + } + } + + return (ParameterizedType)(noChange ? parentType : new ParameterizedTypeImpl((Class)parentType.getRawType(), (Type)null, newParentArgs)); + } + + private TypeParameterResolver() { + } + + static class GenericArrayTypeImpl implements GenericArrayType { + private Type genericComponentType; + + GenericArrayTypeImpl(Type genericComponentType) { + this.genericComponentType = genericComponentType; + } + + @Override + public Type getGenericComponentType() { + return this.genericComponentType; + } + } + + static class WildcardTypeImpl implements WildcardType { + private Type[] lowerBounds; + private Type[] upperBounds; + + WildcardTypeImpl(Type[] lowerBounds, Type[] upperBounds) { + this.lowerBounds = lowerBounds; + this.upperBounds = upperBounds; + } + + @Override + public Type[] getLowerBounds() { + return this.lowerBounds; + } + + @Override + public Type[] getUpperBounds() { + return this.upperBounds; + } + } + + static class ParameterizedTypeImpl implements ParameterizedType { + private Class rawType; + private Type ownerType; + private Type[] actualTypeArguments; + + public ParameterizedTypeImpl(Class rawType, Type ownerType, Type[] actualTypeArguments) { + this.rawType = rawType; + this.ownerType = ownerType; + this.actualTypeArguments = actualTypeArguments; + } + + @Override + public Type[] getActualTypeArguments() { + return this.actualTypeArguments; + } + + @Override + public Type getOwnerType() { + return this.ownerType; + } + + @Override + public Type getRawType() { + return this.rawType; + } + + @Override + public String toString() { + return "ParameterizedTypeImpl [rawType=" + this.rawType + ", ownerType=" + this.ownerType + ", actualTypeArguments=" + Arrays.toString(this.actualTypeArguments) + "]"; + } + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java index efbc2faf207c577bba6d95c310d3b9978857bdaf..2d7c73a5771f41bf11f934f129a7e87aa4421ed0 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java @@ -8,6 +8,7 @@ import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.core.HandlerConf; import com.schbrain.canal.client.core.ListenerPoint; import com.schbrain.canal.client.event.CanalEvent; +import com.schbrain.canal.client.event.ResolverCanalEvent; import com.schbrain.canal.client.exception.CanalClientException; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Method; @@ -51,12 +52,11 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { //distribute to listener interfaces distributeByImpl(header,rowChange.getEventType(), rowData); + //带解析器的执行器 //distribute to annotation listener interfaces - distributeByAnnotation(destination, - header, - rowChange.getEventType(), - rowData); + distributeByAnnotation(destination, header, rowChange.getEventType(), rowData); } + resolverByImpl(entry); } } @@ -90,6 +90,33 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra } } + /** + * 解析器执行器 + * @param entry + */ + protected void resolverByImpl(CanalEntry.Entry entry) { + if(handlerConf == null){ + return; + } + //带过滤器的执行 + Map>> filterEvent = handlerConf.resolverCanalEvents(); + if(filterEvent == null){ + return; + } + CanalEntry.Header header = entry.getHeader(); + String key = String.format("%s:%s", header.getSchemaName(), header.getTableName()); + List> events = filterEvent.get(key); + if(events!=null && events.size()>0){ + for (ResolverCanalEvent event : events) { + try { + event.onEvent(entry); + }catch (Exception e){ + log.warn("resolver event handel error",e); + } + } + } + } + private void doEvent(CanalEvent event,CanalEntry.Header header,CanalEntry.EventType eventType,CanalEntry.RowData rowData){ try { event.onEvent(header,eventType, rowData); diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/ObjectFactory.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/ObjectFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..58b9cb6f7e520a1910390140b735fd874e19dfa7 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/ObjectFactory.java @@ -0,0 +1,21 @@ +package com.schbrain.canal.client.transfer; + +import com.schbrain.canal.client.exception.ReflectionException; + +import java.util.List; +import java.util.Properties; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public interface ObjectFactory { + + void setProperties(Properties var1); + + T create(Class var1) throws ReflectionException; + + T create(Class var1, List> var2, List var3) throws ReflectionException; + + boolean isCollection(Class var1); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BaseTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BaseTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..625034a1fa9b4d6c01edb01e799dbc7aa3f0f8e0 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BaseTypeHandler.java @@ -0,0 +1,16 @@ +package com.schbrain.canal.client.type; + +import java.text.ParseException; + +public abstract class BaseTypeHandler implements TypeHandler { + + public T convertNullableResult(Object value) throws ParseException { + if(value==null){ + return null; + } + return convert(value); + } + + public abstract T convert(Object value) throws ParseException; + +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BigDecimalTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BigDecimalTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..40efa3f5084c6521491a23b2ea53ee96d7dce8d4 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BigDecimalTypeHandler.java @@ -0,0 +1,10 @@ +package com.schbrain.canal.client.type; + +import java.math.BigDecimal; + +public class BigDecimalTypeHandler extends BaseTypeHandler { + @Override + public BigDecimal convert(Object value) { + return new BigDecimal(value.toString()); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BigIntegerTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BigIntegerTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..29216f4627fc72c152c65cfe51b1e95b2964a97b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BigIntegerTypeHandler.java @@ -0,0 +1,10 @@ +package com.schbrain.canal.client.type; + +import java.math.BigInteger; + +public class BigIntegerTypeHandler extends BaseTypeHandler { + @Override + public BigInteger convert(Object value) { + return new BigInteger(value.toString()); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BooleanTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BooleanTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..3a6ab601c7387f0705a4c41a88f7395417214393 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/BooleanTypeHandler.java @@ -0,0 +1,8 @@ +package com.schbrain.canal.client.type; + +public class BooleanTypeHandler extends BaseTypeHandler{ + @Override + public Boolean convert(Object value) { + return new Boolean(value.toString()); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/DateTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/DateTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..308272e5e05876d6cb50de2630d7a12d34cebba6 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/DateTypeHandler.java @@ -0,0 +1,20 @@ +package com.schbrain.canal.client.type; + +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + + +public class DateTypeHandler extends BaseTypeHandler { + + @Override + public Date convert(Object value) throws ParseException { + String valueStr=value.toString(); + if(valueStr.indexOf("-")>0){ + SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" ); + return sdf.parse(valueStr); + } + return new Timestamp(Long.valueOf(value.toString())); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/EnumTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/EnumTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..906067773f65bbdfb87fb7f09815fe45de9628d0 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/EnumTypeHandler.java @@ -0,0 +1,18 @@ +package com.schbrain.canal.client.type; + +import java.text.ParseException; + +public class EnumTypeHandler> extends BaseTypeHandler{ + private final Class type; + public EnumTypeHandler(Class type) { + if (type == null) { + throw new IllegalArgumentException("Type argument cannot be null"); + } else { + this.type = type; + } + } + @Override + public E convert(Object value) throws ParseException { + return Enum.valueOf(type,value.toString()); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/IntegerTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/IntegerTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..808120766683d761c89c0ca53b4b0c3aebc2699c --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/IntegerTypeHandler.java @@ -0,0 +1,8 @@ +package com.schbrain.canal.client.type; + +public class IntegerTypeHandler extends BaseTypeHandler { + @Override + public Integer convert(Object value) { + return new Integer(value.toString()); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/LongTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/LongTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..35e4a60d56d096fd0597a258f429c1de0438e31a --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/LongTypeHandler.java @@ -0,0 +1,8 @@ +package com.schbrain.canal.client.type; + +public class LongTypeHandler extends BaseTypeHandler { + @Override + public Long convert(Object value) { + return new Long(value.toString()); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/StringTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/StringTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..6ae17f58279498760d13adc9149276733c5f7f4d --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/StringTypeHandler.java @@ -0,0 +1,11 @@ +package com.schbrain.canal.client.type; + +import java.text.ParseException; + + +public class StringTypeHandler extends BaseTypeHandler { + @Override + public String convert(Object value) throws ParseException { + return value.toString(); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TimestampTypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TimestampTypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..75bf3007de860bdf4ae424625fbfe56afb60e82b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TimestampTypeHandler.java @@ -0,0 +1,17 @@ +package com.schbrain.canal.client.type; + +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +public class TimestampTypeHandler extends BaseTypeHandler { + @Override + public Timestamp convert(Object value) throws ParseException { + String valueStr=value.toString(); + if(valueStr.indexOf("-")>0){ + SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" ); + return new Timestamp(sdf.parse(valueStr).getTime()); + } + return new Timestamp(Long.valueOf(value.toString())); + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TypeHandler.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TypeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..0a9254ff9d7f7405d75c863c1ae28ec7a8d8a9c0 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TypeHandler.java @@ -0,0 +1,7 @@ +package com.schbrain.canal.client.type; + +import java.text.ParseException; + +public interface TypeHandler { + public T convert(Object value) throws ParseException; +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TypeHandlerRegister.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TypeHandlerRegister.java new file mode 100644 index 0000000000000000000000000000000000000000..7a96ee0bf23535d3d8ebf9c8938bee5dbf3f39ce --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/type/TypeHandlerRegister.java @@ -0,0 +1,67 @@ +package com.schbrain.canal.client.type; + + + +import com.schbrain.canal.client.exception.TypeException; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class TypeHandlerRegister { + private static Map typeHandlerMap=new HashMap<>(); + private static Class< ? extends TypeHandler> defaultEnumTypeHandler=EnumTypeHandler.class; + static { + register((Type)Integer.class,new IntegerTypeHandler()); + register((Class)Integer.TYPE, new IntegerTypeHandler()); + register((Type)BigDecimal.class,new BigDecimalTypeHandler()); + register((Type)BigInteger.class,new BigIntegerTypeHandler()); + register((Type)Boolean.class,new BooleanTypeHandler()); + register((Type)Boolean.TYPE,new BigDecimalTypeHandler()); + register((Type)Date.class,new DateTypeHandler()); + register((Type)Long.class,new LongTypeHandler()); + register((Type)Long.TYPE,new LongTypeHandler()); + register((Type)Timestamp.class,new TimestampTypeHandler()); + register((Type)String.class,new StringTypeHandler()); + } + public static TypeHandler getTypeHandler(Type c){ + if(!typeHandlerMap.containsKey(c)){ + if (Enum.class.isAssignableFrom((Class)c)){ + Class cs=(Class)c; + Class enumClass = cs.isAnonymousClass() ? cs.getSuperclass() : cs; + register(c,(TypeHandler)getInstance(enumClass,defaultEnumTypeHandler)); + } + } + TypeHandler typeHandler= typeHandlerMap.get(c); + return typeHandler; + + } + public static void register(Type c,TypeHandler t){ + typeHandlerMap.put(c,t); + } + + public static TypeHandler getInstance(Class javaTypeClass, Class typeHandlerClass) { + Constructor c; + if (javaTypeClass != null) { + try { + c = typeHandlerClass.getConstructor(Class.class); + return (TypeHandler)c.newInstance(javaTypeClass); + } catch (NoSuchMethodException var5) { + } catch (Exception var6) { + throw new TypeException("Failed invoking constructor for handler " + typeHandlerClass, var6); + } + } + + try { + c = typeHandlerClass.getConstructor(); + return (TypeHandler)c.newInstance(); + } catch (Exception var4) { + throw new TypeException("Unable to find a usable constructor for " + typeHandlerClass, var4); + } + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/Dml.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/Dml.java new file mode 100644 index 0000000000000000000000000000000000000000..7c03c2f92adbafee6dd10bc0bca47815c67cd1c3 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/Dml.java @@ -0,0 +1,66 @@ +package com.schbrain.canal.client.utils; + +import lombok.Data; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +@Data +public class Dml { + /** + * 对应canal的实例或者MQ的topic + */ + private String destination; + /** + * 数据库或schema + */ + private String database; + /** + * 表名 + */ + private String table; + + private List pkNames; + + private Boolean isDdl; + /** + * 类型: INSERT UPDATE DELETE + */ + private String type; + /** + * 执行耗时 + */ + private Long es; + // dml build timeStamp + /** + * 同步时间 + */ + private Long ts; + /** + * 执行的sql, dml sql为空 + */ + private String sql; + /** + * 数据列表 + */ + private List data; + /** + * 数据列表 + */ + private Set updatedNames; + /** + * 数据列 + */ + @Data + public static class Row{ + + Map data; + + Map old; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/JdbcTypeUtil.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/JdbcTypeUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..7c1d50769a4ed28f0310fa2d8fc4be45305d407c --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/JdbcTypeUtil.java @@ -0,0 +1,115 @@ +package com.schbrain.canal.client.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +@Slf4j +public class JdbcTypeUtil { + + private static boolean isText(String columnType) { + return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType) + || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType); + } + + public static Object typeConvert(String tableName ,String columnName, String value, int sqlType, String mysqlType) { + if (value == null + || (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) { + return null; + } + + try { + Object res; + switch (sqlType) { + case Types.INTEGER: + res = Integer.parseInt(value); + break; + case Types.SMALLINT: + res = Short.parseShort(value); + break; + case Types.BIT: + case Types.TINYINT: + res = Byte.parseByte(value); + break; + case Types.BIGINT: + if (mysqlType.startsWith("bigint") && mysqlType.endsWith("unsigned")) { + res = new BigInteger(value); + } else { + res = Long.parseLong(value); + } + break; + // case Types.BIT: + case Types.BOOLEAN: + res = !"0".equals(value); + break; + case Types.DOUBLE: + case Types.FLOAT: + res = Double.parseDouble(value); + break; + case Types.REAL: + res = Float.parseFloat(value); + break; + case Types.DECIMAL: + case Types.NUMERIC: + res = new BigDecimal(value); + break; + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + case Types.BLOB: + res = value.getBytes("ISO-8859-1"); + break; + case Types.DATE: + if (!value.startsWith("0000-00-00")) { + java.util.Date date = Util.parseDate(value); + if (date != null) { + res = new Date(date.getTime()); + } else { + res = null; + } + } else { + res = null; + } + break; + case Types.TIME: { + java.util.Date date = Util.parseDate(value); + if (date != null) { + res = new Time(date.getTime()); + } else { + res = null; + } + break; + } + case Types.TIMESTAMP: + if (!value.startsWith("0000-00-00")) { + java.util.Date date = Util.parseDate(value); + if (date != null) { + res = new Timestamp(date.getTime()); + } else { + res = null; + } + } else { + res = null; + } + break; + case Types.CLOB: + default: + res = value; + break; + } + return res; + } catch (Exception e) { + log.error("table: {} column: {}, failed convert type {} to {}", tableName, columnName, value, sqlType); + return value; + } + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MapUnderscoreToCamelCase.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MapUnderscoreToCamelCase.java new file mode 100644 index 0000000000000000000000000000000000000000..da3780ed397ba187d6bee4ff90b407209c31961b --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MapUnderscoreToCamelCase.java @@ -0,0 +1,19 @@ +package com.schbrain.canal.client.utils; + +import com.google.common.base.CaseFormat; + +import java.util.concurrent.ConcurrentHashMap; + +public class MapUnderscoreToCamelCase { + + private static ConcurrentHashMap names=new ConcurrentHashMap<>(); + + public static String convertByCache(String name){ + if(names.containsKey(name)) { + return names.get(name); + } + String convertName= CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name); + names.put(name,convertName); + return convertName; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..48da719ef78f6785da247998ac40908f92b929b5 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java @@ -0,0 +1,145 @@ +package com.schbrain.canal.client.utils; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; + +import java.util.*; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +@Slf4j +public class MessageUtil { + + public static Dml parse4Dml(CanalEntry.Entry entry) { + if (entry == null) { + return null; + } + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN + || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + return null; + } + CanalEntry.RowChange rowChange; + try { + rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), + e); + } + CanalEntry.EventType eventType = rowChange.getEventType(); + final Dml dml = new Dml(); + dml.setIsDdl(rowChange.getIsDdl()); + dml.setDatabase(entry.getHeader().getSchemaName()); + dml.setTable(entry.getHeader().getTableName()); + dml.setType(eventType.toString()); + dml.setEs(entry.getHeader().getExecuteTime()); + dml.setTs(System.currentTimeMillis()); + dml.setSql(rowChange.getSql()); + if (!rowChange.getIsDdl()) { + List rows=new ArrayList<>(); + Set updateSet = new HashSet<>(); + dml.setPkNames(new ArrayList<>()); + int i = 0; + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE + && eventType != CanalEntry.EventType.DELETE) { + continue; + } + Dml.Row dmlRow=new Dml.Row(); + Map row = new LinkedHashMap<>(); + List columns; + + if (eventType == CanalEntry.EventType.DELETE) { + columns = rowData.getBeforeColumnsList(); + } else { + columns = rowData.getAfterColumnsList(); + } + + for (CanalEntry.Column column : columns) { + if (i == 0) { + if (column.getIsKey()) { + dml.getPkNames().add(column.getName()); + } + } + if (column.getIsNull()) { + row.put(column.getName(), null); + } else { +// row.put(column.getName(), +// JdbcTypeUtil.typeConvert(dml.getTable(), +// column.getName(), +// column.getValue(), +// column.getSqlType(), +// column.getMysqlType())); + row.put(column.getName(),column.getValue()); + } + // 获取update为true的字段 + if (column.getUpdated()) { + updateSet.add(column.getName()); + } + } + if (!row.isEmpty()) { + dmlRow.setData(row); + } + if (eventType == CanalEntry.EventType.UPDATE) { + Map rowOld = new LinkedHashMap<>(); + for (CanalEntry.Column column : rowData.getBeforeColumnsList()) { + if (column.getIsNull()) { + rowOld.put(column.getName(), null); + } else { +// rowOld.put(column.getName(), +// JdbcTypeUtil.typeConvert(dml.getTable(), +// column.getName(), +// column.getValue(), +// column.getSqlType(), +// column.getMysqlType())); + rowOld.put(column.getName(), + column.getValue()); + } + } + // update操作将记录修改前的值 + if (!rowOld.isEmpty()) { + dmlRow.setOld(rowOld); + } + } + if(!CollectionUtils.isEmpty(dmlRow.getData())||!CollectionUtils.isEmpty(dmlRow.getOld())){ + rows.add(dmlRow); + } + i++; + } + dml.setData(rows); + dml.setUpdatedNames(updateSet); + } + return dml; + } + + + + private static List> changeRows(String table, List> rows, Map sqlTypes, Map mysqlTypes) { + List> result = new ArrayList<>(); + for (Map row : rows) { + Map resultRow = new LinkedHashMap<>(); + for (Map.Entry entry : row.entrySet()) { + String columnName = entry.getKey(); + String columnValue = entry.getValue(); + + Integer sqlType = sqlTypes.get(columnName); + if (sqlType == null) { + continue; + } + + String mysqlType = mysqlTypes.get(columnName); + if (mysqlType == null) { + continue; + } + + Object finalValue = JdbcTypeUtil.typeConvert(table, columnName, columnValue, sqlType, mysqlType); + resultRow.put(columnName, finalValue); + } + result.add(resultRow); + } + return result; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/Util.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/Util.java new file mode 100644 index 0000000000000000000000000000000000000000..19eaa2e53298c4ad7a9feb0b657e52a7c5fc1d12 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/Util.java @@ -0,0 +1,261 @@ +package com.schbrain.canal.client.utils; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.lang.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.*; +import java.time.*; +import java.time.format.DateTimeFormatter; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +public class Util { + + private static final Logger logger = LoggerFactory.getLogger(Util.class); + + /** + * 通过DS执行sql + */ + public static Object sqlRS(DataSource ds, String sql, Function fun) { + try (Connection conn = ds.getConnection(); + Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + stmt.setFetchSize(Integer.MIN_VALUE); + try (ResultSet rs = stmt.executeQuery(sql)) { + return fun.apply(rs); + } + } catch (Exception e) { + logger.error("sqlRs has error, sql: {} ", sql); + throw new RuntimeException(e); + } + } + + public static Object sqlRS(DataSource ds, String sql, List values, Function fun) { + try (Connection conn = ds.getConnection()) { + try (PreparedStatement pstmt = conn + .prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + pstmt.setFetchSize(Integer.MIN_VALUE); + if (values != null) { + for (int i = 0; i < values.size(); i++) { + pstmt.setObject(i + 1, values.get(i)); + } + } + try (ResultSet rs = pstmt.executeQuery()) { + return fun.apply(rs); + } + } + } catch (Exception e) { + logger.error("sqlRs has error, sql: {} ", sql); + throw new RuntimeException(e); + } + } + + /** + * sql执行获取resultSet + * + * @param conn sql connection + * @param sql sql + * @param consumer 回调方法 + */ + public static void sqlRS(Connection conn, String sql, Consumer consumer) { + try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) { + consumer.accept(rs); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + + public static String cleanColumn(String column) { + if (column == null) { + return null; + } + if (column.contains("`")) { + column = column.replaceAll("`", ""); + } + + if (column.contains("'")) { + column = column.replaceAll("'", ""); + } + + if (column.contains("\"")) { + column = column.replaceAll("\"", ""); + } + + return column; + } + + public static ThreadPoolExecutor newFixedThreadPool(int nThreads, long keepAliveTime) { + return new ThreadPoolExecutor(nThreads, + nThreads, + keepAliveTime, + TimeUnit.MILLISECONDS, + new SynchronousQueue<>(), + (r, exe) -> { + if (!exe.isShutdown()) { + try { + exe.getQueue().put(r); + } catch (InterruptedException e) { + // ignore + } + } + }); + } + + public static ThreadPoolExecutor newSingleThreadExecutor(long keepAliveTime) { + return new ThreadPoolExecutor(1, + 1, + keepAliveTime, + TimeUnit.MILLISECONDS, + new SynchronousQueue<>(), + (r, exe) -> { + if (!exe.isShutdown()) { + try { + exe.getQueue().put(r); + } catch (InterruptedException e) { + // ignore + } + } + }); + } + + public final static String timeZone; // 当前时区 + private static DateTimeZone dateTimeZone; + + static { + TimeZone localTimeZone = TimeZone.getDefault(); + int rawOffset = localTimeZone.getRawOffset(); + String symbol = "+"; + if (rawOffset < 0) { + symbol = "-"; + } + rawOffset = Math.abs(rawOffset); + int offsetHour = rawOffset / 3600000; + int offsetMinute = rawOffset % 3600000 / 60000; + String hour = String.format("%1$02d", offsetHour); + String minute = String.format("%1$02d", offsetMinute); + timeZone = symbol + hour + ":" + minute; + dateTimeZone = DateTimeZone.forID(timeZone); + TimeZone.setDefault(TimeZone.getTimeZone("GMT" + timeZone)); + } + + /** + * 通用日期时间字符解析 + * + * @param datetimeStr 日期时间字符串 + * @return Date + */ + public static Date parseDate(String datetimeStr) { + if (StringUtils.isEmpty(datetimeStr)) { + return null; + } + datetimeStr = datetimeStr.trim(); + if (datetimeStr.contains("-")) { + if (datetimeStr.contains(":")) { + datetimeStr = datetimeStr.replace(" ", "T"); + } + } else if (datetimeStr.contains(":")) { + datetimeStr = "T" + datetimeStr; + } + + DateTime dateTime = new DateTime(datetimeStr, dateTimeZone); + + return dateTime.toDate(); + } + + private static LoadingCache dateFormatterCache = CacheBuilder.newBuilder() + .build(new CacheLoader() { + + @Override + public DateTimeFormatter load(String key) { + return DateTimeFormatter.ofPattern(key); + } + }); + + public static Date parseDate2(String datetimeStr) { + if (StringUtils.isEmpty(datetimeStr)) { + return null; + } + try { + datetimeStr = datetimeStr.trim(); + int len = datetimeStr.length(); + if (datetimeStr.contains("-") && datetimeStr.contains(":") && datetimeStr.contains(".")) { + // 包含日期+时间+毫秒 + // 取毫秒位数 + int msLen = len - datetimeStr.indexOf(".") - 1; + StringBuilder ms = new StringBuilder(); + for (int i = 0; i < msLen; i++) { + ms.append("S"); + } + String formatter = "yyyy-MM-dd HH:mm:ss." + ms; + + DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter); + LocalDateTime dateTime = LocalDateTime.parse(datetimeStr, dateTimeFormatter); + return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()); + } else if (datetimeStr.contains("-") && datetimeStr.contains(":")) { + // 包含日期+时间 + // 判断包含时间位数 + int i = datetimeStr.indexOf(":"); + i = datetimeStr.indexOf(":", i + 1); + String formatter; + if (i > -1) { + formatter = "yyyy-MM-dd HH:mm:ss"; + } else { + formatter = "yyyy-MM-dd HH:mm"; + } + + DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter); + LocalDateTime dateTime = LocalDateTime.parse(datetimeStr, dateTimeFormatter); + return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()); + } else if (datetimeStr.contains("-")) { + // 只包含日期 + String formatter = "yyyy-MM-dd"; + DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter); + LocalDate localDate = LocalDate.parse(datetimeStr, dateTimeFormatter); + return Date.from(localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + } else if (datetimeStr.contains(":")) { + // 只包含时间 + String formatter; + if (datetimeStr.contains(".")) { + // 包含毫秒 + int msLen = len - datetimeStr.indexOf(".") - 1; + StringBuilder ms = new StringBuilder(); + for (int i = 0; i < msLen; i++) { + ms.append("S"); + } + formatter = "HH:mm:ss." + ms; + } else { + // 判断包含时间位数 + int i = datetimeStr.indexOf(":"); + i = datetimeStr.indexOf(":", i + 1); + if (i > -1) { + formatter = "HH:mm:ss"; + } else { + formatter = "HH:mm"; + } + } + DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter); + LocalTime localTime = LocalTime.parse(datetimeStr, dateTimeFormatter); + LocalDate localDate = LocalDate.of(1970, Month.JANUARY, 1); + LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime); + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + } + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + + return null; + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/bean/User.java b/schbrain-canal-web/src/main/java/com/schbrain/bean/User.java new file mode 100644 index 0000000000000000000000000000000000000000..98f517024c8576ed3c0a92663cfeac1473c6f91c --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/bean/User.java @@ -0,0 +1,128 @@ +package com.schbrain.bean; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author zhuyf + * @date 2022/6/21 + */ +@Data +public class User implements Serializable { + + /** + * ID + */ + private Long id; + + /** + * PKID + */ + private String pkId; + + /** + * 接入方ID + */ + private Long appInfoId; + + /** + * 用户ID + */ + private String userid; + + /** + * 姓名 + */ + private String name; + + /** + * 别名 + */ + private String alias; + + /** + * 性别 + */ + private String gender; + + /** + * 头像 + */ + private String avatar; + + /** + * 激活状态1=已激活,2=已禁用,4=未激活,5=退出企业。 + */ + private Integer status; + + /** + * 手机号码 + */ + private String mobile; + + /** + * 职务信息 + */ + private String position; + + /** + * 邮箱 + */ + private String email; + + /** + * 头像缩略图 + */ + private String thumbAvatar; + + /** + * 座机 + */ + private String telephone; + + /** + * 扩展属性 + */ + private String extattr; + + /** + * 员工个人二维码 + */ + private String qrCode; + + /** + * 成员对外属性 + */ + private String externalProfile; + + /** + * 对外职务 + */ + private String externalPosition; + + /** + * 地址 + */ + private String address; + + /** + * 主部门 + */ + private Long mainDepartment; + /** + * 主部门 + */ + private String openUserid; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 更新时间 + */ + private Date modifyTime; +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java new file mode 100644 index 0000000000000000000000000000000000000000..0391cfc84a6a9fee2580fae31374004b4ccbbe9c --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java @@ -0,0 +1,38 @@ +package com.schbrain.web; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.SimpleResolverCanalEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Service("myCanalEvent4") +@TableFilter(table = "wechat_user",schame = "kp_user") +@Slf4j +public class MyCanalEvent4 extends SimpleResolverCanalEvent { + + @Override + public void onInsert(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onInsert:{}",s); + } + + @Override + public void onUpdate(CanalEntry.Header header, User before, User after) { + String s = JSONObject.toJSONString(before); + String b = JSONObject.toJSONString(after); + log.info("onUpdate,before:{},after:{}",s,b); + } + + @Override + public void onDelete(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onDelete:{}",s); + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java new file mode 100644 index 0000000000000000000000000000000000000000..a425d99fde4a8787d24a061de56d0f0ce3537c73 --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java @@ -0,0 +1,39 @@ +package com.schbrain.web; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.ResolverCanalEvent; +import com.schbrain.canal.client.event.SimpleResolverCanalEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Service("myCanalEvent5") +@TableFilter(table = "wechat_user",schame = "kp_user") +@Slf4j +public class MyCanalEvent5 implements ResolverCanalEvent { + + @Override + public void onInsert(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onInsert:{}",s); + } + + @Override + public void onUpdate(CanalEntry.Header header, User before, User after) { + String s = JSONObject.toJSONString(before); + String b = JSONObject.toJSONString(after); + log.info("onUpdate,before:{},after:{}",s,b); + } + + @Override + public void onDelete(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onDelete:{}",s); + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java new file mode 100644 index 0000000000000000000000000000000000000000..3c0796142e315fb7cb6a5c821876278dac9a3b77 --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java @@ -0,0 +1,38 @@ +package com.schbrain.web; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.ResolverCanalEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +@Service("myCanalEvent6") +@TableFilter(table = "wechat_user",schame = "kp_user") +public class MyCanalEvent6 extends MyCanalEvent4 { + + @Override + public void onInsert(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onInsert:{}",s); + } + + @Override + public void onUpdate(CanalEntry.Header header, User before, User after) { + String s = JSONObject.toJSONString(before); + String b = JSONObject.toJSONString(after); + log.info("onUpdate,before:{},after:{}",s,b); + } + + @Override + public void onDelete(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onDelete:{}",s); + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java new file mode 100644 index 0000000000000000000000000000000000000000..cc6fdd7fd46d2f463745df54ab67a33f9b221cfe --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java @@ -0,0 +1,37 @@ +package com.schbrain.web; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +@Service("myCanalEvent7") +@TableFilter(table = "wechat_user",schame = "kp_user") +public class MyCanalEvent7 extends MyCanalEvent6 { + + @Override + public void onInsert(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onInsert:{}",s); + } + + @Override + public void onUpdate(CanalEntry.Header header, User before, User after) { + String s = JSONObject.toJSONString(before); + String b = JSONObject.toJSONString(after); + log.info("onUpdate,before:{},after:{}",s,b); + } + + @Override + public void onDelete(CanalEntry.Header header, User user) { + String s = JSONObject.toJSONString(user); + log.info("onDelete:{}",s); + } +} diff --git a/schbrain-canal-web/src/main/resources/application.properties b/schbrain-canal-web/src/main/resources/application.properties index cecd338178c3d6ab2f1416ab809d9feee0f05f4a..967425acce9fe890bd7057b3874bcf2e0217fc2a 100644 --- a/schbrain-canal-web/src/main/resources/application.properties +++ b/schbrain-canal-web/src/main/resources/application.properties @@ -5,19 +5,19 @@ canal.client.instances.kp_user.retryCount=10 canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 #canal.client.instances.screenschbrain.addresses=192.168.36.66:11111 -canal.client.instances.screenschbrain.username= -canal.client.instances.screenschbrain.password= -canal.client.instances.screenschbrain.retryCount=10 -canal.client.instances.screenschbrain.subscribe=kp_weekly.comment_student -canal.client.instances.screenschbrain.zkHosts=192.168.2.48:2181,192.168.2.47:2181,192.168.2.43:2181 - - -canal.client.instances.qicheng_czzs.addresses=192.168.36.66:11111 -canal.client.instances.qicheng_czzs.username= -canal.client.instances.qicheng_czzs.password= -canal.client.instances.qicheng_czzs.retryCount=10 -canal.client.instances.qicheng_czzs.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 - +#canal.client.instances.screenschbrain.username= +#canal.client.instances.screenschbrain.password= +#canal.client.instances.screenschbrain.retryCount=10 +#canal.client.instances.screenschbrain.subscribe=kp_weekly.comment_student +#canal.client.instances.screenschbrain.zkHosts=192.168.2.48:2181,192.168.2.47:2181,192.168.2.43:2181 +#canal.client.instances.qicheng_czzs.addresses=192.168.36.66:11111 +#canal.client.instances.qicheng_czzs.username= +#canal.client.instances.qicheng_czzs.password= +#canal.client.instances.qicheng_czzs.retryCount=10 +#canal.client.instances.qicheng_czzs.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 +# +# +# diff --git a/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java b/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java new file mode 100644 index 0000000000000000000000000000000000000000..cb752542bd27f6c9bbb465baf205bb6886db77f1 --- /dev/null +++ b/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java @@ -0,0 +1,62 @@ +package com.schbrain.web; + +import com.schbrain.canal.client.event.ResolverCanalEvent; +import com.schbrain.canal.client.event.SimpleResolverCanalEvent; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public class MyCanalEvent6Test { + + public static void main(String[] args) { + Class g = null; + MyCanalEvent7 event5 = new MyCanalEvent7(); + Class c = event5.getClass(); + while(true){ + Class aClass = interfaceGeneric(event5); + if(aClass!=null){ + g = aClass; + break; + } + Type type = c.getGenericSuperclass(); + if(type instanceof ParameterizedType){ + ParameterizedType parameterizedType = (ParameterizedType)type; + Type[] types = parameterizedType.getActualTypeArguments(); + if(types!= null && types.length>0){ + g = (Class) types[0]; + break; + } + } + c = c.getSuperclass(); + } + System.out.println(g); + } + + /** + * 接口泛型缓存 + * @param event + * @return + */ + private static Class interfaceGeneric(ResolverCanalEvent event){ + Type[] types = event.getClass().getGenericInterfaces(); + if(types == null || types.length == 0){ + return null; + } + for (Type type : types) { + if(!(type instanceof ParameterizedType)){ + continue; + } + ParameterizedType parameterized = (ParameterizedType)type; + Type rawType = parameterized.getRawType(); + if(rawType.equals(ResolverCanalEvent.class)){ + Class clazz = (Class)parameterized.getActualTypeArguments()[0]; + return clazz; + } + } + return null; + } +}