From 92e0eb7d4f49f67e7aa1124ba7f1e6255d9596df Mon Sep 17 00:00:00 2001 From: zhuyf Date: Thu, 20 Apr 2023 20:59:56 +0800 Subject: [PATCH] =?UTF-8?q?spring=20EL=E8=A1=A8=E8=BE=BE=E5=BC=8F=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../canal/client/annotation/TableFilter.java | 2 + .../client/conf/CanalClientConfiguration.java | 5 +- .../canal/client/core/SimpleCanalClient.java | 42 +++++++++++-- .../com/schbrain/canal/client/test/Test.java | 25 ++++++++ .../java/com/schbrain/web/KeyService.java | 15 +++++ .../java/com/schbrain/web/UserServcie.java | 21 +++++++ .../java/com/schbrain/web/UserService.java | 21 ------- .../src/main/resources/application.properties | 2 + .../com/schbrain/web/MyCanalEvent6Test.java | 62 ------------------- 10 files changed, 106 insertions(+), 91 deletions(-) create mode 100644 schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java create mode 100644 schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java create mode 100644 schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java delete mode 100644 schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java delete mode 100644 schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java diff --git a/pom.xml b/pom.xml index ad409fe..e1f972a 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ schbrain-canal - 1.1.3-RELEASE + 1.1.4-SNAPSHOT diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java index e3d2f4d..1cd67b1 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java @@ -15,12 +15,14 @@ import java.lang.annotation.Target; public @interface TableFilter { /** * 表名称 + * SpEL #{...} and property place holders ${...} are supported. * @return */ String table() default ""; /** * 数据库名称 + * SpEL #{...} and property place holders ${...} are supported. * @return */ String schame() default ""; diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java index 1747973..575beca 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java @@ -7,6 +7,7 @@ import com.schbrain.canal.client.transfer.TransponderFactory; import com.schbrain.canal.client.utils.BeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; @@ -30,10 +31,10 @@ public class CanalClientConfiguration { } @Bean - private CanalClient canalClient() { + private CanalClient canalClient(ConfigurableBeanFactory configurableBeanFactory) { log.info("starting canal client...."); TransponderFactory factory = MessageTransponders.defaultMessageTransponder(); - CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory); + CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory,configurableBeanFactory); client.start(); return client; } diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java index 7bb9edc..1001c7b 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -14,7 +14,12 @@ import com.schbrain.canal.client.utils.BeanUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.config.BeanExpressionContext; +import org.springframework.beans.factory.config.BeanExpressionResolver; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.annotation.AnnotatedElementUtils; + import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -27,6 +32,11 @@ import java.util.concurrent.TimeUnit; @Slf4j public class SimpleCanalClient extends AbstractCanalClient { + + private BeanExpressionResolver resolver = new StandardBeanExpressionResolver(); + + private BeanExpressionContext expressionContext; + /** * executor */ @@ -47,8 +57,10 @@ public class SimpleCanalClient extends AbstractCanalClient { */ private final List annoListeners = new ArrayList<>(); - public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) { + + public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory,ConfigurableBeanFactory beanFactory) { super(canalConfig,factory); + this.expressionContext = new BeanExpressionContext(beanFactory, null); executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory()); initListeners(); } @@ -85,7 +97,10 @@ public class SimpleCanalClient extends AbstractCanalClient { } TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String key = filter.schame()+":"+filter.table(); + //拼接key + String schame = parse(filter.schame()); + String table = parse(filter.table()); + String key = schame+":"+table; List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); filterList.add(canalEvent); tableCanalEventMap.put(key,filterList); @@ -128,13 +143,13 @@ public class SimpleCanalClient extends AbstractCanalClient { if(list==null || list.size()<=0){ return; } - MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER; - for (ResolverCanalEvent event : list) { TableFilter filter = event.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String key = filter.schame()+":"+filter.table(); + String schame = parse(filter.schame()); + String table = parse(filter.table()); + String key = schame+":"+table; List> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); filterList.add(event); resolverCanalEvents.put(key,filterList); @@ -144,6 +159,23 @@ public class SimpleCanalClient extends AbstractCanalClient { } } + /** + * 解析参数值 + * @param val + * @return + */ + private String parse(String val){ + String resolvedValue = expressionContext.getBeanFactory().resolveEmbeddedValue(val); + if (!(resolvedValue.startsWith("#{") && resolvedValue.endsWith("}"))) { + return val; + } + Object elVal = resolver.evaluate(resolvedValue,expressionContext); + if(elVal instanceof String){ + return (String) elVal; + } + return elVal.toString(); + } + @Override protected void process(CanalConnector connector, Map.Entry config) { HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners,resolverCanalEvents); diff --git a/schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java b/schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java new file mode 100644 index 0000000..48fd91e --- /dev/null +++ b/schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java @@ -0,0 +1,25 @@ +package com.schbrain.canal.client.test; + +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; + +public class Test { + + public static void main(String[] args) { + + + + ExpressionParser parser = new SpelExpressionParser(); + Expression exp = parser.parseExpression("'Hello World'"); + String message = (String) exp.getValue(); + System.out.println(message); + assert message.equals("Hello World"); + exp = parser.parseExpression("'Hello World'.concat('!')"); + message = (String) exp.getValue(); + System.out.println(message); + assert message.equals("Hello World!"); + exp = parser.parseExpression("'Hello World'.bytes.length"); + assert exp.getValue().equals(11); + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java b/schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java new file mode 100644 index 0000000..c50faaf --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java @@ -0,0 +1,15 @@ +package com.schbrain.web; + +import org.springframework.stereotype.Service; + +@Service +public class KeyService { + + public String table(){ + return "data_dict"; + } + + public String schame(){ + return "kp_user"; + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java b/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java new file mode 100644 index 0000000..2af519b --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java @@ -0,0 +1,21 @@ +package com.schbrain.web; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.SimpleResolverCanalEvent; +import org.springframework.stereotype.Service; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Service +@TableFilter(table = "#{keyService.table()}" , schame = "${canal.table.scheam}") +public class UserServcie extends SimpleResolverCanalEvent { + + @Override + public void onInsert(CanalEntry.Header header, User o) { + } + +} \ No newline at end of file diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java b/schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java deleted file mode 100644 index ff8ba61..0000000 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.schbrain.web; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.schbrain.canal.client.annotation.CanalEventListener; -import com.schbrain.canal.client.annotation.InsertListenPoint; -import com.schbrain.canal.client.annotation.UpdateListenPoint; -import org.springframework.stereotype.Service; - -/** - * @author zhuyf - * @date 2022/6/16 - */ -@Service -public class UserService { - - String getUser(){ - return "1"; - } - - -} diff --git a/schbrain-canal-web/src/main/resources/application.properties b/schbrain-canal-web/src/main/resources/application.properties index 8c08396..dadaf08 100644 --- a/schbrain-canal-web/src/main/resources/application.properties +++ b/schbrain-canal-web/src/main/resources/application.properties @@ -7,6 +7,7 @@ canal.client.instances.kp_user.retryCount=10 canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 + #canal.client.instances.kp_user.addresses=canal-server-stable-0.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-1.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-2.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111 @@ -28,3 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192 # # +canal.table.scheam = schame \ No newline at end of file diff --git a/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java b/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java deleted file mode 100644 index cb75254..0000000 --- a/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.schbrain.web; - -import com.schbrain.canal.client.event.ResolverCanalEvent; -import com.schbrain.canal.client.event.SimpleResolverCanalEvent; - -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; - -/** - * @author zhuyf - * @date 2022/6/22 - */ -public class MyCanalEvent6Test { - - public static void main(String[] args) { - Class g = null; - MyCanalEvent7 event5 = new MyCanalEvent7(); - Class c = event5.getClass(); - while(true){ - Class aClass = interfaceGeneric(event5); - if(aClass!=null){ - g = aClass; - break; - } - Type type = c.getGenericSuperclass(); - if(type instanceof ParameterizedType){ - ParameterizedType parameterizedType = (ParameterizedType)type; - Type[] types = parameterizedType.getActualTypeArguments(); - if(types!= null && types.length>0){ - g = (Class) types[0]; - break; - } - } - c = c.getSuperclass(); - } - System.out.println(g); - } - - /** - * 接口泛型缓存 - * @param event - * @return - */ - private static Class interfaceGeneric(ResolverCanalEvent event){ - Type[] types = event.getClass().getGenericInterfaces(); - if(types == null || types.length == 0){ - return null; - } - for (Type type : types) { - if(!(type instanceof ParameterizedType)){ - continue; - } - ParameterizedType parameterized = (ParameterizedType)type; - Type rawType = parameterized.getRawType(); - if(rawType.equals(ResolverCanalEvent.class)){ - Class clazz = (Class)parameterized.getActualTypeArguments()[0]; - return clazz; - } - } - return null; - } -} -- GitLab