Commit ca014a8c authored by zhuyunfeng's avatar zhuyunfeng

fix spring value

parent 92e0eb7d
...@@ -7,11 +7,10 @@ ...@@ -7,11 +7,10 @@
<parent> <parent>
<groupId>com.schbrain.framework</groupId> <groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-parent</artifactId> <artifactId>schbrain-parent</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>3.0.0</version>
</parent> </parent>
<packaging>pom</packaging> <packaging>pom</packaging>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-canal</artifactId> <artifactId>schbrain-canal</artifactId>
<version>${revision}</version> <version>${revision}</version>
<name>schbrain-canal</name> <name>schbrain-canal</name>
......
...@@ -9,14 +9,23 @@ ...@@ -9,14 +9,23 @@
<version>${revision}</version> <version>${revision}</version>
</parent> </parent>
<properties>
<canal-version>1.1.5</canal-version>
</properties>
<artifactId>schbrain-canal-client</artifactId> <artifactId>schbrain-canal-client</artifactId>
<name>schbrain-canal-client</name> <name>schbrain-canal-client</name>
<dependencies> <dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal-version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.alibaba.otter</groupId> <groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId> <artifactId>canal.client</artifactId>
<version>1.1.5</version> <version>${canal-version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.zookeeper</groupId>
...@@ -27,12 +36,6 @@ ...@@ -27,12 +36,6 @@
<dependency> <dependency>
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId> <artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>joda-time</groupId> <groupId>joda-time</groupId>
...@@ -40,22 +43,16 @@ ...@@ -40,22 +43,16 @@
<version>2.9.4</version> <version>2.9.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.schbrain.framework</groupId> <groupId>com.schbrain.common</groupId>
<artifactId>schbrain-spring-support</artifactId> <artifactId>common-util</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId> <artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
......
...@@ -10,21 +10,38 @@ import lombok.Data; ...@@ -10,21 +10,38 @@ import lombok.Data;
*/ */
@Data @Data
public class CanalClientConfig { public class CanalClientConfig {
/**
* 连接地址
*/
private String addresses; private String addresses;
/**
* 订阅信息
*/
private String subscribe; private String subscribe;
/**
* zookeeper连接地址
*/
private String zkHosts; private String zkHosts;
/**
* 用户名
*/
private String username = ""; private String username = "";
/**
* 密码
*/
private String password = ""; private String password = "";
/**
private int retryCount; * 连接重试次数
* -1 表示无限次重连
*/
private int retryCount = 10;
/**
* 获取数据间隔时间(millisecond)
*/
private long acquireInterval = 1000; private long acquireInterval = 1000;
/**
* 批量获取数据条数
*/
private int batchSize = 1000; private int batchSize = 1000;
} }
...@@ -8,6 +8,7 @@ import com.schbrain.canal.client.utils.BeanUtil; ...@@ -8,6 +8,7 @@ import com.schbrain.canal.client.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
...@@ -31,6 +32,7 @@ public class CanalClientConfiguration { ...@@ -31,6 +32,7 @@ public class CanalClientConfiguration {
} }
@Bean @Bean
@ConditionalOnMissingBean(CanalClient.class)
private CanalClient canalClient(ConfigurableBeanFactory configurableBeanFactory) { private CanalClient canalClient(ConfigurableBeanFactory configurableBeanFactory) {
log.info("starting canal client...."); log.info("starting canal client....");
TransponderFactory factory = MessageTransponders.defaultMessageTransponder(); TransponderFactory factory = MessageTransponders.defaultMessageTransponder();
......
...@@ -167,7 +167,7 @@ public class SimpleCanalClient extends AbstractCanalClient { ...@@ -167,7 +167,7 @@ public class SimpleCanalClient extends AbstractCanalClient {
private String parse(String val){ private String parse(String val){
String resolvedValue = expressionContext.getBeanFactory().resolveEmbeddedValue(val); String resolvedValue = expressionContext.getBeanFactory().resolveEmbeddedValue(val);
if (!(resolvedValue.startsWith("#{") && resolvedValue.endsWith("}"))) { if (!(resolvedValue.startsWith("#{") && resolvedValue.endsWith("}"))) {
return val; return resolvedValue;
} }
Object elVal = resolver.evaluate(resolvedValue,expressionContext); Object elVal = resolver.evaluate(resolvedValue,expressionContext);
if(elVal instanceof String){ if(elVal instanceof String){
......
...@@ -187,7 +187,9 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra ...@@ -187,7 +187,9 @@ public abstract class AbstractBasicMessageTransponder extends AbstractMessageTra
* ddl事件 * ddl事件
* @param rowChange * @param rowChange
*/ */
protected void processDdl(CanalEntry.RowChange rowChange) {} protected void processDdl(CanalEntry.RowChange rowChange) {
}
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() { protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
return Collections.emptyList(); return Collections.emptyList();
......
...@@ -5,16 +5,10 @@ import com.alibaba.otter.canal.protocol.Message; ...@@ -5,16 +5,10 @@ import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.schbrain.canal.client.conf.CanalClientConfig; import com.schbrain.canal.client.conf.CanalClientConfig;
import com.schbrain.canal.client.core.HandlerConf; import com.schbrain.canal.client.core.HandlerConf;
import com.schbrain.canal.client.core.ListenerPoint;
import com.schbrain.canal.client.event.CanalEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author zhuyf * @author zhuyf
...@@ -44,6 +38,8 @@ public abstract class AbstractMessageTransponder implements MessageTransponder { ...@@ -44,6 +38,8 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
*/ */
private volatile boolean running = true; private volatile boolean running = true;
private AtomicInteger tryCount = new AtomicInteger(0);
public AbstractMessageTransponder(CanalConnector connector, public AbstractMessageTransponder(CanalConnector connector,
Map.Entry<String,CanalClientConfig> config, Map.Entry<String,CanalClientConfig> config,
HandlerConf handlerConf) { HandlerConf handlerConf) {
...@@ -57,50 +53,59 @@ public abstract class AbstractMessageTransponder implements MessageTransponder { ...@@ -57,50 +53,59 @@ public abstract class AbstractMessageTransponder implements MessageTransponder {
@Override @Override
public void run() { public void run() {
int errorCount = config.getRetryCount(); tryCount.set(config.getRetryCount());
final long interval = config.getAcquireInterval(); final long interval = config.getAcquireInterval();
final String threadName = Thread.currentThread().getName();
while (running && !Thread.currentThread().isInterrupted()) { while (running && !Thread.currentThread().isInterrupted()) {
try { try {
Message message = connector.getWithoutAck(config.getBatchSize()); Message message = connector.getWithoutAck(config.getBatchSize());
long batchId = message.getId(); long batchId = message.getId();
int size = message.getEntries().size(); int size = message.getEntries().size();
if (log.isDebugEnabled()) {
//log.debug("{}: Get message from canal server >>>>> size:{}", threadName, size);
}
//empty message //empty message
if (batchId == -1 || size == 0) { if (batchId == -1 || size == 0) {
if (log.isDebugEnabled()) {
//log.debug("{}: Empty message... sleep for {} millis", threadName, interval);
}
Thread.sleep(interval); Thread.sleep(interval);
} else { } else {
distributeEvent(message); distributeEvent(message);
} }
connector.ack(batchId); connector.ack(batchId);
if (log.isDebugEnabled()) {
//log.debug("{}: Ack message. batchId:{}", threadName, batchId);
}
} catch (CanalClientException e) { } catch (CanalClientException e) {
errorCount--; boolean nextTry = nextTry();
log.error(threadName + ": Error occurred!! ", e); if(!nextTry){
stop();
}
try { try {
Thread.sleep(interval); Thread.sleep(interval);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
errorCount = 0;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
errorCount = 0; stop();
connector.rollback(); connector.rollback();
} finally { } finally {
if (errorCount <= 0) { if(!running){
stop(); connector.rollback();
log.info("{}: Stopping the client.. ", Thread.currentThread().getName()); break;
} }
} }
} }
stop(); stop();
log.info("{}: client stopped. ", Thread.currentThread().getName()); try {
connector.disconnect();
}catch (Exception e){
e.printStackTrace();
}
log.info("{}: client stopped. ", destination);
}
/**
* 是否尝试下一次
* @return
*/
private boolean nextTry(){
int retryCount = config.getRetryCount();
if(retryCount <= 0){
return true;
}
int n = tryCount.decrementAndGet();
return n > 0;
} }
/** /**
......
...@@ -4,18 +4,25 @@ import com.alibaba.otter.canal.protocol.CanalEntry; ...@@ -4,18 +4,25 @@ import com.alibaba.otter.canal.protocol.CanalEntry;
import com.schbrain.bean.User; import com.schbrain.bean.User;
import com.schbrain.canal.client.annotation.TableFilter; import com.schbrain.canal.client.annotation.TableFilter;
import com.schbrain.canal.client.event.SimpleResolverCanalEvent; import com.schbrain.canal.client.event.SimpleResolverCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* @author zhuyf * @author zhuyf
* @date 2022/6/16 * @date 2022/6/16
*/ */
@Slf4j
@Service @Service
@TableFilter(table = "#{keyService.table()}" , schame = "${canal.table.scheam}") @TableFilter(table = "#{keyService.table()}" , schame = "${canal.table.scheam}")
public class UserServcie extends SimpleResolverCanalEvent<User> { public class UserServcie extends SimpleResolverCanalEvent<User> {
@Override @Override
public void onInsert(CanalEntry.Header header, User o) { public void onInsert(CanalEntry.Header header, User o) {
log.info("on insert :{}",o);
} }
@Override
public void onUpdate(CanalEntry.Header header, User before, User after) {
log.info("on update before :{} after :{}",before,after);
}
} }
\ No newline at end of file
spring.application.name = test-canal spring.application.name = test-canal
canal.client.instances.kp_user.addresses=192.168.36.66:11111 #canal.client.instances.kp_user.addresses=192.168.36.66:11111
canal.client.instances.kp_user.username= canal.client.instances.kp_user.username=
canal.client.instances.kp_user.password= canal.client.instances.kp_user.password=
canal.client.instances.kp_user.retryCount=10 canal.client.instances.kp_user.retryCount=-1
canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192.168.22.21:2181 canal.client.instances.kp_user.zkHosts=zookeeper.dev.server.schbrain.com:2181
...@@ -29,4 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192 ...@@ -29,4 +29,4 @@ canal.client.instances.kp_user.zkHosts=192.168.22.22:2181,192.168.22.26:2181,192
# #
# #
canal.table.scheam = schame canal.table.scheam = kp_user
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment