From fc159e4842c687b7556605517a61325a775fec5f Mon Sep 17 00:00:00 2001 From: zhuyf Date: Wed, 22 Jun 2022 18:23:07 +0800 Subject: [PATCH] no message --- pom.xml | 2 +- .../canal/client/core/SimpleCanalClient.java | 2 + .../canal/client/event/MapCanalEvent.java | 49 +++++++++++++++++++ .../client/event/SimpleMapCanalEvent.java | 27 ++++++++++ .../schbrain/canal/client/utils/DataUtil.java | 25 ++++++++++ .../canal/client/utils/MessageUtil.java | 3 +- .../java/com/schbrain/web/MyCanalEvent2.java | 4 +- .../java/com/schbrain/web/MyCanalEvent3.java | 2 +- .../java/com/schbrain/web/MyCanalEvent4.java | 4 +- .../java/com/schbrain/web/MyCanalEvent5.java | 4 +- .../java/com/schbrain/web/MyCanalEvent6.java | 4 +- .../java/com/schbrain/web/MyCanalEvent7.java | 4 +- .../java/com/schbrain/web/MyCanalEvent8.java | 40 +++++++++++++++ .../java/com/schbrain/web/MyCanalEvent9.java | 40 +++++++++++++++ 14 files changed, 196 insertions(+), 14 deletions(-) create mode 100644 schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/MapCanalEvent.java create mode 100644 schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleMapCanalEvent.java create mode 100644 schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/DataUtil.java create mode 100644 schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent8.java create mode 100644 schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent9.java diff --git a/pom.xml b/pom.xml index 97aa857..839ea73 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ schbrain-canal - 1.1.0-RELEASE + 1.1.1-SNAPSHOT diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java index a56abcb..7bb9edc 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java @@ -88,9 +88,11 @@ public class SimpleCanalClient extends AbstractCanalClient { String key = filter.schame()+":"+filter.table(); List filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>()); filterList.add(canalEvent); + tableCanalEventMap.put(key,filterList); continue; } unFilters.add(canalEvent); + } if(unFilters!=null && unFilters.size()>0){ listeners.addAll(unFilters); diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/MapCanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/MapCanalEvent.java new file mode 100644 index 0000000..9c51293 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/MapCanalEvent.java @@ -0,0 +1,49 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.canal.client.utils.DataUtil; + +import java.util.Map; + +public interface MapCanalEvent extends CanalEvent{ + + @Override + default void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData){ + + switch (eventType) { + case INSERT: + Map data = DataUtil.getCloumData(rowData.getAfterColumnsList()); + onInsert(header,data); + break; + case UPDATE: + Map after = DataUtil.getCloumData(rowData.getAfterColumnsList()); + Map before = DataUtil.getCloumData(rowData.getBeforeColumnsList()); + onUpdate(header,before,after); + break; + case DELETE: + Map delete = DataUtil.getCloumData(rowData.getBeforeColumnsList()); + onDelete(header,delete); + break; + default: + break; + } + } + + /** + * onInsert + * @param after + */ + void onInsert(CanalEntry.Header header, Map after); + + /** + * onUpdate + * @param after + */ + void onUpdate(CanalEntry.Header header,Map before,Map after); + + /** + * onDelete + * @param after + */ + void onDelete(CanalEntry.Header header,Map after); +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleMapCanalEvent.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleMapCanalEvent.java new file mode 100644 index 0000000..e16abb2 --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/event/SimpleMapCanalEvent.java @@ -0,0 +1,27 @@ +package com.schbrain.canal.client.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public class SimpleMapCanalEvent implements MapCanalEvent{ + + @Override + public void onInsert(CanalEntry.Header header, Map after) { + + } + + @Override + public void onUpdate(CanalEntry.Header header, Map before, Map after) { + + } + + @Override + public void onDelete(CanalEntry.Header header, Map after) { + + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/DataUtil.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/DataUtil.java new file mode 100644 index 0000000..572ccec --- /dev/null +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/DataUtil.java @@ -0,0 +1,25 @@ +package com.schbrain.canal.client.utils; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/22 + */ +public class DataUtil { + + public static Map getCloumData(List columns){ + Map row = new HashMap<>(); + for (CanalEntry.Column column : columns) { + if (column.getIsNull()) { + row.put(column.getName(), null); + } else { + row.put(column.getName(),column.getValue()); + } + } + return row; + } +} diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java index 48da719..07df007 100644 --- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java +++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/utils/MessageUtil.java @@ -26,8 +26,7 @@ public class MessageUtil { try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { - throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), - e); + throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChange.getEventType(); final Dml dml = new Dml(); diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent2.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent2.java index 57c9fbf..95cc473 100644 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent2.java +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent2.java @@ -9,8 +9,8 @@ import org.springframework.stereotype.Service; * @author zhuyf * @date 2022/6/16 */ -@Service("myCanalEvent2") -@TableFilter(table = "ding_talk_user",schame = "kp_user") +//@Service("myCanalEvent2") +//@TableFilter(table = "ding_talk_user",schame = "kp_user") public class MyCanalEvent2 implements DefCanalEvent { @Override diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent3.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent3.java index 97b1b78..5f4e7f3 100644 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent3.java +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent3.java @@ -10,7 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired; * @author zhuyf * @date 2022/6/16 */ -@CanalEventListener +//@CanalEventListener public class MyCanalEvent3 { @Autowired diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java index 0391cfc..7880ce6 100644 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent4.java @@ -12,8 +12,8 @@ import org.springframework.stereotype.Service; * @author zhuyf * @date 2022/6/16 */ -@Service("myCanalEvent4") -@TableFilter(table = "wechat_user",schame = "kp_user") +//@Service("myCanalEvent4") +//@TableFilter(table = "wechat_user",schame = "kp_user") @Slf4j public class MyCanalEvent4 extends SimpleResolverCanalEvent { diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java index a425d99..9ef4cae 100644 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent5.java @@ -13,8 +13,8 @@ import org.springframework.stereotype.Service; * @author zhuyf * @date 2022/6/16 */ -@Service("myCanalEvent5") -@TableFilter(table = "wechat_user",schame = "kp_user") +//@Service("myCanalEvent5") +//@TableFilter(table = "wechat_user",schame = "kp_user") @Slf4j public class MyCanalEvent5 implements ResolverCanalEvent { diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java index 3c07961..70b1c45 100644 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent6.java @@ -13,8 +13,8 @@ import org.springframework.stereotype.Service; * @date 2022/6/16 */ @Slf4j -@Service("myCanalEvent6") -@TableFilter(table = "wechat_user",schame = "kp_user") +//@Service("myCanalEvent6") +//@TableFilter(table = "wechat_user",schame = "kp_user") public class MyCanalEvent6 extends MyCanalEvent4 { @Override diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java index cc6fdd7..cb0f976 100644 --- a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent7.java @@ -12,8 +12,8 @@ import org.springframework.stereotype.Service; * @date 2022/6/16 */ @Slf4j -@Service("myCanalEvent7") -@TableFilter(table = "wechat_user",schame = "kp_user") +//@Service("myCanalEvent7") +//@TableFilter(table = "wechat_user",schame = "kp_user") public class MyCanalEvent7 extends MyCanalEvent6 { @Override diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent8.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent8.java new file mode 100644 index 0000000..f876b12 --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent8.java @@ -0,0 +1,40 @@ +package com.schbrain.web; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.bean.User; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.SimpleMapCanalEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +@Service("myCanalEvent8") +@TableFilter(table = "wechat_user",schame = "kp_user") +public class MyCanalEvent8 extends SimpleMapCanalEvent { + + @Override + public void onInsert(CanalEntry.Header header, Map user) { + String s = JSONObject.toJSONString(user); + log.info("onInsert:{}",s); + } + + @Override + public void onUpdate(CanalEntry.Header header, Map before, Map after) { + String s = JSONObject.toJSONString(before); + String b = JSONObject.toJSONString(after); + log.info("onUpdate,before:{},after:{}",s,b); + } + + @Override + public void onDelete(CanalEntry.Header header, Map user) { + String s = JSONObject.toJSONString(user); + log.info("onDelete:{}",s); + } +} diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent9.java b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent9.java new file mode 100644 index 0000000..f447c67 --- /dev/null +++ b/schbrain-canal-web/src/main/java/com/schbrain/web/MyCanalEvent9.java @@ -0,0 +1,40 @@ +package com.schbrain.web; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.schbrain.canal.client.annotation.TableFilter; +import com.schbrain.canal.client.event.MapCanalEvent; +import com.schbrain.canal.client.event.SimpleMapCanalEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Map; + +/** + * @author zhuyf + * @date 2022/6/16 + */ +@Slf4j +@Service("myCanalEvent9") +@TableFilter(table = "wechat_user",schame = "kp_user") +public class MyCanalEvent9 implements MapCanalEvent { + + @Override + public void onInsert(CanalEntry.Header header, Map user) { + String s = JSONObject.toJSONString(user); + log.info("onInsert:{}",s); + } + + @Override + public void onUpdate(CanalEntry.Header header, Map before, Map after) { + String s = JSONObject.toJSONString(before); + String b = JSONObject.toJSONString(after); + log.info("onUpdate,before:{},after:{}",s,b); + } + + @Override + public void onDelete(CanalEntry.Header header, Map user) { + String s = JSONObject.toJSONString(user); + log.info("onDelete:{}",s); + } +} -- GitLab