From 2765b561fba705acfb47fe391956d157a2cb1e66 Mon Sep 17 00:00:00 2001 From: liaozan <378024053@qq.com> Date: Thu, 7 Dec 2023 00:25:06 +0800 Subject: [PATCH] Update starrocks --- .../util/support/ConfigurableProperties.java | 4 +- .../common/entity/CanalChangedEvent.java | 71 +++++++++++++++++++ .../starrocks/StarrocksAutoConfiguration.java | 7 -- .../starrocks/annotation/StarrocksTable.java | 19 +++++ .../constants/StarrocksConstants.java | 19 +++++ .../operation/StarrocksOperationFactory.java | 35 --------- .../starrocks/operation/StarrocksService.java | 7 ++ .../operation/StarrocksServiceImpl.java | 71 ++++++++++++++----- .../operation/StarrocksStreamLoadHandler.java | 4 +- .../properties/StarrocksProperties.java | 6 ++ 10 files changed, 181 insertions(+), 62 deletions(-) create mode 100644 commons/common/src/main/java/com/schbrain/common/entity/CanalChangedEvent.java create mode 100644 starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/annotation/StarrocksTable.java create mode 100644 starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/constants/StarrocksConstants.java delete mode 100644 starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksOperationFactory.java diff --git a/commons/common-util/src/main/java/com/schbrain/common/util/support/ConfigurableProperties.java b/commons/common-util/src/main/java/com/schbrain/common/util/support/ConfigurableProperties.java index 187126d..f6a8ceb 100644 --- a/commons/common-util/src/main/java/com/schbrain/common/util/support/ConfigurableProperties.java +++ b/commons/common-util/src/main/java/com/schbrain/common/util/support/ConfigurableProperties.java @@ -4,7 +4,7 @@ import com.schbrain.common.util.ConfigurationPropertiesUtils; import org.springframework.boot.context.properties.bind.Bindable; import org.springframework.boot.context.properties.bind.Binder; import org.springframework.core.Ordered; -import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.Environment; /** * @author liaozan @@ -20,7 +20,7 @@ public interface ConfigurableProperties extends Ordered { /** * bind properties */ - default ConfigurableProperties bind(ConfigurableEnvironment environment) { + default ConfigurableProperties bind(Environment environment) { return Binder.get(environment).bindOrCreate(getPropertiesPrefix(), Bindable.ofInstance(this)); } diff --git a/commons/common/src/main/java/com/schbrain/common/entity/CanalChangedEvent.java b/commons/common/src/main/java/com/schbrain/common/entity/CanalChangedEvent.java new file mode 100644 index 0000000..8e4546b --- /dev/null +++ b/commons/common/src/main/java/com/schbrain/common/entity/CanalChangedEvent.java @@ -0,0 +1,71 @@ +package com.schbrain.common.entity; + +import lombok.*; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * @author liaozan + * @since 2023/12/6 + */ +@Data +public class CanalChangedEvent { + + /** + * 库名 + */ + private String schemaName; + + /** + * 表名 + */ + private String tableName; + + /** + * 变更类型 + */ + private CanalEventType eventType; + + /** + * 变更前的数据 + */ + private Map before; + + /** + * 变更后的数据 + */ + private Map after; + + @Getter + @AllArgsConstructor + public enum CanalEventType { + + /** + * INSERT + */ + INSERT(1), + + /** + * UPDATE + */ + UPDATE(2), + + /** + * DELETE + */ + DELETE(3); + + private static final Map ENUM_MAP = Arrays.stream(values()).collect(Collectors.toMap(CanalEventType::getValue, Function.identity())); + + private final int value; + + public static CanalEventType of(int value) { + return ENUM_MAP.get(value); + } + + } + +} diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/StarrocksAutoConfiguration.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/StarrocksAutoConfiguration.java index c5e4b3b..495d9ca 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/StarrocksAutoConfiguration.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/StarrocksAutoConfiguration.java @@ -1,10 +1,8 @@ package com.schbrain.framework.autoconfigure.starrocks; -import com.schbrain.framework.autoconfigure.starrocks.operation.StarrocksOperationFactory; import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; /** * @author liaozan @@ -14,9 +12,4 @@ import org.springframework.context.annotation.Bean; @EnableConfigurationProperties(StarrocksProperties.class) public class StarrocksAutoConfiguration { - @Bean - public StarrocksOperationFactory starrocksOperationFactory(StarrocksProperties starrocksProperties) { - return new StarrocksOperationFactory(starrocksProperties); - } - } diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/annotation/StarrocksTable.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/annotation/StarrocksTable.java new file mode 100644 index 0000000..e241e0f --- /dev/null +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/annotation/StarrocksTable.java @@ -0,0 +1,19 @@ +package com.schbrain.framework.autoconfigure.starrocks.annotation; + +import java.lang.annotation.*; + +/** + * @author liaozan + * @since 2023/12/6 + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface StarrocksTable { + + /** + * 表名 + */ + String value(); + +} diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/constants/StarrocksConstants.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/constants/StarrocksConstants.java new file mode 100644 index 0000000..28e8369 --- /dev/null +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/constants/StarrocksConstants.java @@ -0,0 +1,19 @@ +package com.schbrain.framework.autoconfigure.starrocks.constants; + +/** + * @author liaozan + * @since 2023/12/6 + */ +public class StarrocksConstants { + + /** + * StreamLoad api 路径模板 + */ + public static final String STREAM_LOAD_TEMPLATE = "http://%s:%s/api/%s/%s/_stream_load"; + + /** + * jdbc url连接模板 + */ + public static final String JDBC_URL_TEMPLATE = "jdbc:mysql://%s:%s/%s"; + +} diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksOperationFactory.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksOperationFactory.java deleted file mode 100644 index b426cf5..0000000 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksOperationFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.schbrain.framework.autoconfigure.starrocks.operation; - -import cn.hutool.core.lang.Singleton; -import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties; - -/** - * @author liaozan - * @since 2023/11/27 - */ -public class StarrocksOperationFactory { - - public static final String STREAM_LOAD_TEMPLATE = "http://%s:%s/api/%s/%s/_stream_load"; - - private static final String CACHE_PREFIX = "starrocks-operation-service-%s"; - - private final StarrocksProperties properties; - - public StarrocksOperationFactory(StarrocksProperties properties) { - this.properties = properties; - } - - public StarrocksService getStarrocksService(String tableName, Class entityClass) { - return getStarrocksService(properties.getDatabase(), tableName, entityClass); - } - - public StarrocksService getStarrocksService(String database, String tableName, Class entityClass) { - return Singleton.get(String.format(CACHE_PREFIX, tableName), () -> new StarrocksServiceImpl<>(createStreamLoadHandler(database, tableName), entityClass)); - } - - private StarrocksStreamLoadHandler createStreamLoadHandler(String database, String tableName) { - String streamLoadUrl = String.format(STREAM_LOAD_TEMPLATE, properties.getHost(), properties.getHttpPort(), database, tableName); - return new StarrocksStreamLoadHandler(streamLoadUrl, properties.getUsername(), properties.getPassword()); - } - -} diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksService.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksService.java index 46928b9..6458510 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksService.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksService.java @@ -1,5 +1,7 @@ package com.schbrain.framework.autoconfigure.starrocks.operation; +import org.springframework.jdbc.core.PreparedStatementCreator; + import java.util.List; /** @@ -38,4 +40,9 @@ public interface StarrocksService { */ void deleteBatch(List entityList); + /** + * 根据 sql 查询 + */ + List search(PreparedStatementCreator creator); + } diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksServiceImpl.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksServiceImpl.java index d746b83..4e93735 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksServiceImpl.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksServiceImpl.java @@ -1,25 +1,41 @@ package com.schbrain.framework.autoconfigure.starrocks.operation; import com.schbrain.common.util.ValidateUtils; +import com.schbrain.framework.autoconfigure.starrocks.annotation.StarrocksTable; +import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.ResolvableType; +import org.springframework.jdbc.core.*; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; + +import static com.schbrain.framework.autoconfigure.starrocks.constants.StarrocksConstants.JDBC_URL_TEMPLATE; +import static com.schbrain.framework.autoconfigure.starrocks.constants.StarrocksConstants.STREAM_LOAD_TEMPLATE; /** * @author liaozan * @since 2023/11/27 */ -public class StarrocksServiceImpl implements StarrocksService { +public class StarrocksServiceImpl implements StarrocksService, InitializingBean { + + protected final Class entityClass; + protected final BeanPropertyRowMapper rowMapper; - // TODO for select - @SuppressWarnings({"FieldCanBeLocal", "unused"}) - private final Class entityClass; + @Autowired + protected StarrocksProperties config; - private final StarrocksStreamLoadHandler handler; + protected JdbcTemplate jdbcTemplate; + protected StarrocksStreamLoadHandler handler; - public StarrocksServiceImpl(StarrocksStreamLoadHandler handler, Class entityClass) { - this.handler = handler; - this.entityClass = entityClass; + @SuppressWarnings({"unchecked", "DataFlowIssue"}) + public StarrocksServiceImpl() { + this.entityClass = (Class) ResolvableType.forInstance(this).getSuperType().getGeneric(0).getRawClass(); + this.rowMapper = new BeanPropertyRowMapper<>(entityClass); } @Override @@ -29,8 +45,7 @@ public class StarrocksServiceImpl implements StarrocksService { @Override public void upsert(T entity, List columns) { - ValidateUtils.notNull(entity, "entity不能为空"); - upsertBatch(List.of(entity), columns); + upsertBatch(List.of(ValidateUtils.notNull(entity, "entity不能为空")), columns); } @Override @@ -40,20 +55,44 @@ public class StarrocksServiceImpl implements StarrocksService { @Override public void upsertBatch(List entityList, List columns) { - ValidateUtils.notEmpty(entityList, "entityList不能为空"); - handler.upsertBatch(entityList, columns); + handler.upsertBatch(ValidateUtils.notEmpty(entityList, "entityList不能为空"), columns); } @Override public void delete(T entity) { - ValidateUtils.notNull(entity, "entity不能为空"); - deleteBatch(List.of(entity)); + deleteBatch(List.of(ValidateUtils.notNull(entity, "entity不能为空"))); } @Override public void deleteBatch(List entityList) { - ValidateUtils.notNull(entityList, "entityList不能为空"); - handler.deleteBatch(entityList); + handler.deleteBatch(ValidateUtils.notNull(entityList, "entityList不能为空")); + } + + @Override + public List search(PreparedStatementCreator callback) { + return jdbcTemplate.queryForStream(callback, rowMapper).collect(Collectors.toList()); + } + + @Override + public void afterPropertiesSet() { + StarrocksTable annotation = ValidateUtils.notNull(entityClass.getAnnotation(StarrocksTable.class), StarrocksTable.class.getName() + "不能为空"); + this.handler = createHandler(annotation.value()); + this.jdbcTemplate = createJdbcTemplate(annotation.value()); + } + + protected StarrocksStreamLoadHandler createHandler(String tableName) { + String streamLoadUrl = String.format(STREAM_LOAD_TEMPLATE, config.getHost(), config.getHttpPort(), config.getDatabase(), tableName); + return new StarrocksStreamLoadHandler(streamLoadUrl, config.getUsername(), config.getPassword()); + } + + protected JdbcTemplate createJdbcTemplate(String tableName) { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setPoolName(String.format("%s:%s", config.getDatabase(), tableName)); + hikariConfig.setDriverClassName(config.getDriverClassName()); + hikariConfig.setJdbcUrl(String.format(JDBC_URL_TEMPLATE, config.getHost(), config.getPort(), config.getDatabase())); + hikariConfig.setUsername(config.getUsername()); + hikariConfig.setPassword(config.getPassword()); + return new JdbcTemplate(new HikariDataSource(hikariConfig)); } } diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksStreamLoadHandler.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksStreamLoadHandler.java index 284a720..b151c76 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksStreamLoadHandler.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/operation/StarrocksStreamLoadHandler.java @@ -1,9 +1,9 @@ package com.schbrain.framework.autoconfigure.starrocks.operation; +import cn.hutool.core.util.IdUtil; import cn.hutool.http.HttpRequest; import com.schbrain.common.exception.ParamInvalidException; import com.schbrain.common.util.JacksonUtils; -import com.schbrain.common.util.TraceIdUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -61,7 +61,7 @@ public class StarrocksStreamLoadHandler { private HttpRequest createCommonRequest(String content) { return HttpRequest.put(streamLoadUrl) - .header("label", TraceIdUtils.get()) + .header("label", IdUtil.getSnowflakeNextIdStr()) .header("strict_mode", Boolean.TRUE.toString()) .header("Expect", "100-continue") .header("format", "json") diff --git a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/properties/StarrocksProperties.java b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/properties/StarrocksProperties.java index 0880286..07c8078 100644 --- a/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/properties/StarrocksProperties.java +++ b/starters/starrocks-spring-boot-starter/src/main/java/com/schbrain/framework/autoconfigure/starrocks/properties/StarrocksProperties.java @@ -60,6 +60,12 @@ public class StarrocksProperties implements ConfigurableProperties { @NotBlank private String password; + /** + * 数据库连接池大小 + */ + @NotNull + private Integer maxPoolSize = 2; + @Override public String getNamespace() { return "starrocks-common"; -- GitLab