Commit fc159e48 authored by zhuyunfeng's avatar zhuyunfeng

no message

parent e452e3cc
......@@ -17,7 +17,7 @@
<name>schbrain-canal</name>
<properties>
<revision>1.1.0-RELEASE</revision>
<revision>1.1.1-SNAPSHOT</revision>
</properties>
<modules>
......
......@@ -88,9 +88,11 @@ public class SimpleCanalClient extends AbstractCanalClient {
String key = filter.schame()+":"+filter.table();
List<CanalEvent> filterList = MapUtils.getObject(tableCanalEventMap,key,new ArrayList<>());
filterList.add(canalEvent);
tableCanalEventMap.put(key,filterList);
continue;
}
unFilters.add(canalEvent);
}
if(unFilters!=null && unFilters.size()>0){
listeners.addAll(unFilters);
......
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.utils.DataUtil;
import java.util.Map;
public interface MapCanalEvent extends CanalEvent{
@Override
default void onEvent(CanalEntry.Header header,CanalEntry.EventType eventType, CanalEntry.RowData rowData){
switch (eventType) {
case INSERT:
Map<String, Object> data = DataUtil.getCloumData(rowData.getAfterColumnsList());
onInsert(header,data);
break;
case UPDATE:
Map<String, Object> after = DataUtil.getCloumData(rowData.getAfterColumnsList());
Map<String, Object> before = DataUtil.getCloumData(rowData.getBeforeColumnsList());
onUpdate(header,before,after);
break;
case DELETE:
Map<String, Object> delete = DataUtil.getCloumData(rowData.getBeforeColumnsList());
onDelete(header,delete);
break;
default:
break;
}
}
/**
* onInsert
* @param after
*/
void onInsert(CanalEntry.Header header, Map<String,Object> after);
/**
* onUpdate
* @param after
*/
void onUpdate(CanalEntry.Header header,Map<String,Object> before,Map<String,Object> after);
/**
* onDelete
* @param after
*/
void onDelete(CanalEntry.Header header,Map<String,Object> after);
}
package com.schbrain.canal.client.event;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class SimpleMapCanalEvent implements MapCanalEvent{
@Override
public void onInsert(CanalEntry.Header header, Map<String, Object> after) {
}
@Override
public void onUpdate(CanalEntry.Header header, Map<String, Object> before, Map<String, Object> after) {
}
@Override
public void onDelete(CanalEntry.Header header, Map<String, Object> after) {
}
}
package com.schbrain.canal.client.utils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/22
*/
public class DataUtil {
public static Map<String, Object> getCloumData(List<CanalEntry.Column> columns){
Map<String, Object> row = new HashMap<>();
for (CanalEntry.Column column : columns) {
if (column.getIsNull()) {
row.put(column.getName(), null);
} else {
row.put(column.getName(),column.getValue());
}
}
return row;
}
}
......@@ -26,8 +26,7 @@ public class MessageUtil {
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
final Dml dml = new Dml();
......
......@@ -9,8 +9,8 @@ import org.springframework.stereotype.Service;
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent2")
@TableFilter(table = "ding_talk_user",schame = "kp_user")
//@Service("myCanalEvent2")
//@TableFilter(table = "ding_talk_user",schame = "kp_user")
public class MyCanalEvent2 implements DefCanalEvent {
@Override
......
......@@ -10,7 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired;
* @author zhuyf
* @date 2022/6/16
*/
@CanalEventListener
//@CanalEventListener
public class MyCanalEvent3 {
@Autowired
......
......@@ -12,8 +12,8 @@ import org.springframework.stereotype.Service;
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent4")
@TableFilter(table = "wechat_user",schame = "kp_user")
//@Service("myCanalEvent4")
//@TableFilter(table = "wechat_user",schame = "kp_user")
@Slf4j
public class MyCanalEvent4 extends SimpleResolverCanalEvent<User> {
......
......@@ -13,8 +13,8 @@ import org.springframework.stereotype.Service;
* @author zhuyf
* @date 2022/6/16
*/
@Service("myCanalEvent5")
@TableFilter(table = "wechat_user",schame = "kp_user")
//@Service("myCanalEvent5")
//@TableFilter(table = "wechat_user",schame = "kp_user")
@Slf4j
public class MyCanalEvent5 implements ResolverCanalEvent<User> {
......
......@@ -13,8 +13,8 @@ import org.springframework.stereotype.Service;
* @date 2022/6/16
*/
@Slf4j
@Service("myCanalEvent6")
@TableFilter(table = "wechat_user",schame = "kp_user")
//@Service("myCanalEvent6")
//@TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent6 extends MyCanalEvent4 {
@Override
......
......@@ -12,8 +12,8 @@ import org.springframework.stereotype.Service;
* @date 2022/6/16
*/
@Slf4j
@Service("myCanalEvent7")
@TableFilter(table = "wechat_user",schame = "kp_user")
//@Service("myCanalEvent7")
//@TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent7 extends MyCanalEvent6 {
@Override
......
package com.schbrain.web;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.SimpleMapCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
@Service("myCanalEvent8")
@TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent8 extends SimpleMapCanalEvent {
@Override
public void onInsert(CanalEntry.Header header, Map<String, Object> user) {
String s = JSONObject.toJSONString(user);
log.info("onInsert:{}",s);
}
@Override
public void onUpdate(CanalEntry.Header header, Map<String, Object> before, Map<String, Object> after) {
String s = JSONObject.toJSONString(before);
String b = JSONObject.toJSONString(after);
log.info("onUpdate,before:{},after:{}",s,b);
}
@Override
public void onDelete(CanalEntry.Header header, Map<String, Object> user) {
String s = JSONObject.toJSONString(user);
log.info("onDelete:{}",s);
}
}
package com.schbrain.web;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.MapCanalEvent;
import com.schbrain.canal.client.event.SimpleMapCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* @author zhuyf
* @date 2022/6/16
*/
@Slf4j
@Service("myCanalEvent9")
@TableFilter(table = "wechat_user",schame = "kp_user")
public class MyCanalEvent9 implements MapCanalEvent {
@Override
public void onInsert(CanalEntry.Header header, Map<String, Object> user) {
String s = JSONObject.toJSONString(user);
log.info("onInsert:{}",s);
}
@Override
public void onUpdate(CanalEntry.Header header, Map<String, Object> before, Map<String, Object> after) {
String s = JSONObject.toJSONString(before);
String b = JSONObject.toJSONString(after);
log.info("onUpdate,before:{},after:{}",s,b);
}
@Override
public void onDelete(CanalEntry.Header header, Map<String, Object> user) {
String s = JSONObject.toJSONString(user);
log.info("onDelete:{}",s);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment