diff --git a/pom.xml b/pom.xml index ad409fe848e8afce7b4b63d3bfaf9226b9ecbf1c..e1f972ac0dcf64cac0bfcd9278394d47d37e5d59 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ schbrain-canal - 1.1.3-RELEASE + 1.1.4-SNAPSHOT diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java index e3d2f4daa1b613524a427d5b966a1be25e317eab..1cd67b13c04bdeee48c3932f67ffe8f2422089c9 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/annotation/TableFilter.java @@ -15,12 +15,14 @@ import java.lang.annotation.Target; public @interface TableFilter { /** * 表名称 + * SpEL #{...} and property place holders ${...} are supported. * @return */ String table() default ""; /** * 数据库名称 + * SpEL #{...} and property place holders ${...} are supported. * @return */ String schame() default ""; diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java index 1747973c28a28e20e538367344a9e3af8c052dbd..575becac2115722f46b34a4dcbcf6f5312aca48a 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java @@ -7,6 +7,7 @@ import com.schbrain.canal.client.transfer.TransponderFactory; import com.schbrain.canal.client.utils.BeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; @@ -30,10 +31,10 @@ public class CanalClientConfiguration { } @Bean - private CanalClient canalClient() { + private CanalClient canalClient(ConfigurableBeanFactory configurableBeanFactory) { log.info("starting canal client...."); TransponderFactory factory = MessageTransponders.defaultMessageTransponder(); - CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory); + CanalClient client = new SimpleCanalClient(schbrainCanalConfig,factory,configurableBeanFactory); client.start(); return client; } diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java index 7bb9edcec1b618c3e47d571e8ee3ce00f08ba813..1001c7ba1490fbb1d6a8b1e66812d89addfe3bdc 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -14,7 +14,12 @@ import com.schbrain.canal.client.utils.BeanUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.config.BeanExpressionContext; +import org.springframework.beans.factory.config.BeanExpressionResolver; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.annotation.AnnotatedElementUtils; + import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -27,6 +32,11 @@ import java.util.concurrent.TimeUnit; @Slf4j public class SimpleCanalClient extends AbstractCanalClient { + + private BeanExpressionResolver resolver = new StandardBeanExpressionResolver(); + + private BeanExpressionContext expressionContext; + /** * executor */ @@ -47,8 +57,10 @@ public class SimpleCanalClient extends AbstractCanalClient { */ private final List annoListeners = new ArrayList<>(); - public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory) { + + public SimpleCanalClient(SchbrainCanalConfig canalConfig, TransponderFactory factory,ConfigurableBeanFactory beanFactory) { super(canalConfig,factory); + this.expressionContext = new BeanExpressionContext(beanFactory, null); executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory()); initListeners(); } @@ -85,7 +97,10 @@ public class SimpleCanalClient extends AbstractCanalClient { } TableFilter filter = canalEvent.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String key = filter.schame()+":"+filter.table(); + //拼接key + String schame = parse(filter.schame()); + String table = parse(filter.table()); + String key = schame+":"+table; List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); filterList.add(canalEvent); tableCanalEventMap.put(key,filterList); @@ -128,13 +143,13 @@ public class SimpleCanalClient extends AbstractCanalClient { if(list==null || list.size()<=0){ return; } - MethodArgumentResolver resolver = MethodArgumentConfig.LISTENERMETHODARGUMENTRESOLVER; - for (ResolverCanalEvent event : list) { TableFilter filter = event.getClass().getAnnotation(TableFilter.class); if(filter!=null && StringUtils.isNotBlank(filter.schame()) && StringUtils.isNotBlank(filter.table())){ - String key = filter.schame()+":"+filter.table(); + String schame = parse(filter.schame()); + String table = parse(filter.table()); + String key = schame+":"+table; List> filterList = MapUtils.getObject(resolverCanalEvents,key,new ArrayList<>()); filterList.add(event); resolverCanalEvents.put(key,filterList); @@ -144,6 +159,23 @@ public class SimpleCanalClient extends AbstractCanalClient { } } + /** + * 解析参数值 + * @param val + * @return + */ + private String parse(String val){ + String resolvedValue = expressionContext.getBeanFactory().resolveEmbeddedValue(val); + if (!(resolvedValue.startsWith("#{") && resolvedValue.endsWith("}"))) { + return val; + } + Object elVal = resolver.evaluate(resolvedValue,expressionContext); + if(elVal instanceof String){ + return (String) elVal; + } + return elVal.toString(); + } + @Override protected void process(CanalConnector connector, Map.Entry config) { HandlerConf handlerConf = new HandlerConf(listeners,tableCanalEventMap,annoListeners,resolverCanalEvents); diff --git a/schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java b/schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java new file mode 100644 index 0000000000000000000000000000000000000000..48fd91e866de7f41e2d9857bb29b5559f546e8fa --- /dev/null +++ b/schbrain-canal-client/src/test/java/com/schbrain/canal/client/test/Test.java @@ -0,0 +1,25 @@ +package com.schbrain.canal.client.test; + +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; + +public class Test { + + public static void main(String[] args) { + + + + ExpressionParser parser = new SpelExpressionParser(); + Expression exp = parser.parseExpression("'Hello World'"); + String message = (String) exp.getValue(); + System.out.println(message); + assert message.equals("Hello World"); + exp = parser.parseExpression("'Hello World'.concat('!')"); + message = (String) exp.getValue(); + System.out.println(message); + assert message.equals("Hello World!"); + exp = parser.parseExpression("'Hello World'.bytes.length"); + assert exp.getValue().equals(11); + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java b/schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java new file mode 100644 index 0000000000000000000000000000000000000000..c50faaf963a0e235600281a924b012deda53f312 --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/KeyService.java @@ -0,0 +1,15 @@ +package com.schbrain.web; + +import org.springframework.stereotype.Service; + +@Service +public class KeyService { + + public String table(){ + return "data_dict"; + } + + public String schame(){ + return "kp_user"; + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java b/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java new file mode 100644 index 0000000000000000000000000000000000000000..2af519b6d206f35ffe532ffee58c301128293765 --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java @@ -0,0 +1,21 @@ +package com.schbrain.web; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.SimpleResolverCanalEvent; +import org.springframework.stereotype.Service; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Service +@TableFilter(table = "#{keyService.table()}" , schame = "${canal.table.scheam}") +public class UserServcie extends SimpleResolverCanalEvent { + + @Override + public void onInsert(CanalEntry.Header header, User o) { + } + +} \ No newline at end of file diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java b/schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java deleted file mode 100644 index ff8ba612123eaf26a285e50a71ba255a0035a713..0000000000000000000000000000000000000000 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/UserService.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.schbrain.web; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.schbrain.canal.client.annotation.CanalEventListener; -import com.schbrain.canal.client.annotation.InsertListenPoint; -import com.schbrain.canal.client.annotation.UpdateListenPoint; -import org.springframework.stereotype.Service; - -/** - * @author zhuyf - * @date 2022/6/16 - */ -@Service -public class UserService { - - String getUser(){ - return "1"; - } - - -} diff --git a/schbrain-canal-web/src/main/resources/application.properties b/schbrain-canal-web/src/main/resources/application.properties index 8c083963ddb4ad80d75b1469eb59ee0f3047e927..dadaf08df8797e0fb628263edff3667a9005cb42 100644 --- a/schbrain-canal-web/src/main/resources/application.properties +++ b/schbrain-canal-web/src/main/resources/application.properties @@ -7,6 +7,7 @@ canal.client.instances.kp_user.retryCount=10 canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 + #canal.client.instances.kp_user.addresses=canal-server-stable-0.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-1.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111,canal-server-stable-2.canal-server-discovery-svc-stable.devops.svc.cluster.local:11111 @@ -28,3 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192 # # +canal.table.scheam = schame \ No newline at end of file diff --git a/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java b/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java deleted file mode 100644 index cb752542bd27f6c9bbb465baf205bb6886db77f1..0000000000000000000000000000000000000000 --- a/schbrain-canal-web/src/test/java/com/schbrain/web/MyCanalEvent6Test.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.schbrain.web; - -import com.schbrain.canal.client.event.ResolverCanalEvent; -import com.schbrain.canal.client.event.SimpleResolverCanalEvent; - -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; - -/** - * @author zhuyf - * @date 2022/6/22 - */ -public class MyCanalEvent6Test { - - public static void main(String[] args) { - Class g = null; - MyCanalEvent7 event5 = new MyCanalEvent7(); - Class c = event5.getClass(); - while(true){ - Class aClass = interfaceGeneric(event5); - if(aClass!=null){ - g = aClass; - break; - } - Type type = c.getGenericSuperclass(); - if(type instanceof ParameterizedType){ - ParameterizedType parameterizedType = (ParameterizedType)type; - Type[] types = parameterizedType.getActualTypeArguments(); - if(types!= null && types.length>0){ - g = (Class) types[0]; - break; - } - } - c = c.getSuperclass(); - } - System.out.println(g); - } - - /** - * 接口泛型缓存 - * @param event - * @return - */ - private static Class interfaceGeneric(ResolverCanalEvent event){ - Type[] types = event.getClass().getGenericInterfaces(); - if(types == null || types.length == 0){ - return null; - } - for (Type type : types) { - if(!(type instanceof ParameterizedType)){ - continue; - } - ParameterizedType parameterized = (ParameterizedType)type; - Type rawType = parameterized.getRawType(); - if(rawType.equals(ResolverCanalEvent.class)){ - Class clazz = (Class)parameterized.getActualTypeArguments()[0]; - return clazz; - } - } - return null; - } -}