diff --git a/pom.xml b/pom.xml
index e1f972ac0dcf64cac0bfcd9278394d47d37e5d59..8f93bcb34f9a00393aa508306b9adf3478311851 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,11 +7,10 @@
com.schbrain.framework
schbrain-parent
- 1.0.0-SNAPSHOT
+ 3.0.0
pom
- com.schbrain.framework
schbrain-canal
${revision}
schbrain-canal
diff --git a/schbrain-canal-client/pom.xml b/schbrain-canal-client/pom.xml
index 5cafeb1b8e26d1342f329000a93007265887afb9..9b681090c8d4c09cdc3f7091c632ab7c95c1bf8d 100644
--- a/schbrain-canal-client/pom.xml
+++ b/schbrain-canal-client/pom.xml
@@ -9,14 +9,23 @@
${revision}
+
+ 1.1.5
+
+
schbrain-canal-client
schbrain-canal-client
+
+ com.alibaba.otter
+ canal.protocol
+ ${canal-version}
+
com.alibaba.otter
canal.client
- 1.1.5
+ ${canal-version}
org.apache.zookeeper
@@ -27,12 +36,6 @@
org.apache.zookeeper
zookeeper
- 3.4.14
-
-
- com.alibaba.otter
- canal.protocol
- 1.1.5
joda-time
@@ -40,22 +43,16 @@
2.9.4
- com.schbrain.framework
- schbrain-spring-support
+ com.schbrain.common
+ common-util
org.springframework.boot
- spring-boot-starter
+ spring-boot-starter-actuator
org.springframework.boot
spring-boot-configuration-processor
- true
-
-
- org.springframework.boot
- spring-boot-starter-test
- test
diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java
index 46e73c1aeceabe98011c34dc3b449c80bc98cb8b..4e1ff872ac275f91acd97a0f8b9e1f4d0b7e3234 100644
--- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java
+++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfig.java
@@ -10,21 +10,38 @@ import lombok.Data;
*/
@Data
public class CanalClientConfig {
-
+ /**
+ * 连接地址
+ */
private String addresses;
-
+ /**
+ * 订阅信息
+ */
private String subscribe;
-
+ /**
+ * zookeeper连接地址
+ */
private String zkHosts;
-
+ /**
+ * 用户名
+ */
private String username = "";
-
+ /**
+ * 密码
+ */
private String password = "";
-
- private int retryCount;
-
+ /**
+ * 连接重试次数
+ * -1 表示无限次重连
+ */
+ private int retryCount = 10;
+ /**
+ * 获取数据间隔时间(millisecond)
+ */
private long acquireInterval = 1000;
-
+ /**
+ * 批量获取数据条数
+ */
private int batchSize = 1000;
}
diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java
index 575becac2115722f46b34a4dcbcf6f5312aca48a..2f4b446e2805b876bd8bb4c35d58e144f85d459e 100644
--- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java
+++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/conf/CanalClientConfiguration.java
@@ -8,6 +8,7 @@ import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
@@ -31,6 +32,7 @@ public class CanalClientConfiguration {
}
@Bean
+ @ConditionalOnMissingBean(CanalClient.class)
private CanalClient canalClient(ConfigurableBeanFactory configurableBeanFactory) {
log.info("starting canal client....");
TransponderFactory factory = MessageTransponders.defaultMessageTransponder();
diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java
index 1001c7ba1490fbb1d6a8b1e66812d89addfe3bdc..81d5f5e06060cd686a9c6aaa9d9e18dcb6aa653d 100644
--- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java
+++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/core/SimpleCanalClient.java
@@ -167,7 +167,7 @@ public class SimpleCanalClient extends AbstractCanalClient {
private String parse(String val){
String resolvedValue = expressionContext.getBeanFactory().resolveEmbeddedValue(val);
if (!(resolvedValue.startsWith("#{") && resolvedValue.endsWith("}"))) {
- return val;
+ return resolvedValue;
}
Object elVal = resolver.evaluate(resolvedValue,expressionContext);
if(elVal instanceof String){
diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java
index 190315f5d8787e8bfffccb6ffd707839cbbb9739..444c9c70fb2ec6a93401c0bfe32d6300533ffc62 100644
--- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java
+++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractBasicMessageTransponder.java
@@ -187,7 +187,9 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
* ddl事件
* @param rowChange
*/
- protected void processDdl(CanalEntry.RowChange rowChange) {}
+ protected void processDdl(CanalEntry.RowChange rowChange) {
+
+ }
protected List getIgnoreEntryTypes() {
return Collections.emptyList();
diff --git a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java
index 0947f31ce062e75d426c3f1b85b5a6db087bc3da..3594ee7f52772e9d3b4bf5b4fc8f80402224dd43 100644
--- a/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java
+++ b/schbrain-canal-client/src/main/java/com/schbrain/canal/client/transfer/AbstractMessageTransponder.java
@@ -5,16 +5,10 @@ import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf;
-import com.schbrain.canal.client.core.ListenerPoint;
-import com.schbrain.canal.client.event.CanalEvent;
import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author zhuyf
@@ -44,6 +38,8 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
*/
private volatile boolean running = true;
+ private AtomicInteger tryCount = new AtomicInteger(0);
+
public AbstractMessageTransponder(CanalConnector connector,
Map.Entry config,
HandlerConf handlerConf) {
@@ -57,50 +53,59 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
@Override
public void run() {
- int errorCount = config.getRetryCount();
+ tryCount.set(config.getRetryCount());
final long interval = config.getAcquireInterval();
- final String threadName = Thread.currentThread().getName();
while (running && !Thread.currentThread().isInterrupted()) {
try {
Message message = connector.getWithoutAck(config.getBatchSize());
long batchId = message.getId();
int size = message.getEntries().size();
- if (log.isDebugEnabled()) {
- //log.debug("{}: Get message from canal server >>>>> size:{}", threadName, size);
- }
//empty message
if (batchId == -1 || size == 0) {
- if (log.isDebugEnabled()) {
- //log.debug("{}: Empty message... sleep for {} millis", threadName, interval);
- }
Thread.sleep(interval);
} else {
distributeEvent(message);
}
connector.ack(batchId);
- if (log.isDebugEnabled()) {
- //log.debug("{}: Ack message. batchId:{}", threadName, batchId);
- }
} catch (CanalClientException e) {
- errorCount--;
- log.error(threadName + ": Error occurred!! ", e);
+ boolean nextTry = nextTry();
+ if(!nextTry){
+ stop();
+ }
try {
Thread.sleep(interval);
} catch (InterruptedException e1) {
- errorCount = 0;
}
} catch (InterruptedException e) {
- errorCount = 0;
+ stop();
connector.rollback();
} finally {
- if (errorCount <= 0) {
- stop();
- log.info("{}: Stopping the client.. ", Thread.currentThread().getName());
+ if(!running){
+ connector.rollback();
+ break;
}
}
}
stop();
- log.info("{}: client stopped. ", Thread.currentThread().getName());
+ try {
+ connector.disconnect();
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ log.info("{}: client stopped. ", destination);
+ }
+
+ /**
+ * 是否尝试下一次
+ * @return
+ */
+ private boolean nextTry(){
+ int retryCount = config.getRetryCount();
+ if(retryCount <= 0){
+ return true;
+ }
+ int n = tryCount.decrementAndGet();
+ return n > 0;
}
/**
diff --git a/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java b/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java
index 2af519b6d206f35ffe532ffee58c301128293765..4612e867216c4587ac27f3e9ccc20123f7740834 100644
--- a/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java
+++ b/schbrain-canal-web/src/main/java/com/schbrain/web/UserServcie.java
@@ -4,18 +4,25 @@ import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author zhuyf
* @date 2022/6/16
*/
+@Slf4j
@Service
@TableFilter(table = "#{keyService.table()}" , schame = "${canal.table.scheam}")
public class UserServcie extends SimpleResolverCanalEvent {
@Override
public void onInsert(CanalEntry.Header header, User o) {
+ log.info("on insert :{}",o);
}
+ @Override
+ public void onUpdate(CanalEntry.Header header, User before, User after) {
+ log.info("on update before :{} after :{}",before,after);
+ }
}
\ No newline at end of file
diff --git a/schbrain-canal-web/src/main/resources/application.properties b/schbrain-canal-web/src/main/resources/application.properties
index dadaf08df8797e0fb628263edff3667a9005cb42..b7a746f1d27be7e4c961776d2cd2af8ad44dfbe9 100644
--- a/schbrain-canal-web/src/main/resources/application.properties
+++ b/schbrain-canal-web/src/main/resources/application.properties
@@ -1,10 +1,10 @@
spring.application.name = test-canal
-canal.client.instances.kp_user.addresses=192.168.36.66:11111
+#canal.client.instances.kp_user.addresses=192.168.36.66:11111
canal.client.instances.kp_user.username=
canal.client.instances.kp_user.password=
-canal.client.instances.kp_user.retryCount=10
-canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181
+canal.client.instances.kp_user.retryCount=-1
+canal.client.instances.kp_user.zkHosts=zookeeper.dev.server.schbrain.com:2181
@@ -29,4 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192
#
#
-canal.table.scheam = schame
\ No newline at end of file
+canal.table.scheam = kp_user
\ No newline at end of file