Commit c7f52d04 authored by zhuyunfeng's avatar zhuyunfeng

增加泛型注解获取

parent 2dfc2478
......@@ -17,7 +17,7 @@
<name>schbrain-canal</name>
<properties>
<revision>1.0.0-RELEASE</revision>
<revision>1.1.0-SNAPSHOT</revision>
</properties>
<modules>
......
......@@ -23,6 +23,11 @@
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-spring-support</artifactId>
......
......@@ -12,6 +12,10 @@ import java.lang.annotation.*;
@Component
public @interface CanalEventListener {
/**
* bean name
* @return
*/
@AliasFor(annotation = Component.class)
String value() default "";
......
......@@ -7,11 +7,9 @@ import java.lang.annotation.*;
/**
* ListenPoint for delete
*
* @author chen.qian
* @date 2018/3/19
* @author zhuyf
* @date 2022/06/19
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
......
......@@ -6,12 +6,10 @@ import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* ListenPoint for insert
*
* @author chen.qian
* @date 2018/3/19
* ListenPoint for inster
* @author zhuyf
* @date 2022/06/19
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
......
......@@ -6,12 +6,9 @@ import java.lang.annotation.*;
/**
* used to indicate that method(or methods) is(are) the candidate of the
* canal event distributor
*
* @author chen.qian
* @date 2018/3/19
* @author zhuyf
* @date 2022/06/19
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
......
......@@ -6,7 +6,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Table
* table filter
* @author zhuyf
* @date 2022/6/16
*/
......
......@@ -7,11 +7,9 @@ import java.lang.annotation.*;
/**
* ListenPoint for update
*
* @author chen.qian
* @date 2018/3/19
* @author zhuyf
* @date 2022/06/19
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
......
package com.schbrain.canal.client.conf;
import com.schbrain.canal.client.core.MethodArgumentResolver;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class MethodArgumentConfig {
public static MethodArgumentResolver LISTENERMETHODARGUMENTRESOLVER = new MethodArgumentResolver();
}
package com.schbrain.canal.client.core;
import lombok.Data;
/**
* 用于封装修改数据的
* @author zhuyf
* @date 2022/6/22
*/
@Data
public class EditMetaInfo {
/**
* 修改后
*/
private Object after;
/**
* 修改前
*/
private Object before;
}
package com.schbrain.canal.client.core;
import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;
......@@ -29,9 +30,12 @@ public class HandlerConf {
*/
private final List<ListenerPoint> annoListeners;
public HandlerConf(List<CanalEvent> listeners, Map<String, List<CanalEvent>> tableCanalEventMap, List<ListenerPoint> annoListeners) {
private final Map<String,List<ResolverCanalEvent<?>>> resolverCanalEvents;
public HandlerConf(List<CanalEvent> listeners, Map<String, List<CanalEvent>> tableCanalEventMap, List<ListenerPoint> annoListeners,Map<String,List<ResolverCanalEvent<?>>> resolverCanalEvents) {
this.listeners = listeners;
this.tableCanalEventMap = tableCanalEventMap;
this.annoListeners = annoListeners;
this.resolverCanalEvents = resolverCanalEvents;
}
}
package com.schbrain.canal.client.core;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import com.schbrain.canal.client.exception.CanalClientException;
import com.schbrain.canal.client.exception.ReflectionException;
import com.schbrain.canal.client.reflector.DefaultReflectorFactory;
import com.schbrain.canal.client.reflector.Reflector;
import com.schbrain.canal.client.reflector.ReflectorFactory;
import com.schbrain.canal.client.reflector.DefaultObjectFactory;
import com.schbrain.canal.client.transfer.ObjectFactory;
import com.schbrain.canal.client.type.TypeHandler;
import com.schbrain.canal.client.type.TypeHandlerRegister;
import com.schbrain.canal.client.utils.Dml;
import com.schbrain.canal.client.utils.MapUnderscoreToCamelCase;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.text.ParseException;
import java.util.*;
/**
* @author zhuyf
* @date 2022/6/21
*/
@Slf4j
public class MethodArgumentResolver {
public static Map<ResolverCanalEvent, Class> classNameMappingClass = new HashMap<>();
private ObjectFactory objectFactory = new DefaultObjectFactory();
private ReflectorFactory reflectorFactory = new DefaultReflectorFactory();
/**
* 获取参数类
* @param event
* @return
*/
public Class getArgumentClass(ResolverCanalEvent<?> event) {
if (!classNameMappingClass.containsKey(event)) {
//接口泛型缓存
boolean cache = interfaceGeneric(event);
if(!cache){
//继承类泛型缓存
cache = extendGeneric(event);
}
if(!cache){
CanalClientException e = new CanalClientException(event.getClass() + "generic get exception");
log.error("generic get exception",e);
throw e;
}
}
return classNameMappingClass.get(event);
}
/**
* 接口泛型缓存
* @param event
* @return
*/
private boolean interfaceGeneric(ResolverCanalEvent<?> event){
Type[] types = event.getClass().getGenericInterfaces();
if(types == null || types.length == 0){
return false;
}
for (Type type : types) {
if(!(type instanceof ParameterizedType)){
continue;
}
ParameterizedType parameterized = (ParameterizedType)type;
Type rawType = parameterized.getRawType();
if(rawType.equals(ResolverCanalEvent.class)){
Class clazz = (Class)parameterized.getActualTypeArguments()[0];
classNameMappingClass.put(event, clazz);
return true;
}
}
return false;
}
private boolean extendGeneric(ResolverCanalEvent<?> event){
Class c = event.getClass();
while(true){
Type type = c.getGenericSuperclass();
if(type instanceof ParameterizedType){
ParameterizedType parameterizedType = (ParameterizedType)type;
Type[] types = parameterizedType.getActualTypeArguments();
if(types!= null && types.length>0){
classNameMappingClass.put(event, (Class) types[0]);
return true;
}
}
c = c.getSuperclass();
if(type.equals(Object.class)){
break;
}
}
return false;
}
public List<EditMetaInfo> resolver(ResolverCanalEvent event, Dml dml) throws InvocationTargetException, IllegalAccessException, ParseException,ReflectionException {
EditMetaInfo metaInfo = new EditMetaInfo();
Class c = getArgumentClass(event);
List<EditMetaInfo> editMetaInfos=new ArrayList<>();
List<Dml.Row> datas = dml.getData();
for (Dml.Row row : datas) {
Object after = columnsConvertObject(c,row.getData());
metaInfo.setAfter(after);
Object before = columnsConvertObject(c,row.getOld());
metaInfo.setBefore(before);
editMetaInfos.add(metaInfo);
}
return editMetaInfos;
}
public Object columnsConvertObject(Class c,Map<String,Object> columns) throws ReflectionException, InvocationTargetException, IllegalAccessException, ParseException {
if(columns==null||columns.isEmpty()){
return null;
}
Reflector classesReflector= reflectorFactory.findForClass(c);
Object o = objectFactory.create(c);
for (String columnName: columns.keySet()) {
String filedName= MapUnderscoreToCamelCase.convertByCache(columnName);
Object value=columns.get(columnName);
if(value==null){
continue;
}
if (!classesReflector.hasGetter(filedName)) {
continue;
}
Type setterType= classesReflector.getSetterType(filedName);
TypeHandler typeHandler= TypeHandlerRegister.getTypeHandler(setterType);
if(typeHandler==null){
log.error("未适配到typeHandle{},name:{},value:{},",setterType,filedName,value);
}
classesReflector.getSetInvoker(filedName).invoke(o,new Object[]{typeHandler.convert(value)});
}
return o;
}
}
......@@ -5,17 +5,16 @@ import com.schbrain.canal.client.annotation.CanalEventListener;
import com.schbrain.canal.client.annotation.ListenPoint;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.conf.MethodArgumentConfig;
import com.schbrain.canal.client.conf.SchbrainCanalConfig;
import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import com.schbrain.canal.client.transfer.TransponderFactory;
import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -36,6 +35,9 @@ public class SimpleCanalClient extends AbstractCanalClient {
* 所有执行器
*/
private final List<CanalEvent> listeners = new ArrayList<>();
//解析器事件
private final Map<String,List<ResolverCanalEvent<?>>> resolverCanalEvents = new HashMap<>();
/**
* 表过滤器
*/
......@@ -53,46 +55,96 @@ public class SimpleCanalClient extends AbstractCanalClient {
private void initListeners() {
log.info("{}: initializing the listeners....", Thread.currentThread().getName());
//初始接口监听器
initEventList();
//初始注解监听器
initAnnotionList();
//初始解析器
initResolverList();
log.info("{}: initializing the listeners end.", Thread.currentThread().getName());
if (log.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) {
log.warn("{}: No listener found in context! ", Thread.currentThread().getName());
}
}
/**
* 初始事件监听器
*/
private void initEventList(){
List<CanalEvent> list = BeanUtil.getBeansOfType(CanalEvent.class);
if(list!=null && list.size() > 0){
List<CanalEvent> unFilters = new ArrayList<>();
for (CanalEvent canalEvent : list) {
TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
String key = filter.schame()+":"+filter.table();
List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>());
filterList.add(canalEvent);
if(list==null || list.size()<=0){
return;
}
List<CanalEvent> unFilters = new ArrayList<>();
for (CanalEvent canalEvent : list) {
Class<?>[] classes = canalEvent.getClass().getInterfaces();
for (Class aClass : classes) {
if(ResolverCanalEvent.class.equals(aClass)){
continue;
}
unFilters.add(canalEvent);
}
if(unFilters!=null && unFilters.size()>0){
listeners.addAll(unFilters);
TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
String key = filter.schame()+":"+filter.table();
List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>());
filterList.add(canalEvent);
continue;
}
unFilters.add(canalEvent);
}
if(unFilters!=null && unFilters.size()>0){
listeners.addAll(unFilters);
}
}
/**
* 初始注解监听器
*/
private void initAnnotionList(){
Map<String, Object> listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class);
if (listenerMap != null) {
for (Object target : listenerMap.values()) {
Method[] methods = target.getClass().getDeclaredMethods();
if (methods != null && methods.length > 0) {
for (Method method : methods) {
ListenPoint l = AnnotatedElementUtils.findMergedAnnotation(method,ListenPoint.class);
if (l != null) {
annoListeners.add(new ListenerPoint(target, method, l));
}
if(listenerMap==null){
return;
}
for (Object target : listenerMap.values()) {
Method[] methods = target.getClass().getDeclaredMethods();
if (methods != null && methods.length > 0) {
for (Method method : methods) {
ListenPoint l = AnnotatedElementUtils.findMergedAnnotation(method,ListenPoint.class);
if (l != null) {
annoListeners.add(new ListenerPoint(target, method, l));
}
}
}
}
log.info("{}: initializing the listeners end.", Thread.currentThread().getName());
if (log.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) {
log.warn("{}: No listener found in context! ", Thread.currentThread().getName());
}
/**
* 初始解析器事件处理器
*/
private void initResolverList(){
List<ResolverCanalEvent> list = BeanUtil.getBeansOfType(ResolverCanalEvent.class);
if(list==null || list.size()<=0){
return;
}
MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER;
for (ResolverCanalEvent event : list) {
TableFilter filter = event.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
String key = filter.schame()+":"+filter.table();
List<ResolverCanalEvent<?>> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>());
filterList.add(event);
resolverCanalEvents.put(key,filterList);
resolver.getArgumentClass(event);
continue;
}
}
}
@Override
protected void process(CanalConnector connector, Map.Entry<String, CanalClientConfig> config) {
HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners);
HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners,resolverCanalEvents);
executor.submit(factory.newTransponder(connector, config,handlerConf));
}
......
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.conf.MethodArgumentConfig;
import com.schbrain.canal.client.core.EditMetaInfo;
import com.schbrain.canal.client.core.MethodArgumentResolver;
import com.schbrain.canal.client.exception.ReflectionException;
import com.schbrain.canal.client.utils.Dml;
import com.schbrain.canal.client.utils.MessageUtil;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.List;
public interface ResolverCanalEvent<T> {
default void onEvent(CanalEntry.Entry entry) throws InvocationTargetException, IllegalAccessException, ParseException, ReflectionException {
CanalEntry.Header header = entry.getHeader();
Dml dml = MessageUtil.parse4Dml(entry);
MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER;
List<EditMetaInfo> metaInfos = resolver.resolver(this,dml);
CanalEntry.EventType eventType = header.getEventType();
switch (eventType) {
case INSERT:
for (EditMetaInfo metaInfo : metaInfos) {
onInsert(header, (T) metaInfo.getAfter());
}
break;
case UPDATE:
for (EditMetaInfo metaInfo : metaInfos) {
onUpdate(header,(T) metaInfo.getBefore(),(T) metaInfo.getAfter());
}
break;
case DELETE:
for (EditMetaInfo metaInfo : metaInfos) {
onDelete(header,(T) metaInfo.getAfter());
}
break;
default:
break;
}
}
/**
* onInsert
* @param t
*/
void onInsert(CanalEntry.Header header, T t);
/**
* onUpdate
* @param before
* @param after
*/
void onUpdate(CanalEntry.Header header, T before,T after);
/**
* onDelete
* @param t
*/
void onDelete(CanalEntry.Header header, T t);
}
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
public class SimpleDefCanalEvent implements DefCanalEvent{
@Override
public void onInsert(CanalEntry.Header header, CanalEntry.RowData rowData) {
}
@Override
public void onUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) {
}
@Override
public void onDelete(CanalEntry.Header header, CanalEntry.RowData rowData) {
}
}
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class SimpleResolverCanalEvent<T> implements ResolverCanalEvent<T>{
@Override
public void onInsert(CanalEntry.Header header, T o) {
}
@Override
public void onUpdate(CanalEntry.Header header, T before, T after) {
}
@Override
public void onDelete(CanalEntry.Header header, T o) {
}
}
package com.schbrain.canal.client.exception;
public class PersistenceException extends CanalClientException {
private static final long serialVersionUID = -7537395265357977271L;
public PersistenceException() {
}
public PersistenceException(String message) {
super(message);
}
public PersistenceException(String message, Throwable cause) {
super(message, cause);
}
public PersistenceException(Throwable cause) {
super(cause);
}
}
package com.schbrain.canal.client.exception;
public class ReflectionException extends Exception {
private static final long serialVersionUID = 7642570221267566591L;
public ReflectionException() {
}
public ReflectionException(String message) {
super(message);
}
public ReflectionException(String message, Throwable cause) {
super(message, cause);
}
public ReflectionException(Throwable cause) {
super(cause);
}
}
package com.schbrain.canal.client.exception;
public class TypeException extends PersistenceException {
private static final long serialVersionUID = 8614420898975117130L;
public TypeException() {
}
public TypeException(String message) {
super(message);
}
public TypeException(String message, Throwable cause) {
super(message, cause);
}
public TypeException(Throwable cause) {
super(cause);
}
}
package com.schbrain.canal.client.reflector;
import com.schbrain.canal.client.exception.ReflectionException;
import com.schbrain.canal.client.transfer.ObjectFactory;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.*;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class DefaultObjectFactory implements ObjectFactory, Serializable {
private static final long serialVersionUID = -8855120656740914948L;
public DefaultObjectFactory() {
}
/**
* 创建指定类型的对象 不使用构造函数创建
* @param type 类型
* @param <T>
* @return
*/
@Override
public <T> T create(Class<T> type) throws ReflectionException {
return (T) this.create(type, (List)null, (List)null);
}
/**
* 创建指定类型的对象
* @param type 类型
* @param constructorArgTypes 构造函数参数类型列表
* @param constructorArgs 构造函数参数列表
* @param <T>
* @return
*/
@Override
public <T> T create(Class<T> type, List<Class<?>> constructorArgTypes, List<Object> constructorArgs) throws ReflectionException {
Class<?> classToCreate = this.resolveInterface(type);
return (T) this.instantiateClass(classToCreate, constructorArgTypes, constructorArgs);
}
@Override
public void setProperties(Properties properties) {
}
public <T> T instantiateClass(Class<T> type, List<Class<?>> constructorArgTypes, List<Object> constructorArgs) throws ReflectionException {
try {
Constructor constructor;
//判断是否指定了构造函数初始化
if (constructorArgTypes != null && constructorArgs != null) {
//获得private或public指定参数类型列表的构造函数 注:getConstructor和getDeclaredConstructor的区别是只能获得public
constructor = type.getDeclaredConstructor((Class[])constructorArgTypes.toArray(new Class[constructorArgTypes.size()]));
//如果是私有的 设置可以访问
if (!constructor.isAccessible()) {
constructor.setAccessible(true);
}
return (T) constructor.newInstance(constructorArgs.toArray(new Object[constructorArgs.size()]));
} else {
constructor = type.getDeclaredConstructor();
if (!constructor.isAccessible()) {
constructor.setAccessible(true);
}
return (T) constructor.newInstance();
}
} catch (Exception var9) {
StringBuilder argTypes = new StringBuilder();
if (constructorArgTypes != null && !constructorArgTypes.isEmpty()) {
Iterator i$ = constructorArgTypes.iterator();
while(i$.hasNext()) {
Class<?> argType = (Class)i$.next();
argTypes.append(argType.getSimpleName());
argTypes.append(",");
}
argTypes.deleteCharAt(argTypes.length() - 1);
}
StringBuilder argValues = new StringBuilder();
if (constructorArgs != null && !constructorArgs.isEmpty()) {
Iterator i$ = constructorArgs.iterator();
while(i$.hasNext()) {
Object argValue = i$.next();
argValues.append(String.valueOf(argValue));
argValues.append(",");
}
argValues.deleteCharAt(argValues.length() - 1);
}
throw new ReflectionException("Error instantiating " + type + " with invalid types (" + argTypes + ") or values (" + argValues + "). Cause: " + var9,var9);
}
}
/**
* 如果是定义结合类型 类型改为实现类
* @param type
* @return
*/
protected Class<?> resolveInterface(Class<?> type) {
Class classToCreate;
if (type != List.class && type != Collection.class && type != Iterable.class) {
if (type == Map.class) {
classToCreate = HashMap.class;
} else if (type == SortedSet.class) {
classToCreate = TreeSet.class;
} else if (type == Set.class) {
classToCreate = HashSet.class;
} else {
classToCreate = type;
}
} else {
classToCreate = ArrayList.class;
}
return classToCreate;
}
@Override
public <T> boolean isCollection(Class<T> type) {
return Collection.class.isAssignableFrom(type);
}
}
package com.schbrain.canal.client.reflector;
import com.schbrain.canal.client.exception.ReflectionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DefaultReflectorFactory implements ReflectorFactory {
private boolean classCacheEnabled = true;
/**
* 将反射的元数据信息封装保存到Reflector,优化性能
*/
private final ConcurrentMap<Class<?>, Reflector> reflectorMap = new ConcurrentHashMap();
public DefaultReflectorFactory() {
}
@Override
public boolean isClassCacheEnabled() {
return this.classCacheEnabled;
}
@Override
public void setClassCacheEnabled(boolean classCacheEnabled) {
this.classCacheEnabled = classCacheEnabled;
}
/**
* 获得指定类型反射元数据信息
* @param type
* @return
*/
@Override
public Reflector findForClass(Class<?> type) throws ReflectionException {
if (this.classCacheEnabled) {
Reflector cached = (Reflector)this.reflectorMap.get(type);
if (cached == null) {
cached = new Reflector(type);
this.reflectorMap.put(type, cached);
}
return cached;
} else {
return new Reflector(type);
}
}
}
\ No newline at end of file
package com.schbrain.canal.client.reflector;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
public class GetFieldInvoker implements Invoker {
private Field field;
public GetFieldInvoker(Field field) {
this.field = field;
}
@Override
public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
return this.field.get(target);
}
@Override
public Class<?> getType() {
return this.field.getType();
}
}
\ No newline at end of file
package com.schbrain.canal.client.reflector;
import java.lang.reflect.InvocationTargetException;
public interface Invoker {
/**
* 给指定对象的当前属性设置值
* @param var1
* @param var2
* @return
* @throws IllegalAccessException
* @throws InvocationTargetException
*/
Object invoke(Object var1, Object[] var2) throws IllegalAccessException, InvocationTargetException;
/**
* 类型
* @return
*/
Class<?> getType();
}
package com.schbrain.canal.client.reflector;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* 方法元数据封装 以及提供调用的方法
*/
public class MethodInvoker implements Invoker {
private Class<?> type;
private Method method;
public MethodInvoker(Method method) {
this.method = method;
if (method.getParameterTypes().length == 1) {
this.type = method.getParameterTypes()[0];
} else {
this.type = method.getReturnType();
}
}
@Override
public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
return this.method.invoke(target, args);
}
@Override
public Class<?> getType() {
return this.type;
}
}
\ No newline at end of file
package com.schbrain.canal.client.reflector;
import com.schbrain.canal.client.exception.ReflectionException;
import java.util.Locale;
public final class PropertyNamer {
private PropertyNamer() {
}
public static String methodToProperty(String name) throws ReflectionException {
if (name.startsWith("is")) {
name = name.substring(2);
} else {
if (!name.startsWith("get") && !name.startsWith("set")) {
throw new ReflectionException("Error parsing property name '" + name + "'. Didn't start with 'is', 'get' or 'set'.");
}
name = name.substring(3);
}
if (name.length() == 1 || name.length() > 1 && !Character.isUpperCase(name.charAt(1))) {
name = name.substring(0, 1).toLowerCase(Locale.ENGLISH) + name.substring(1);
}
return name;
}
public static boolean isProperty(String name) {
return isGetter(name) || isSetter(name);
}
public static boolean isGetter(String name) {
return name.startsWith("get") && name.length() > 3 || name.startsWith("is") && name.length() > 2;
}
public static boolean isSetter(String name) {
return name.startsWith("set") && name.length() > 3;
}
}
package com.schbrain.canal.client.reflector;
import com.schbrain.canal.client.exception.ReflectionException;
import java.lang.reflect.*;
import java.util.*;
public class Reflector {
private static final String[] EMPTY_STRING_ARRAY = new String[0];
private Class<?> type;
private String[] readablePropertyNames;
private String[] writeablePropertyNames;
private Map<String, Invoker> setMethods;
private Map<String, Invoker> getMethods;
private Map<String, Class<?>> setTypes;
private Map<String, Class<?>> getTypes;
private Constructor<?> defaultConstructor;
private Map<String, String> caseInsensitivePropertyMap;
/**
* 初始化并将对应的元数据信息封装起来
* @param clazz
*/
public Reflector(Class<?> clazz) throws ReflectionException {
this.readablePropertyNames = EMPTY_STRING_ARRAY;
this.writeablePropertyNames = EMPTY_STRING_ARRAY;
//初始化几个map
this.setMethods = new HashMap();
this.getMethods = new HashMap();
this.setTypes = new HashMap();
this.getTypes = new HashMap();
this.caseInsensitivePropertyMap = new HashMap();
this.type = clazz;
//反射查找默认构造函数到defaultConstructor
this.addDefaultConstructor(clazz);
//反射获得所有的get方法元数据保存到以Invoker保存getMethods
this.addGetMethods(clazz);
//反射获得所有的set方法元数据以invokersetMethods
this.addSetMethods(clazz);
//反射获得所有的Fields元数据
this.addFields(clazz);
this.readablePropertyNames = (String[])this.getMethods.keySet().toArray(new String[this.getMethods.keySet().size()]);
this.writeablePropertyNames = (String[])this.setMethods.keySet().toArray(new String[this.setMethods.keySet().size()]);
String[] arr$ = this.readablePropertyNames;
int len$ = arr$.length;
int i$;
String propName;
for(i$ = 0; i$ < len$; ++i$) {
propName = arr$[i$];
this.caseInsensitivePropertyMap.put(propName.toUpperCase(Locale.ENGLISH), propName);
}
arr$ = this.writeablePropertyNames;
len$ = arr$.length;
for(i$ = 0; i$ < len$; ++i$) {
propName = arr$[i$];
this.caseInsensitivePropertyMap.put(propName.toUpperCase(Locale.ENGLISH), propName);
}
}
private void addDefaultConstructor(Class<?> clazz) {
Constructor<?>[] consts = clazz.getDeclaredConstructors();
Constructor[] arr$ = consts;
int len$ = consts.length;
for(int i$ = 0; i$ < len$; ++i$) {
Constructor<?> constructor = arr$[i$];
if (constructor.getParameterTypes().length == 0) {
if (canAccessPrivateMethods()) {
try {
constructor.setAccessible(true);
} catch (Exception var8) {
;
}
}
if (constructor.isAccessible()) {
this.defaultConstructor = constructor;
}
}
}
}
private void addGetMethods(Class<?> cls) throws ReflectionException {
Map<String, List<Method>> conflictingGetters = new HashMap();
Method[] methods = this.getClassMethods(cls);
Method[] arr$ = methods;
int len$ = methods.length;
for(int i$ = 0; i$ < len$; ++i$) {
Method method = arr$[i$];
String name = method.getName();
if (name.startsWith("get") && name.length() > 3) {
if (method.getParameterTypes().length == 0) {
name = PropertyNamer.methodToProperty(name);
this.addMethodConflict(conflictingGetters, name, method);
}
} else if (name.startsWith("is") && name.length() > 2 && method.getParameterTypes().length == 0) {
name = PropertyNamer.methodToProperty(name);
this.addMethodConflict(conflictingGetters, name, method);
}
}
this.resolveGetterConflicts(conflictingGetters);
}
private void resolveGetterConflicts(Map<String, List<Method>> conflictingGetters) throws ReflectionException {
Iterator i$ = conflictingGetters.keySet().iterator();
while(true) {
while(i$.hasNext()) {
String propName = (String)i$.next();
List<Method> getters = (List)conflictingGetters.get(propName);
Iterator<Method> iterator = getters.iterator();
Method firstMethod = (Method)iterator.next();
if (getters.size() == 1) {
this.addGetMethod(propName, firstMethod);
} else {
Method getter = firstMethod;
Class getterType = firstMethod.getReturnType();
while(iterator.hasNext()) {
Method method = (Method)iterator.next();
Class<?> methodType = method.getReturnType();
if (methodType.equals(getterType)) {
throw new ReflectionException("Illegal overloaded getter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results.");
}
if (!methodType.isAssignableFrom(getterType)) {
if (!getterType.isAssignableFrom(methodType)) {
throw new ReflectionException("Illegal overloaded getter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results.");
}
getter = method;
getterType = methodType;
}
}
this.addGetMethod(propName, getter);
}
}
return;
}
}
private void addGetMethod(String name, Method method) {
if (this.isValidPropertyName(name)) {
this.getMethods.put(name, new MethodInvoker(method));
Type returnType = TypeParameterResolver.resolveReturnType(method, this.type);
this.getTypes.put(name, this.typeToClass(returnType));
}
}
private void addSetMethods(Class<?> cls) throws ReflectionException {
Map<String, List<Method>> conflictingSetters = new HashMap();
Method[] methods = this.getClassMethods(cls);
Method[] arr$ = methods;
int len$ = methods.length;
for(int i$ = 0; i$ < len$; ++i$) {
Method method = arr$[i$];
String name = method.getName();
if (name.startsWith("set") && name.length() > 3 && method.getParameterTypes().length == 1) {
name = PropertyNamer.methodToProperty(name);
this.addMethodConflict(conflictingSetters, name, method);
}
}
this.resolveSetterConflicts(conflictingSetters);
}
private void addMethodConflict(Map<String, List<Method>> conflictingMethods, String name, Method method) {
List<Method> list = (List)conflictingMethods.get(name);
if (list == null) {
list = new ArrayList();
conflictingMethods.put(name, list);
}
((List)list).add(method);
}
private void resolveSetterConflicts(Map<String, List<Method>> conflictingSetters) throws ReflectionException {
Iterator i$ = conflictingSetters.keySet().iterator();
while(true) {
while(i$.hasNext()) {
String propName = (String)i$.next();
List<Method> setters = (List)conflictingSetters.get(propName);
Method firstMethod = (Method)setters.get(0);
if (setters.size() == 1) {
this.addSetMethod(propName, firstMethod);
} else {
Class<?> expectedType = (Class)this.getTypes.get(propName);
if (expectedType == null) {
throw new ReflectionException("Illegal overloaded setter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results.");
}
Iterator<Method> methods = setters.iterator();
Method setter = null;
while(methods.hasNext()) {
Method method = (Method)methods.next();
if (method.getParameterTypes().length == 1 && expectedType.equals(method.getParameterTypes()[0])) {
setter = method;
break;
}
}
if (setter == null) {
throw new ReflectionException("Illegal overloaded setter method with ambiguous type for property " + propName + " in class " + firstMethod.getDeclaringClass() + ". This breaks the JavaBeans " + "specification and can cause unpredicatble results.");
}
this.addSetMethod(propName, setter);
}
}
return;
}
}
private void addSetMethod(String name, Method method) {
if (this.isValidPropertyName(name)) {
this.setMethods.put(name, new MethodInvoker(method));
Type[] paramTypes = TypeParameterResolver.resolveParamTypes(method, this.type);
this.setTypes.put(name, this.typeToClass(paramTypes[0]));
}
}
private Class<?> typeToClass(Type src) {
Class<?> result = null;
if (src instanceof Class) {
result = (Class)src;
} else if (src instanceof ParameterizedType) {
result = (Class)((ParameterizedType)src).getRawType();
} else if (src instanceof GenericArrayType) {
Type componentType = ((GenericArrayType)src).getGenericComponentType();
if (componentType instanceof Class) {
result = Array.newInstance((Class)componentType, 0).getClass();
} else {
Class<?> componentClass = this.typeToClass(componentType);
result = Array.newInstance(componentClass, 0).getClass();
}
}
if (result == null) {
result = Object.class;
}
return result;
}
private void addFields(Class<?> clazz) {
Field[] fields = clazz.getDeclaredFields();
Field[] arr$ = fields;
int len$ = fields.length;
for(int i$ = 0; i$ < len$; ++i$) {
Field field = arr$[i$];
if (canAccessPrivateMethods()) {
try {
field.setAccessible(true);
} catch (Exception var8) {
;
}
}
if (field.isAccessible()) {
if (!this.setMethods.containsKey(field.getName())) {
int modifiers = field.getModifiers();
if (!Modifier.isFinal(modifiers) || !Modifier.isStatic(modifiers)) {
this.addSetField(field);
}
}
if (!this.getMethods.containsKey(field.getName())) {
this.addGetField(field);
}
}
}
if (clazz.getSuperclass() != null) {
this.addFields(clazz.getSuperclass());
}
}
private void addSetField(Field field) {
if (this.isValidPropertyName(field.getName())) {
this.setMethods.put(field.getName(), new SetFieldInvoker(field));
Type fieldType = TypeParameterResolver.resolveFieldType(field, this.type);
this.setTypes.put(field.getName(), this.typeToClass(fieldType));
}
}
private void addGetField(Field field) {
if (this.isValidPropertyName(field.getName())) {
this.getMethods.put(field.getName(), new GetFieldInvoker(field));
Type fieldType = TypeParameterResolver.resolveFieldType(field, this.type);
this.getTypes.put(field.getName(), this.typeToClass(fieldType));
}
}
private boolean isValidPropertyName(String name) {
return !name.startsWith("$") && !"serialVersionUID".equals(name) && !"class".equals(name);
}
private Method[] getClassMethods(Class<?> cls) {
Map<String, Method> uniqueMethods = new HashMap();
for(Class currentClass = cls; currentClass != null; currentClass = currentClass.getSuperclass()) {
this.addUniqueMethods(uniqueMethods, currentClass.getDeclaredMethods());
Class<?>[] interfaces = currentClass.getInterfaces();
Class[] arr$ = interfaces;
int len$ = interfaces.length;
for(int i$ = 0; i$ < len$; ++i$) {
Class<?> anInterface = arr$[i$];
this.addUniqueMethods(uniqueMethods, anInterface.getMethods());
}
}
Collection<Method> methods = uniqueMethods.values();
return (Method[])methods.toArray(new Method[methods.size()]);
}
private void addUniqueMethods(Map<String, Method> uniqueMethods, Method[] methods) {
Method[] arr$ = methods;
int len$ = methods.length;
for(int i$ = 0; i$ < len$; ++i$) {
Method currentMethod = arr$[i$];
if (!currentMethod.isBridge()) {
String signature = this.getSignature(currentMethod);
if (!uniqueMethods.containsKey(signature)) {
if (canAccessPrivateMethods()) {
try {
currentMethod.setAccessible(true);
} catch (Exception var9) {
;
}
}
uniqueMethods.put(signature, currentMethod);
}
}
}
}
private String getSignature(Method method) {
StringBuilder sb = new StringBuilder();
Class<?> returnType = method.getReturnType();
if (returnType != null) {
sb.append(returnType.getName()).append('#');
}
sb.append(method.getName());
Class<?>[] parameters = method.getParameterTypes();
for(int i = 0; i < parameters.length; ++i) {
if (i == 0) {
sb.append(':');
} else {
sb.append(',');
}
sb.append(parameters[i].getName());
}
return sb.toString();
}
private static boolean canAccessPrivateMethods() {
try {
SecurityManager securityManager = System.getSecurityManager();
if (null != securityManager) {
securityManager.checkPermission(new ReflectPermission("suppressAccessChecks"));
}
return true;
} catch (SecurityException var1) {
return false;
}
}
public Class<?> getType() {
return this.type;
}
public Constructor<?> getDefaultConstructor() throws ReflectionException {
if (this.defaultConstructor != null) {
return this.defaultConstructor;
} else {
throw new ReflectionException("There is no default constructor for " + this.type);
}
}
public boolean hasDefaultConstructor() {
return this.defaultConstructor != null;
}
public Invoker getSetInvoker(String propertyName) throws ReflectionException {
Invoker method = (Invoker)this.setMethods.get(propertyName);
if (method == null) {
throw new ReflectionException("There is no setter for property named '" + propertyName + "' in '" + this.type + "'");
} else {
return method;
}
}
public Invoker getGetInvoker(String propertyName) throws ReflectionException {
Invoker method = (Invoker)this.getMethods.get(propertyName);
if (method == null) {
throw new ReflectionException("There is no getter for property named '" + propertyName + "' in '" + this.type + "'");
} else {
return method;
}
}
public Class<?> getSetterType(String propertyName) throws ReflectionException {
Class<?> clazz = (Class)this.setTypes.get(propertyName);
if (clazz == null) {
throw new ReflectionException("There is no setter for property named '" + propertyName + "' in '" + this.type + "'");
} else {
return clazz;
}
}
public Class<?> getGetterType(String propertyName) throws ReflectionException {
Class<?> clazz = (Class)this.getTypes.get(propertyName);
if (clazz == null) {
throw new ReflectionException("There is no getter for property named '" + propertyName + "' in '" + this.type + "'");
} else {
return clazz;
}
}
public String[] getGetablePropertyNames() {
return this.readablePropertyNames;
}
public String[] getSetablePropertyNames() {
return this.writeablePropertyNames;
}
public boolean hasSetter(String propertyName) {
return this.setMethods.keySet().contains(propertyName);
}
public boolean hasGetter(String propertyName) {
return this.getMethods.keySet().contains(propertyName);
}
public String findPropertyName(String name) {
return (String)this.caseInsensitivePropertyMap.get(name.toUpperCase(Locale.ENGLISH));
}
}
\ No newline at end of file
package com.schbrain.canal.client.reflector;
import com.schbrain.canal.client.exception.ReflectionException;
public interface ReflectorFactory {
boolean isClassCacheEnabled();
void setClassCacheEnabled(boolean var1);
Reflector findForClass(Class<?> var1) throws ReflectionException;
}
\ No newline at end of file
package com.schbrain.canal.client.reflector;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
/**
* 封装filed元数据信息
*/
public class SetFieldInvoker implements Invoker {
private Field field;
public SetFieldInvoker(Field field) {
this.field = field;
}
/**
* 给指定对象的当前属性设置值
* @param target
* @param args
* @return
* @throws IllegalAccessException
* @throws InvocationTargetException
*/
@Override
public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
this.field.set(target, args[0]);
return null;
}
/**
* 类型
* @return
*/
@Override
public Class<?> getType() {
return this.field.getType();
}
}
package com.schbrain.canal.client.reflector;
import java.lang.reflect.*;
import java.util.Arrays;
public class TypeParameterResolver {
public static Type resolveFieldType(Field field, Type srcType) {
Type fieldType = field.getGenericType();
Class<?> declaringClass = field.getDeclaringClass();
return resolveType(fieldType, srcType, declaringClass);
}
public static Type resolveReturnType(Method method, Type srcType) {
Type returnType = method.getGenericReturnType();
Class<?> declaringClass = method.getDeclaringClass();
return resolveType(returnType, srcType, declaringClass);
}
public static Type[] resolveParamTypes(Method method, Type srcType) {
Type[] paramTypes = method.getGenericParameterTypes();
Class<?> declaringClass = method.getDeclaringClass();
Type[] result = new Type[paramTypes.length];
for(int i = 0; i < paramTypes.length; ++i) {
result[i] = resolveType(paramTypes[i], srcType, declaringClass);
}
return result;
}
private static Type resolveType(Type type, Type srcType, Class<?> declaringClass) {
if (type instanceof TypeVariable) {
return resolveTypeVar((TypeVariable)type, srcType, declaringClass);
} else if (type instanceof ParameterizedType) {
return resolveParameterizedType((ParameterizedType)type, srcType, declaringClass);
} else {
return type instanceof GenericArrayType ? resolveGenericArrayType((GenericArrayType)type, srcType, declaringClass) : type;
}
}
private static Type resolveGenericArrayType(GenericArrayType genericArrayType, Type srcType, Class<?> declaringClass) {
Type componentType = genericArrayType.getGenericComponentType();
Type resolvedComponentType = null;
if (componentType instanceof TypeVariable) {
resolvedComponentType = resolveTypeVar((TypeVariable)componentType, srcType, declaringClass);
} else if (componentType instanceof GenericArrayType) {
resolvedComponentType = resolveGenericArrayType((GenericArrayType)componentType, srcType, declaringClass);
} else if (componentType instanceof ParameterizedType) {
resolvedComponentType = resolveParameterizedType((ParameterizedType)componentType, srcType, declaringClass);
}
return (Type)(resolvedComponentType instanceof Class ? Array.newInstance((Class)resolvedComponentType, 0).getClass() : new GenericArrayTypeImpl((Type)resolvedComponentType));
}
private static ParameterizedType resolveParameterizedType(ParameterizedType parameterizedType, Type srcType, Class<?> declaringClass) {
Class<?> rawType = (Class)parameterizedType.getRawType();
Type[] typeArgs = parameterizedType.getActualTypeArguments();
Type[] args = new Type[typeArgs.length];
for(int i = 0; i < typeArgs.length; ++i) {
if (typeArgs[i] instanceof TypeVariable) {
args[i] = resolveTypeVar((TypeVariable)typeArgs[i], srcType, declaringClass);
} else if (typeArgs[i] instanceof ParameterizedType) {
args[i] = resolveParameterizedType((ParameterizedType)typeArgs[i], srcType, declaringClass);
} else if (typeArgs[i] instanceof WildcardType) {
args[i] = resolveWildcardType((WildcardType)typeArgs[i], srcType, declaringClass);
} else {
args[i] = typeArgs[i];
}
}
return new ParameterizedTypeImpl(rawType, (Type)null, args);
}
private static Type resolveWildcardType(WildcardType wildcardType, Type srcType, Class<?> declaringClass) {
Type[] lowerBounds = resolveWildcardTypeBounds(wildcardType.getLowerBounds(), srcType, declaringClass);
Type[] upperBounds = resolveWildcardTypeBounds(wildcardType.getUpperBounds(), srcType, declaringClass);
return new WildcardTypeImpl(lowerBounds, upperBounds);
}
private static Type[] resolveWildcardTypeBounds(Type[] bounds, Type srcType, Class<?> declaringClass) {
Type[] result = new Type[bounds.length];
for(int i = 0; i < bounds.length; ++i) {
if (bounds[i] instanceof TypeVariable) {
result[i] = resolveTypeVar((TypeVariable)bounds[i], srcType, declaringClass);
} else if (bounds[i] instanceof ParameterizedType) {
result[i] = resolveParameterizedType((ParameterizedType)bounds[i], srcType, declaringClass);
} else if (bounds[i] instanceof WildcardType) {
result[i] = resolveWildcardType((WildcardType)bounds[i], srcType, declaringClass);
} else {
result[i] = bounds[i];
}
}
return result;
}
private static Type resolveTypeVar(TypeVariable<?> typeVar, Type srcType, Class<?> declaringClass) {
Class clazz;
if (srcType instanceof Class) {
clazz = (Class)srcType;
} else {
if (!(srcType instanceof ParameterizedType)) {
throw new IllegalArgumentException("The 2nd arg must be Class or ParameterizedType, but was: " + srcType.getClass());
}
ParameterizedType parameterizedType = (ParameterizedType)srcType;
clazz = (Class)parameterizedType.getRawType();
}
if (clazz == declaringClass) {
Type[] bounds = typeVar.getBounds();
return (Type)(bounds.length > 0 ? bounds[0] : Object.class);
} else {
Type superclass = clazz.getGenericSuperclass();
Type result = scanSuperTypes(typeVar, srcType, declaringClass, clazz, superclass);
if (result != null) {
return result;
} else {
Type[] superInterfaces = clazz.getGenericInterfaces();
Type[] var7 = superInterfaces;
int var8 = superInterfaces.length;
for(int var9 = 0; var9 < var8; ++var9) {
Type superInterface = var7[var9];
result = scanSuperTypes(typeVar, srcType, declaringClass, clazz, superInterface);
if (result != null) {
return result;
}
}
return Object.class;
}
}
}
private static Type scanSuperTypes(TypeVariable<?> typeVar, Type srcType, Class<?> declaringClass, Class<?> clazz, Type superclass) {
if (superclass instanceof ParameterizedType) {
ParameterizedType parentAsType = (ParameterizedType)superclass;
Class<?> parentAsClass = (Class)parentAsType.getRawType();
TypeVariable<?>[] parentTypeVars = parentAsClass.getTypeParameters();
if (srcType instanceof ParameterizedType) {
parentAsType = translateParentTypeVars((ParameterizedType)srcType, clazz, parentAsType);
}
if (declaringClass == parentAsClass) {
for(int i = 0; i < parentTypeVars.length; ++i) {
if (typeVar == parentTypeVars[i]) {
return parentAsType.getActualTypeArguments()[i];
}
}
}
if (declaringClass.isAssignableFrom(parentAsClass)) {
return resolveTypeVar(typeVar, parentAsType, declaringClass);
}
} else if (superclass instanceof Class && declaringClass.isAssignableFrom((Class)superclass)) {
return resolveTypeVar(typeVar, superclass, declaringClass);
}
return null;
}
private static ParameterizedType translateParentTypeVars(ParameterizedType srcType, Class<?> srcClass, ParameterizedType parentType) {
Type[] parentTypeArgs = parentType.getActualTypeArguments();
Type[] srcTypeArgs = srcType.getActualTypeArguments();
TypeVariable<?>[] srcTypeVars = srcClass.getTypeParameters();
Type[] newParentArgs = new Type[parentTypeArgs.length];
boolean noChange = true;
for(int i = 0; i < parentTypeArgs.length; ++i) {
if (parentTypeArgs[i] instanceof TypeVariable) {
for(int j = 0; j < srcTypeVars.length; ++j) {
if (srcTypeVars[j] == parentTypeArgs[i]) {
noChange = false;
newParentArgs[i] = srcTypeArgs[j];
}
}
} else {
newParentArgs[i] = parentTypeArgs[i];
}
}
return (ParameterizedType)(noChange ? parentType : new ParameterizedTypeImpl((Class)parentType.getRawType(), (Type)null, newParentArgs));
}
private TypeParameterResolver() {
}
static class GenericArrayTypeImpl implements GenericArrayType {
private Type genericComponentType;
GenericArrayTypeImpl(Type genericComponentType) {
this.genericComponentType = genericComponentType;
}
@Override
public Type getGenericComponentType() {
return this.genericComponentType;
}
}
static class WildcardTypeImpl implements WildcardType {
private Type[] lowerBounds;
private Type[] upperBounds;
WildcardTypeImpl(Type[] lowerBounds, Type[] upperBounds) {
this.lowerBounds = lowerBounds;
this.upperBounds = upperBounds;
}
@Override
public Type[] getLowerBounds() {
return this.lowerBounds;
}
@Override
public Type[] getUpperBounds() {
return this.upperBounds;
}
}
static class ParameterizedTypeImpl implements ParameterizedType {
private Class<?> rawType;
private Type ownerType;
private Type[] actualTypeArguments;
public ParameterizedTypeImpl(Class<?> rawType, Type ownerType, Type[] actualTypeArguments) {
this.rawType = rawType;
this.ownerType = ownerType;
this.actualTypeArguments = actualTypeArguments;
}
@Override
public Type[] getActualTypeArguments() {
return this.actualTypeArguments;
}
@Override
public Type getOwnerType() {
return this.ownerType;
}
@Override
public Type getRawType() {
return this.rawType;
}
@Override
public String toString() {
return "ParameterizedTypeImpl [rawType=" + this.rawType + ", ownerType=" + this.ownerType + ", actualTypeArguments=" + Arrays.toString(this.actualTypeArguments) + "]";
}
}
}
......@@ -8,6 +8,7 @@ import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import com.schbrain.canal.client.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
......@@ -51,12 +52,11 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
//distribute to listener interfaces
distributeByImpl(header,rowChange.getEventType(), rowData);
//带解析器的执行器
//distribute to annotation listener interfaces
distributeByAnnotation(destination,
header,
rowChange.getEventType(),
rowData);
distributeByAnnotation(destination, header, rowChange.getEventType(), rowData);
}
resolverByImpl(entry);
}
}
......@@ -90,6 +90,33 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
}
}
/**
* 解析器执行器
* @param entry
*/
protected void resolverByImpl(CanalEntry.Entry entry) {
if(handlerConf == null){
return;
}
//带过滤器的执行
Map<String,List<ResolverCanalEvent<?>>> filterEvent = handlerConf.resolverCanalEvents();
if(filterEvent == null){
return;
}
CanalEntry.Header header = entry.getHeader();
String key = String.format("%s:%s", header.getSchemaName(), header.getTableName());
List<ResolverCanalEvent<?>> events = filterEvent.get(key);
if(events!=null && events.size()>0){
for (ResolverCanalEvent event : events) {
try {
event.onEvent(entry);
}catch (Exception e){
log.warn("resolver event handel error",e);
}
}
}
}
private void doEvent(CanalEvent event,CanalEntry.Header header,CanalEntry.EventType eventType,CanalEntry.RowData rowData){
try {
event.onEvent(header,eventType, rowData);
......
package com.schbrain.canal.client.transfer;
import com.schbrain.canal.client.exception.ReflectionException;
import java.util.List;
import java.util.Properties;
/**
* @author zhuyf
* @date 2022/6/22
*/
public interface ObjectFactory {
void setProperties(Properties var1);
<T> T create(Class<T> var1) throws ReflectionException;
<T> T create(Class<T> var1, List<Class<?>> var2, List<Object> var3) throws ReflectionException;
<T> boolean isCollection(Class<T> var1);
}
package com.schbrain.canal.client.type;
import java.text.ParseException;
public abstract class BaseTypeHandler<T> implements TypeHandler<T> {
public T convertNullableResult(Object value) throws ParseException {
if(value==null){
return null;
}
return convert(value);
}
public abstract T convert(Object value) throws ParseException;
}
package com.schbrain.canal.client.type;
import java.math.BigDecimal;
public class BigDecimalTypeHandler extends BaseTypeHandler<BigDecimal> {
@Override
public BigDecimal convert(Object value) {
return new BigDecimal(value.toString());
}
}
package com.schbrain.canal.client.type;
import java.math.BigInteger;
public class BigIntegerTypeHandler extends BaseTypeHandler<BigInteger> {
@Override
public BigInteger convert(Object value) {
return new BigInteger(value.toString());
}
}
package com.schbrain.canal.client.type;
public class BooleanTypeHandler extends BaseTypeHandler<Boolean>{
@Override
public Boolean convert(Object value) {
return new Boolean(value.toString());
}
}
package com.schbrain.canal.client.type;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateTypeHandler extends BaseTypeHandler<Date> {
@Override
public Date convert(Object value) throws ParseException {
String valueStr=value.toString();
if(valueStr.indexOf("-")>0){
SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
return sdf.parse(valueStr);
}
return new Timestamp(Long.valueOf(value.toString()));
}
}
package com.schbrain.canal.client.type;
import java.text.ParseException;
public class EnumTypeHandler<E extends Enum<E>> extends BaseTypeHandler<E>{
private final Class<E> type;
public EnumTypeHandler(Class<E> type) {
if (type == null) {
throw new IllegalArgumentException("Type argument cannot be null");
} else {
this.type = type;
}
}
@Override
public E convert(Object value) throws ParseException {
return Enum.valueOf(type,value.toString());
}
}
package com.schbrain.canal.client.type;
public class IntegerTypeHandler extends BaseTypeHandler<Integer> {
@Override
public Integer convert(Object value) {
return new Integer(value.toString());
}
}
package com.schbrain.canal.client.type;
public class LongTypeHandler extends BaseTypeHandler<Long> {
@Override
public Long convert(Object value) {
return new Long(value.toString());
}
}
package com.schbrain.canal.client.type;
import java.text.ParseException;
public class StringTypeHandler extends BaseTypeHandler<String> {
@Override
public String convert(Object value) throws ParseException {
return value.toString();
}
}
package com.schbrain.canal.client.type;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class TimestampTypeHandler extends BaseTypeHandler<Timestamp> {
@Override
public Timestamp convert(Object value) throws ParseException {
String valueStr=value.toString();
if(valueStr.indexOf("-")>0){
SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
return new Timestamp(sdf.parse(valueStr).getTime());
}
return new Timestamp(Long.valueOf(value.toString()));
}
}
package com.schbrain.canal.client.type;
import java.text.ParseException;
public interface TypeHandler<T> {
public T convert(Object value) throws ParseException;
}
package com.schbrain.canal.client.type;
import com.schbrain.canal.client.exception.TypeException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class TypeHandlerRegister {
private static Map<Type,TypeHandler> typeHandlerMap=new HashMap<>();
private static Class< ? extends TypeHandler> defaultEnumTypeHandler=EnumTypeHandler.class;
static {
register((Type)Integer.class,new IntegerTypeHandler());
register((Class)Integer.TYPE, new IntegerTypeHandler());
register((Type)BigDecimal.class,new BigDecimalTypeHandler());
register((Type)BigInteger.class,new BigIntegerTypeHandler());
register((Type)Boolean.class,new BooleanTypeHandler());
register((Type)Boolean.TYPE,new BigDecimalTypeHandler());
register((Type)Date.class,new DateTypeHandler());
register((Type)Long.class,new LongTypeHandler());
register((Type)Long.TYPE,new LongTypeHandler());
register((Type)Timestamp.class,new TimestampTypeHandler());
register((Type)String.class,new StringTypeHandler());
}
public static TypeHandler getTypeHandler(Type c){
if(!typeHandlerMap.containsKey(c)){
if (Enum.class.isAssignableFrom((Class)c)){
Class cs=(Class)c;
Class<?> enumClass = cs.isAnonymousClass() ? cs.getSuperclass() : cs;
register(c,(TypeHandler)getInstance(enumClass,defaultEnumTypeHandler));
}
}
TypeHandler typeHandler= typeHandlerMap.get(c);
return typeHandler;
}
public static void register(Type c,TypeHandler t){
typeHandlerMap.put(c,t);
}
public static <T> TypeHandler<T> getInstance(Class<?> javaTypeClass, Class<?> typeHandlerClass) {
Constructor c;
if (javaTypeClass != null) {
try {
c = typeHandlerClass.getConstructor(Class.class);
return (TypeHandler)c.newInstance(javaTypeClass);
} catch (NoSuchMethodException var5) {
} catch (Exception var6) {
throw new TypeException("Failed invoking constructor for handler " + typeHandlerClass, var6);
}
}
try {
c = typeHandlerClass.getConstructor();
return (TypeHandler)c.newInstance();
} catch (Exception var4) {
throw new TypeException("Unable to find a usable constructor for " + typeHandlerClass, var4);
}
}
}
package com.schbrain.canal.client.utils;
import lombok.Data;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author zhuyf
* @date 2022/6/22
*/
@Data
public class Dml {
/**
* 对应canal的实例或者MQ的topic
*/
private String destination;
/**
* 数据库或schema
*/
private String database;
/**
* 表名
*/
private String table;
private List<String> pkNames;
private Boolean isDdl;
/**
* 类型: INSERT UPDATE DELETE
*/
private String type;
/**
* 执行耗时
*/
private Long es;
// dml build timeStamp
/**
* 同步时间
*/
private Long ts;
/**
* 执行的sql, dml sql为空
*/
private String sql;
/**
* 数据列表
*/
private List<Row> data;
/**
* 数据列表
*/
private Set<String> updatedNames;
/**
* 数据列
*/
@Data
public static class Row{
Map<String, Object> data;
Map<String, Object> old;
}
}
package com.schbrain.canal.client.utils;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
/**
* @author zhuyf
* @date 2022/6/22
*/
@Slf4j
public class JdbcTypeUtil {
private static boolean isText(String columnType) {
return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType)
|| "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
}
public static Object typeConvert(String tableName ,String columnName, String value, int sqlType, String mysqlType) {
if (value == null
|| (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
return null;
}
try {
Object res;
switch (sqlType) {
case Types.INTEGER:
res = Integer.parseInt(value);
break;
case Types.SMALLINT:
res = Short.parseShort(value);
break;
case Types.BIT:
case Types.TINYINT:
res = Byte.parseByte(value);
break;
case Types.BIGINT:
if (mysqlType.startsWith("bigint") && mysqlType.endsWith("unsigned")) {
res = new BigInteger(value);
} else {
res = Long.parseLong(value);
}
break;
// case Types.BIT:
case Types.BOOLEAN:
res = !"0".equals(value);
break;
case Types.DOUBLE:
case Types.FLOAT:
res = Double.parseDouble(value);
break;
case Types.REAL:
res = Float.parseFloat(value);
break;
case Types.DECIMAL:
case Types.NUMERIC:
res = new BigDecimal(value);
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
case Types.BLOB:
res = value.getBytes("ISO-8859-1");
break;
case Types.DATE:
if (!value.startsWith("0000-00-00")) {
java.util.Date date = Util.parseDate(value);
if (date != null) {
res = new Date(date.getTime());
} else {
res = null;
}
} else {
res = null;
}
break;
case Types.TIME: {
java.util.Date date = Util.parseDate(value);
if (date != null) {
res = new Time(date.getTime());
} else {
res = null;
}
break;
}
case Types.TIMESTAMP:
if (!value.startsWith("0000-00-00")) {
java.util.Date date = Util.parseDate(value);
if (date != null) {
res = new Timestamp(date.getTime());
} else {
res = null;
}
} else {
res = null;
}
break;
case Types.CLOB:
default:
res = value;
break;
}
return res;
} catch (Exception e) {
log.error("table: {} column: {}, failed convert type {} to {}", tableName, columnName, value, sqlType);
return value;
}
}
}
package com.schbrain.canal.client.utils;
import com.google.common.base.CaseFormat;
import java.util.concurrent.ConcurrentHashMap;
public class MapUnderscoreToCamelCase {
private static ConcurrentHashMap<String,String> names=new ConcurrentHashMap<>();
public static String convertByCache(String name){
if(names.containsKey(name)) {
return names.get(name);
}
String convertName= CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name);
names.put(name,convertName);
return convertName;
}
}
package com.schbrain.canal.client.utils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* @author zhuyf
* @date 2022/6/22
*/
@Slf4j
public class MessageUtil {
public static Dml parse4Dml(CanalEntry.Entry entry) {
if (entry == null) {
return null;
}
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
return null;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
final Dml dml = new Dml();
dml.setIsDdl(rowChange.getIsDdl());
dml.setDatabase(entry.getHeader().getSchemaName());
dml.setTable(entry.getHeader().getTableName());
dml.setType(eventType.toString());
dml.setEs(entry.getHeader().getExecuteTime());
dml.setTs(System.currentTimeMillis());
dml.setSql(rowChange.getSql());
if (!rowChange.getIsDdl()) {
List<Dml.Row> rows=new ArrayList<>();
Set<String> updateSet = new HashSet<>();
dml.setPkNames(new ArrayList<>());
int i = 0;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
&& eventType != CanalEntry.EventType.DELETE) {
continue;
}
Dml.Row dmlRow=new Dml.Row();
Map<String, Object> row = new LinkedHashMap<>();
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
for (CanalEntry.Column column : columns) {
if (i == 0) {
if (column.getIsKey()) {
dml.getPkNames().add(column.getName());
}
}
if (column.getIsNull()) {
row.put(column.getName(), null);
} else {
// row.put(column.getName(),
// JdbcTypeUtil.typeConvert(dml.getTable(),
// column.getName(),
// column.getValue(),
// column.getSqlType(),
// column.getMysqlType()));
row.put(column.getName(),column.getValue());
}
// 获取update为true的字段
if (column.getUpdated()) {
updateSet.add(column.getName());
}
}
if (!row.isEmpty()) {
dmlRow.setData(row);
}
if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, Object> rowOld = new LinkedHashMap<>();
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (column.getIsNull()) {
rowOld.put(column.getName(), null);
} else {
// rowOld.put(column.getName(),
// JdbcTypeUtil.typeConvert(dml.getTable(),
// column.getName(),
// column.getValue(),
// column.getSqlType(),
// column.getMysqlType()));
rowOld.put(column.getName(),
column.getValue());
}
}
// update操作将记录修改前的值
if (!rowOld.isEmpty()) {
dmlRow.setOld(rowOld);
}
}
if(!CollectionUtils.isEmpty(dmlRow.getData())||!CollectionUtils.isEmpty(dmlRow.getOld())){
rows.add(dmlRow);
}
i++;
}
dml.setData(rows);
dml.setUpdatedNames(updateSet);
}
return dml;
}
private static List<Map<String, Object>> changeRows(String table, List<Map<String, String>> rows, Map<String, Integer> sqlTypes, Map<String, String> mysqlTypes) {
List<Map<String, Object>> result = new ArrayList<>();
for (Map<String, String> row : rows) {
Map<String, Object> resultRow = new LinkedHashMap<>();
for (Map.Entry<String, String> entry : row.entrySet()) {
String columnName = entry.getKey();
String columnValue = entry.getValue();
Integer sqlType = sqlTypes.get(columnName);
if (sqlType == null) {
continue;
}
String mysqlType = mysqlTypes.get(columnName);
if (mysqlType == null) {
continue;
}
Object finalValue = JdbcTypeUtil.typeConvert(table, columnName, columnValue, sqlType, mysqlType);
resultRow.put(columnName, finalValue);
}
result.add(resultRow);
}
return result;
}
}
package com.schbrain.canal.client.utils;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
public class Util {
private static final Logger logger = LoggerFactory.getLogger(Util.class);
/**
* 通过DS执行sql
*/
public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
try (Connection conn = ds.getConnection();
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
stmt.setFetchSize(Integer.MIN_VALUE);
try (ResultSet rs = stmt.executeQuery(sql)) {
return fun.apply(rs);
}
} catch (Exception e) {
logger.error("sqlRs has error, sql: {} ", sql);
throw new RuntimeException(e);
}
}
public static Object sqlRS(DataSource ds, String sql, List<Object> values, Function<ResultSet, Object> fun) {
try (Connection conn = ds.getConnection()) {
try (PreparedStatement pstmt = conn
.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
pstmt.setFetchSize(Integer.MIN_VALUE);
if (values != null) {
for (int i = 0; i < values.size(); i++) {
pstmt.setObject(i + 1, values.get(i));
}
}
try (ResultSet rs = pstmt.executeQuery()) {
return fun.apply(rs);
}
}
} catch (Exception e) {
logger.error("sqlRs has error, sql: {} ", sql);
throw new RuntimeException(e);
}
}
/**
* sql执行获取resultSet
*
* @param conn sql connection
* @param sql sql
* @param consumer 回调方法
*/
public static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
consumer.accept(rs);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
public static String cleanColumn(String column) {
if (column == null) {
return null;
}
if (column.contains("`")) {
column = column.replaceAll("`", "");
}
if (column.contains("'")) {
column = column.replaceAll("'", "");
}
if (column.contains("\"")) {
column = column.replaceAll("\"", "");
}
return column;
}
public static ThreadPoolExecutor newFixedThreadPool(int nThreads, long keepAliveTime) {
return new ThreadPoolExecutor(nThreads,
nThreads,
keepAliveTime,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
(r, exe) -> {
if (!exe.isShutdown()) {
try {
exe.getQueue().put(r);
} catch (InterruptedException e) {
// ignore
}
}
});
}
public static ThreadPoolExecutor newSingleThreadExecutor(long keepAliveTime) {
return new ThreadPoolExecutor(1,
1,
keepAliveTime,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
(r, exe) -> {
if (!exe.isShutdown()) {
try {
exe.getQueue().put(r);
} catch (InterruptedException e) {
// ignore
}
}
});
}
public final static String timeZone; // 当前时区
private static DateTimeZone dateTimeZone;
static {
TimeZone localTimeZone = TimeZone.getDefault();
int rawOffset = localTimeZone.getRawOffset();
String symbol = "+";
if (rawOffset < 0) {
symbol = "-";
}
rawOffset = Math.abs(rawOffset);
int offsetHour = rawOffset / 3600000;
int offsetMinute = rawOffset % 3600000 / 60000;
String hour = String.format("%1$02d", offsetHour);
String minute = String.format("%1$02d", offsetMinute);
timeZone = symbol + hour + ":" + minute;
dateTimeZone = DateTimeZone.forID(timeZone);
TimeZone.setDefault(TimeZone.getTimeZone("GMT" + timeZone));
}
/**
* 通用日期时间字符解析
*
* @param datetimeStr 日期时间字符串
* @return Date
*/
public static Date parseDate(String datetimeStr) {
if (StringUtils.isEmpty(datetimeStr)) {
return null;
}
datetimeStr = datetimeStr.trim();
if (datetimeStr.contains("-")) {
if (datetimeStr.contains(":")) {
datetimeStr = datetimeStr.replace(" ", "T");
}
} else if (datetimeStr.contains(":")) {
datetimeStr = "T" + datetimeStr;
}
DateTime dateTime = new DateTime(datetimeStr, dateTimeZone);
return dateTime.toDate();
}
private static LoadingCache<String, DateTimeFormatter> dateFormatterCache = CacheBuilder.newBuilder()
.build(new CacheLoader<String, DateTimeFormatter>() {
@Override
public DateTimeFormatter load(String key) {
return DateTimeFormatter.ofPattern(key);
}
});
public static Date parseDate2(String datetimeStr) {
if (StringUtils.isEmpty(datetimeStr)) {
return null;
}
try {
datetimeStr = datetimeStr.trim();
int len = datetimeStr.length();
if (datetimeStr.contains("-") && datetimeStr.contains(":") && datetimeStr.contains(".")) {
// 包含日期+时间+毫秒
// 取毫秒位数
int msLen = len - datetimeStr.indexOf(".") - 1;
StringBuilder ms = new StringBuilder();
for (int i = 0; i < msLen; i++) {
ms.append("S");
}
String formatter = "yyyy-MM-dd HH:mm:ss." + ms;
DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
LocalDateTime dateTime = LocalDateTime.parse(datetimeStr, dateTimeFormatter);
return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
} else if (datetimeStr.contains("-") && datetimeStr.contains(":")) {
// 包含日期+时间
// 判断包含时间位数
int i = datetimeStr.indexOf(":");
i = datetimeStr.indexOf(":", i + 1);
String formatter;
if (i > -1) {
formatter = "yyyy-MM-dd HH:mm:ss";
} else {
formatter = "yyyy-MM-dd HH:mm";
}
DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
LocalDateTime dateTime = LocalDateTime.parse(datetimeStr, dateTimeFormatter);
return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
} else if (datetimeStr.contains("-")) {
// 只包含日期
String formatter = "yyyy-MM-dd";
DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
LocalDate localDate = LocalDate.parse(datetimeStr, dateTimeFormatter);
return Date.from(localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
} else if (datetimeStr.contains(":")) {
// 只包含时间
String formatter;
if (datetimeStr.contains(".")) {
// 包含毫秒
int msLen = len - datetimeStr.indexOf(".") - 1;
StringBuilder ms = new StringBuilder();
for (int i = 0; i < msLen; i++) {
ms.append("S");
}
formatter = "HH:mm:ss." + ms;
} else {
// 判断包含时间位数
int i = datetimeStr.indexOf(":");
i = datetimeStr.indexOf(":", i + 1);
if (i > -1) {
formatter = "HH:mm:ss";
} else {
formatter = "HH:mm";
}
}
DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
LocalTime localTime = LocalTime.parse(datetimeStr, dateTimeFormatter);
LocalDate localDate = LocalDate.of(1970, Month.JANUARY, 1);
LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return null;
}
}
package com.schbrain.bean;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @author zhuyf
* @date 2022/6/21
*/
@Data
public class User implements Serializable {
/**
* ID
*/
private Long id;
/**
* PKID
*/
private String pkId;
/**
* 接入方ID
*/
private Long appInfoId;
/**
* 用户ID
*/
private String userid;
/**
* 姓名
*/
private String name;
/**
* 别名
*/
private String alias;
/**
* 性别
*/
private String gender;
/**
* 头像
*/
private String avatar;
/**
* 激活状态1=已激活,2=已禁用,4=未激活,5=退出企业。
*/
private Integer status;
/**
* 手机号码
*/
private String mobile;
/**
* 职务信息
*/
private String position;
/**
* 邮箱
*/
private String email;
/**
* 头像缩略图
*/
private String thumbAvatar;
/**
* 座机
*/
private String telephone;
/**
* 扩展属性
*/
private String extattr;
/**
* 员工个人二维码
*/
private String qrCode;
/**
* 成员对外属性
*/
private String externalProfile;
/**
* 对外职务
*/
private String externalPosition;
/**
* 地址
*/
private String address;
/**
* 主部门
*/
private Long mainDepartment;
/**
* 主部门
*/
private String openUserid;
/**
* 创建时间
*/
private Date createTime;
/**
* 更新时间
*/
private Date modifyTime;
}
package com.schbrain.web;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent4")
@TableFilter(table = "wechat_user",schame = "kp_user")
@Slf4j
public class MyCanalEvent4 extends SimpleResolverCanalEvent<User> {
@Override
public void onInsert(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onInsert:{}",s);
}
@Override
public void onUpdate(CanalEntry.Header header, User before, User after) {
String s = JSONObject.toJSONString(before);
String b = JSONObject.toJSONString(after);
log.info("onUpdate,before:{},after:{}",s,b);
}
@Override
public void onDelete(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onDelete:{}",s);
}
}
package com.schbrain.web;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent5")
@TableFilter(table = "wechat_user",schame = "kp_user")
@Slf4j
public class MyCanalEvent5 implements ResolverCanalEvent<User> {
@Override
public void onInsert(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onInsert:{}",s);
}
@Override
public void onUpdate(CanalEntry.Header header, User before, User after) {
String s = JSONObject.toJSONString(before);
String b = JSONObject.toJSONString(after);
log.info("onUpdate,before:{},after:{}",s,b);
}
@Override
public void onDelete(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onDelete:{}",s);
}
}
package com.schbrain.web;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
@Service("myCanalEvent6")
@TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent6 extends MyCanalEvent4 {
@Override
public void onInsert(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onInsert:{}",s);
}
@Override
public void onUpdate(CanalEntry.Header header, User before, User after) {
String s = JSONObject.toJSONString(before);
String b = JSONObject.toJSONString(after);
log.info("onUpdate,before:{},after:{}",s,b);
}
@Override
public void onDelete(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onDelete:{}",s);
}
}
package com.schbrain.web;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
@Service("myCanalEvent7")
@TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent7 extends MyCanalEvent6 {
@Override
public void onInsert(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onInsert:{}",s);
}
@Override
public void onUpdate(CanalEntry.Header header, User before, User after) {
String s = JSONObject.toJSONString(before);
String b = JSONObject.toJSONString(after);
log.info("onUpdate,before:{},after:{}",s,b);
}
@Override
public void onDelete(CanalEntry.Header header, User user) {
String s = JSONObject.toJSONString(user);
log.info("onDelete:{}",s);
}
}
......@@ -5,19 +5,19 @@ canal.client.instances.kp_user.retryCount=10
canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
#canal.client.instances.screenschbrain.addresses=192.168.36.66:11111
canal.client.instances.screenschbrain.username=
canal.client.instances.screenschbrain.password=
canal.client.instances.screenschbrain.retryCount=10
canal.client.instances.screenschbrain.subscribe=kp_weekly.comment_student
canal.client.instances.screenschbrain.zkHosts=192.168.2.48:2181,192.168.2.47:2181,192.168.2.43:2181
canal.client.instances.qicheng_czzs.addresses=192.168.36.66:11111
canal.client.instances.qicheng_czzs.username=
canal.client.instances.qicheng_czzs.password=
canal.client.instances.qicheng_czzs.retryCount=10
canal.client.instances.qicheng_czzs.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
#canal.client.instances.screenschbrain.username=
#canal.client.instances.screenschbrain.password=
#canal.client.instances.screenschbrain.retryCount=10
#canal.client.instances.screenschbrain.subscribe=kp_weekly.comment_student
#canal.client.instances.screenschbrain.zkHosts=192.168.2.48:2181,192.168.2.47:2181,192.168.2.43:2181
#canal.client.instances.qicheng_czzs.addresses=192.168.36.66:11111
#canal.client.instances.qicheng_czzs.username=
#canal.client.instances.qicheng_czzs.password=
#canal.client.instances.qicheng_czzs.retryCount=10
#canal.client.instances.qicheng_czzs.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
#
#
#
package com.schbrain.web;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class MyCanalEvent6Test {
public static void main(String[] args) {
Class g = null;
MyCanalEvent7 event5 = new MyCanalEvent7();
Class c = event5.getClass();
while(true){
Class aClass = interfaceGeneric(event5);
if(aClass!=null){
g = aClass;
break;
}
Type type = c.getGenericSuperclass();
if(type instanceof ParameterizedType){
ParameterizedType parameterizedType = (ParameterizedType)type;
Type[] types = parameterizedType.getActualTypeArguments();
if(types!= null && types.length>0){
g = (Class) types[0];
break;
}
}
c = c.getSuperclass();
}
System.out.println(g);
}
/**
* 接口泛型缓存
* @param event
* @return
*/
private static Class interfaceGeneric(ResolverCanalEvent<?> event){
Type[] types = event.getClass().getGenericInterfaces();
if(types == null || types.length == 0){
return null;
}
for (Type type : types) {
if(!(type instanceof ParameterizedType)){
continue;
}
ParameterizedType parameterized = (ParameterizedType)type;
Type rawType = parameterized.getRawType();
if(rawType.equals(ResolverCanalEvent.class)){
Class clazz = (Class)parameterized.getActualTypeArguments()[0];
return clazz;
}
}
return null;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment