Commit 2765b561 authored by liaozan's avatar liaozan 🏀

Update starrocks

parent 18793d2f
...@@ -4,7 +4,7 @@ import com.schbrain.common.util.ConfigurationPropertiesUtils; ...@@ -4,7 +4,7 @@ import com.schbrain.common.util.ConfigurationPropertiesUtils;
import org.springframework.boot.context.properties.bind.Bindable; import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder; import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.Environment;
/** /**
* @author liaozan * @author liaozan
...@@ -20,7 +20,7 @@ public interface ConfigurableProperties extends Ordered { ...@@ -20,7 +20,7 @@ public interface ConfigurableProperties extends Ordered {
/** /**
* bind properties * bind properties
*/ */
default ConfigurableProperties bind(ConfigurableEnvironment environment) { default ConfigurableProperties bind(Environment environment) {
return Binder.get(environment).bindOrCreate(getPropertiesPrefix(), Bindable.ofInstance(this)); return Binder.get(environment).bindOrCreate(getPropertiesPrefix(), Bindable.ofInstance(this));
} }
......
package com.schbrain.common.entity;
import lombok.*;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author liaozan
* @since 2023/12/6
*/
@Data
public class CanalChangedEvent {
/**
* 库名
*/
private String schemaName;
/**
* 表名
*/
private String tableName;
/**
* 变更类型
*/
private CanalEventType eventType;
/**
* 变更前的数据
*/
private Map<String, Object> before;
/**
* 变更后的数据
*/
private Map<String, Object> after;
@Getter
@AllArgsConstructor
public enum CanalEventType {
/**
* INSERT
*/
INSERT(1),
/**
* UPDATE
*/
UPDATE(2),
/**
* DELETE
*/
DELETE(3);
private static final Map<Integer, CanalEventType> ENUM_MAP = Arrays.stream(values()).collect(Collectors.toMap(CanalEventType::getValue, Function.identity()));
private final int value;
public static CanalEventType of(int value) {
return ENUM_MAP.get(value);
}
}
}
package com.schbrain.framework.autoconfigure.starrocks; package com.schbrain.framework.autoconfigure.starrocks;
import com.schbrain.framework.autoconfigure.starrocks.operation.StarrocksOperationFactory;
import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties; import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/** /**
* @author liaozan * @author liaozan
...@@ -14,9 +12,4 @@ import org.springframework.context.annotation.Bean; ...@@ -14,9 +12,4 @@ import org.springframework.context.annotation.Bean;
@EnableConfigurationProperties(StarrocksProperties.class) @EnableConfigurationProperties(StarrocksProperties.class)
public class StarrocksAutoConfiguration { public class StarrocksAutoConfiguration {
@Bean
public StarrocksOperationFactory starrocksOperationFactory(StarrocksProperties starrocksProperties) {
return new StarrocksOperationFactory(starrocksProperties);
}
} }
package com.schbrain.framework.autoconfigure.starrocks.annotation;
import java.lang.annotation.*;
/**
* @author liaozan
* @since 2023/12/6
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StarrocksTable {
/**
* 表名
*/
String value();
}
package com.schbrain.framework.autoconfigure.starrocks.constants;
/**
* @author liaozan
* @since 2023/12/6
*/
public class StarrocksConstants {
/**
* StreamLoad api 路径模板
*/
public static final String STREAM_LOAD_TEMPLATE = "http://%s:%s/api/%s/%s/_stream_load";
/**
* jdbc url连接模板
*/
public static final String JDBC_URL_TEMPLATE = "jdbc:mysql://%s:%s/%s";
}
package com.schbrain.framework.autoconfigure.starrocks.operation;
import cn.hutool.core.lang.Singleton;
import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties;
/**
* @author liaozan
* @since 2023/11/27
*/
public class StarrocksOperationFactory {
public static final String STREAM_LOAD_TEMPLATE = "http://%s:%s/api/%s/%s/_stream_load";
private static final String CACHE_PREFIX = "starrocks-operation-service-%s";
private final StarrocksProperties properties;
public StarrocksOperationFactory(StarrocksProperties properties) {
this.properties = properties;
}
public <T> StarrocksService<T> getStarrocksService(String tableName, Class<T> entityClass) {
return getStarrocksService(properties.getDatabase(), tableName, entityClass);
}
public <T> StarrocksService<T> getStarrocksService(String database, String tableName, Class<T> entityClass) {
return Singleton.get(String.format(CACHE_PREFIX, tableName), () -> new StarrocksServiceImpl<>(createStreamLoadHandler(database, tableName), entityClass));
}
private StarrocksStreamLoadHandler createStreamLoadHandler(String database, String tableName) {
String streamLoadUrl = String.format(STREAM_LOAD_TEMPLATE, properties.getHost(), properties.getHttpPort(), database, tableName);
return new StarrocksStreamLoadHandler(streamLoadUrl, properties.getUsername(), properties.getPassword());
}
}
package com.schbrain.framework.autoconfigure.starrocks.operation; package com.schbrain.framework.autoconfigure.starrocks.operation;
import org.springframework.jdbc.core.PreparedStatementCreator;
import java.util.List; import java.util.List;
/** /**
...@@ -38,4 +40,9 @@ public interface StarrocksService<T> { ...@@ -38,4 +40,9 @@ public interface StarrocksService<T> {
*/ */
void deleteBatch(List<T> entityList); void deleteBatch(List<T> entityList);
/**
* 根据 sql 查询
*/
List<T> search(PreparedStatementCreator creator);
} }
package com.schbrain.framework.autoconfigure.starrocks.operation; package com.schbrain.framework.autoconfigure.starrocks.operation;
import com.schbrain.common.util.ValidateUtils; import com.schbrain.common.util.ValidateUtils;
import com.schbrain.framework.autoconfigure.starrocks.annotation.StarrocksTable;
import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ResolvableType;
import org.springframework.jdbc.core.*;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import static com.schbrain.framework.autoconfigure.starrocks.constants.StarrocksConstants.JDBC_URL_TEMPLATE;
import static com.schbrain.framework.autoconfigure.starrocks.constants.StarrocksConstants.STREAM_LOAD_TEMPLATE;
/** /**
* @author liaozan * @author liaozan
* @since 2023/11/27 * @since 2023/11/27
*/ */
public class StarrocksServiceImpl<T> implements StarrocksService<T> { public class StarrocksServiceImpl<T> implements StarrocksService<T>, InitializingBean {
protected final Class<T> entityClass;
protected final BeanPropertyRowMapper<T> rowMapper;
// TODO for select @Autowired
@SuppressWarnings({"FieldCanBeLocal", "unused"}) protected StarrocksProperties config;
private final Class<T> entityClass;
private final StarrocksStreamLoadHandler handler; protected JdbcTemplate jdbcTemplate;
protected StarrocksStreamLoadHandler handler;
public StarrocksServiceImpl(StarrocksStreamLoadHandler handler, Class<T> entityClass) { @SuppressWarnings({"unchecked", "DataFlowIssue"})
this.handler = handler; public StarrocksServiceImpl() {
this.entityClass = entityClass; this.entityClass = (Class<T>) ResolvableType.forInstance(this).getSuperType().getGeneric(0).getRawClass();
this.rowMapper = new BeanPropertyRowMapper<>(entityClass);
} }
@Override @Override
...@@ -29,8 +45,7 @@ public class StarrocksServiceImpl<T> implements StarrocksService<T> { ...@@ -29,8 +45,7 @@ public class StarrocksServiceImpl<T> implements StarrocksService<T> {
@Override @Override
public void upsert(T entity, List<String> columns) { public void upsert(T entity, List<String> columns) {
ValidateUtils.notNull(entity, "entity不能为空"); upsertBatch(List.of(ValidateUtils.notNull(entity, "entity不能为空")), columns);
upsertBatch(List.of(entity), columns);
} }
@Override @Override
...@@ -40,20 +55,44 @@ public class StarrocksServiceImpl<T> implements StarrocksService<T> { ...@@ -40,20 +55,44 @@ public class StarrocksServiceImpl<T> implements StarrocksService<T> {
@Override @Override
public void upsertBatch(List<T> entityList, List<String> columns) { public void upsertBatch(List<T> entityList, List<String> columns) {
ValidateUtils.notEmpty(entityList, "entityList不能为空"); handler.upsertBatch(ValidateUtils.notEmpty(entityList, "entityList不能为空"), columns);
handler.upsertBatch(entityList, columns);
} }
@Override @Override
public void delete(T entity) { public void delete(T entity) {
ValidateUtils.notNull(entity, "entity不能为空"); deleteBatch(List.of(ValidateUtils.notNull(entity, "entity不能为空")));
deleteBatch(List.of(entity));
} }
@Override @Override
public void deleteBatch(List<T> entityList) { public void deleteBatch(List<T> entityList) {
ValidateUtils.notNull(entityList, "entityList不能为空"); handler.deleteBatch(ValidateUtils.notNull(entityList, "entityList不能为空"));
handler.deleteBatch(entityList); }
@Override
public List<T> search(PreparedStatementCreator callback) {
return jdbcTemplate.queryForStream(callback, rowMapper).collect(Collectors.toList());
}
@Override
public void afterPropertiesSet() {
StarrocksTable annotation = ValidateUtils.notNull(entityClass.getAnnotation(StarrocksTable.class), StarrocksTable.class.getName() + "不能为空");
this.handler = createHandler(annotation.value());
this.jdbcTemplate = createJdbcTemplate(annotation.value());
}
protected StarrocksStreamLoadHandler createHandler(String tableName) {
String streamLoadUrl = String.format(STREAM_LOAD_TEMPLATE, config.getHost(), config.getHttpPort(), config.getDatabase(), tableName);
return new StarrocksStreamLoadHandler(streamLoadUrl, config.getUsername(), config.getPassword());
}
protected JdbcTemplate createJdbcTemplate(String tableName) {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setPoolName(String.format("%s:%s", config.getDatabase(), tableName));
hikariConfig.setDriverClassName(config.getDriverClassName());
hikariConfig.setJdbcUrl(String.format(JDBC_URL_TEMPLATE, config.getHost(), config.getPort(), config.getDatabase()));
hikariConfig.setUsername(config.getUsername());
hikariConfig.setPassword(config.getPassword());
return new JdbcTemplate(new HikariDataSource(hikariConfig));
} }
} }
package com.schbrain.framework.autoconfigure.starrocks.operation; package com.schbrain.framework.autoconfigure.starrocks.operation;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpRequest;
import com.schbrain.common.exception.ParamInvalidException; import com.schbrain.common.exception.ParamInvalidException;
import com.schbrain.common.util.JacksonUtils; import com.schbrain.common.util.JacksonUtils;
import com.schbrain.common.util.TraceIdUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
...@@ -61,7 +61,7 @@ public class StarrocksStreamLoadHandler { ...@@ -61,7 +61,7 @@ public class StarrocksStreamLoadHandler {
private HttpRequest createCommonRequest(String content) { private HttpRequest createCommonRequest(String content) {
return HttpRequest.put(streamLoadUrl) return HttpRequest.put(streamLoadUrl)
.header("label", TraceIdUtils.get()) .header("label", IdUtil.getSnowflakeNextIdStr())
.header("strict_mode", Boolean.TRUE.toString()) .header("strict_mode", Boolean.TRUE.toString())
.header("Expect", "100-continue") .header("Expect", "100-continue")
.header("format", "json") .header("format", "json")
......
...@@ -60,6 +60,12 @@ public class StarrocksProperties implements ConfigurableProperties { ...@@ -60,6 +60,12 @@ public class StarrocksProperties implements ConfigurableProperties {
@NotBlank @NotBlank
private String password; private String password;
/**
* 数据库连接池大小
*/
@NotNull
private Integer maxPoolSize = 2;
@Override @Override
public String getNamespace() { public String getNamespace() {
return "starrocks-common"; return "starrocks-common";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment