Commit 92e0eb7d authored by zhuyunfeng's avatar zhuyunfeng

spring EL表达式支持

parent 47fd9a3f
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<name>schbrain-canal</name> <name>schbrain-canal</name>
<properties> <properties>
<revision>1.1.3-RELEASE</revision> <revision>1.1.4-SNAPSHOT</revision>
</properties> </properties>
<modules> <modules>
......
...@@ -15,12 +15,14 @@ import java.lang.annotation.Target; ...@@ -15,12 +15,14 @@ import java.lang.annotation.Target;
public @interface TableFilter { public @interface TableFilter {
/** /**
* 表名称 * 表名称
* SpEL #{...} and property place holders ${...} are supported.
* @return * @return
*/ */
String table() default ""; String table() default "";
/** /**
* 数据库名称 * 数据库名称
* SpEL #{...} and property place holders ${...} are supported.
* @return * @return
*/ */
String schame() default ""; String schame() default "";
......
...@@ -7,6 +7,7 @@ import com.schbrain.canal.client.transfer.TransponderFactory; ...@@ -7,6 +7,7 @@ import com.schbrain.canal.client.transfer.TransponderFactory;
import com.schbrain.canal.client.utils.BeanUtil; import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
...@@ -30,10 +31,10 @@ public class CanalClientConfiguration { ...@@ -30,10 +31,10 @@ public class CanalClientConfiguration {
} }
@Bean @Bean
private CanalClient canalClient() { private CanalClient canalClient(ConfigurableBeanFactory configurableBeanFactory) {
log.info("starting canal client...."); log.info("starting canal client....");
TransponderFactory factory = MessageTransponders.defaultMessageTransponder(); TransponderFactory factory = MessageTransponders.defaultMessageTransponder();
CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory); CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory,configurableBeanFactory);
client.start(); client.start();
return client; return client;
} }
......
...@@ -14,7 +14,12 @@ import com.schbrain.canal.client.utils.BeanUtil; ...@@ -14,7 +14,12 @@ import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotatedElementUtils;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
...@@ -27,6 +32,11 @@ import java.util.concurrent.TimeUnit; ...@@ -27,6 +32,11 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class SimpleCanalClient extends AbstractCanalClient { public class SimpleCanalClient extends AbstractCanalClient {
private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
private BeanExpressionContext expressionContext;
/** /**
* executor * executor
*/ */
...@@ -47,8 +57,10 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -47,8 +57,10 @@ public class SimpleCanalClient extends AbstractCanalClient {
*/ */
private final List<ListenerPoint> annoListeners = new ArrayList<>(); private final List<ListenerPoint> annoListeners = new ArrayList<>();
public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) {
public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory,ConfigurableBeanFactory beanFactory) {
super(canalConfig,factory); super(canalConfig,factory);
this.expressionContext = new BeanExpressionContext(beanFactory, null);
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory()); executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory());
initListeners(); initListeners();
} }
...@@ -85,7 +97,10 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -85,7 +97,10 @@ public class SimpleCanalClient extends AbstractCanalClient {
} }
TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
String key = filter.schame()+":"+filter.table(); //拼接key
String schame = parse(filter.schame());
String table = parse(filter.table());
String key = schame+":"+table;
List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>());
filterList.add(canalEvent); filterList.add(canalEvent);
tableCanalEventMap.put(key,filterList); tableCanalEventMap.put(key,filterList);
...@@ -128,13 +143,13 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -128,13 +143,13 @@ public class SimpleCanalClient extends AbstractCanalClient {
if(list==null || list.size()<=0){ if(list==null || list.size()<=0){
return; return;
} }
MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER; MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER;
for (ResolverCanalEvent event : list) { for (ResolverCanalEvent event : list) {
TableFilter filter = event.getClass().getAnnotation(TableFilter.class); TableFilter filter = event.getClass().getAnnotation(TableFilter.class);
if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){
String key = filter.schame()+":"+filter.table(); String schame = parse(filter.schame());
String table = parse(filter.table());
String key = schame+":"+table;
List<ResolverCanalEvent<?>> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); List<ResolverCanalEvent<?>> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>());
filterList.add(event); filterList.add(event);
resolverCanalEvents.put(key,filterList); resolverCanalEvents.put(key,filterList);
...@@ -144,6 +159,23 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -144,6 +159,23 @@ public class SimpleCanalClient extends AbstractCanalClient {
} }
} }
/**
* 解析参数值
* @param val
* @return
*/
private String parse(String val){
String resolvedValue = expressionContext.getBeanFactory().resolveEmbeddedValue(val);
if (!(resolvedValue.startsWith("#{") && resolvedValue.endsWith("}"))) {
return val;
}
Object elVal = resolver.evaluate(resolvedValue,expressionContext);
if(elVal instanceof String){
return (String) elVal;
}
return elVal.toString();
}
@Override @Override
protected void process(CanalConnector connector, Map.Entry<String, CanalClientConfig> config) { protected void process(CanalConnector connector, Map.Entry<String, CanalClientConfig> config) {
HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners,resolverCanalEvents); HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners,resolverCanalEvents);
......
package com.schbrain.canal.client.test;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
public class Test {
public static void main(String[] args) {
ExpressionParser parser = new SpelExpressionParser();
Expression exp = parser.parseExpression("'Hello World'");
String message = (String) exp.getValue();
System.out.println(message);
assert message.equals("Hello World");
exp = parser.parseExpression("'Hello World'.concat('!')");
message = (String) exp.getValue();
System.out.println(message);
assert message.equals("Hello World!");
exp = parser.parseExpression("'Hello World'.bytes.length");
assert exp.getValue().equals(11);
}
}
package com.schbrain.web;
import org.springframework.stereotype.Service;
@Service
public class KeyService {
public String table(){
return "data_dict";
}
public String schame(){
return "kp_user";
}
}
package com.schbrain.web; package com.schbrain.web;
import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.CanalEventListener; import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.InsertListenPoint; import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.annotation.UpdateListenPoint; import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
...@@ -11,11 +11,11 @@ import org.springframework.stereotype.Service; ...@@ -11,11 +11,11 @@ import org.springframework.stereotype.Service;
* @date 2022/6/16 * @date 2022/6/16
*/ */
@Service @Service
public class UserService { @TableFilter(table = "#{keyService.table()}" , schame = "${canal.table.scheam}")
public class UserServcie extends SimpleResolverCanalEvent<User> {
String getUser(){ @Override
return "1"; public void onInsert(CanalEntry.Header header, User o) {
} }
} }
\ No newline at end of file
...@@ -7,6 +7,7 @@ canal.client.instances.kp_user.retryCount=10 ...@@ -7,6 +7,7 @@ canal.client.instances.kp_user.retryCount=10
canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
#canal.client.instances.kp_user.addresses=canal-server-stable-0.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-1.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-2.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111 #canal.client.instances.kp_user.addresses=canal-server-stable-0.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-1.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-2.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111
...@@ -28,3 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192 ...@@ -28,3 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192
# #
# #
canal.table.scheam = schame
\ No newline at end of file
package com.schbrain.web;
import com.schbrain.canal.client.event.ResolverCanalEvent;
import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class MyCanalEvent6Test {
public static void main(String[] args) {
Class g = null;
MyCanalEvent7 event5 = new MyCanalEvent7();
Class c = event5.getClass();
while(true){
Class aClass = interfaceGeneric(event5);
if(aClass!=null){
g = aClass;
break;
}
Type type = c.getGenericSuperclass();
if(type instanceof ParameterizedType){
ParameterizedType parameterizedType = (ParameterizedType)type;
Type[] types = parameterizedType.getActualTypeArguments();
if(types!= null && types.length>0){
g = (Class) types[0];
break;
}
}
c = c.getSuperclass();
}
System.out.println(g);
}
/**
* 接口泛型缓存
* @param event
* @return
*/
private static Class interfaceGeneric(ResolverCanalEvent<?> event){
Type[] types = event.getClass().getGenericInterfaces();
if(types == null || types.length == 0){
return null;
}
for (Type type : types) {
if(!(type instanceof ParameterizedType)){
continue;
}
ParameterizedType parameterized = (ParameterizedType)type;
Type rawType = parameterized.getRawType();
if(rawType.equals(ResolverCanalEvent.class)){
Class clazz = (Class)parameterized.getActualTypeArguments()[0];
return clazz;
}
}
return null;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment