Commit c54acd43 authored by liaozan's avatar liaozan 🏀

Add starrocks starter

parent e3e182fa
......@@ -45,22 +45,6 @@
<properties>
<revision>3.0.9-SNAPSHOT</revision>
<java.version>11</java.version>
<!-- 2th part versions -->
<schbrain-apollo.version>${revision}</schbrain-apollo.version>
<schbrain-base-dao.version>${revision}</schbrain-base-dao.version>
<schbrain-cache.version>${revision}</schbrain-cache.version>
<schbrain-common.version>${revision}</schbrain-common.version>
<schbrain-common-util.version>${revision}</schbrain-common-util.version>
<schbrain-dubbo.version>${revision}</schbrain-dubbo.version>
<schbrain-elasticsearch.version>${revision}</schbrain-elasticsearch.version>
<schbrain-kafka.version>${revision}</schbrain-kafka.version>
<schbrain-logger.version>${revision}</schbrain-logger.version>
<schbrain-module-tree.version>${revision}</schbrain-module-tree.version>
<schbrain-mybatis.version>${revision}</schbrain-mybatis.version>
<schbrain-oss.version>${revision}</schbrain-oss.version>
<schbrain-spring-support.version>${revision}</schbrain-spring-support.version>
<schbrain-web-common.version>${revision}</schbrain-web-common.version>
<schbrain-xxl.version>${revision}</schbrain-xxl.version>
<!-- 3th part versions -->
<apollo.version>2.1.0</apollo.version>
......@@ -129,79 +113,84 @@
<dependencies>
<!-- 2th part versions -->
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>apollo-spring-boot-starter</artifactId>
<version>${schbrain-apollo.version}</version>
<groupId>com.schbrain.common</groupId>
<artifactId>common</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-base-dao</artifactId>
<version>${schbrain-base-dao.version}</version>
<groupId>com.schbrain.common</groupId>
<artifactId>web-common</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>cache-spring-boot-starter</artifactId>
<version>${schbrain-cache.version}</version>
<groupId>com.schbrain.common</groupId>
<artifactId>common-util</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.common</groupId>
<artifactId>common</artifactId>
<version>${schbrain-common.version}</version>
<artifactId>module-tree</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.common</groupId>
<artifactId>common-util</artifactId>
<version>${schbrain-common-util.version}</version>
<groupId>com.schbrain.framework</groupId>
<artifactId>apollo-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>cache-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>${schbrain-dubbo.version}</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>elasticsearch-spring-boot-starter</artifactId>
<version>${schbrain-elasticsearch.version}</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>kafka-spring-boot-starter</artifactId>
<version>${schbrain-kafka.version}</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>logger-spring-boot-starter</artifactId>
<version>${schbrain-logger.version}</version>
</dependency>
<dependency>
<groupId>com.schbrain.common</groupId>
<artifactId>module-tree</artifactId>
<version>${schbrain-module-tree.version}</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${schbrain-mybatis.version}</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>oss-spring-boot-starter</artifactId>
<version>${schbrain-oss.version}</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-spring-support</artifactId>
<version>${schbrain-spring-support.version}</version>
<artifactId>starrocks-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.common</groupId>
<artifactId>web-common</artifactId>
<version>${schbrain-web-common.version}</version>
<groupId>com.schbrain.framework</groupId>
<artifactId>xxl-job-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>xxl-job-spring-boot-starter</artifactId>
<version>${schbrain-xxl.version}</version>
<artifactId>schbrain-spring-support</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>schbrain-base-dao</artifactId>
<version>${revision}</version>
</dependency>
<!-- 3th part versions -->
......
......@@ -203,8 +203,8 @@ public class OssUtils {
if (StringUtils.isBlank(domain)) {
return ossUrl;
}
UrlBuilder originUrlBuilder = UrlBuilder.ofHttp(ossUrl);
return UrlBuilder.ofHttp(domain)
UrlBuilder originUrlBuilder = UrlBuilder.ofHttpWithoutEncode(ossUrl);
return UrlBuilder.ofHttpWithoutEncode(domain)
.setPath(originUrlBuilder.getPath())
.setQuery(originUrlBuilder.getQuery())
.build();
......
......@@ -24,6 +24,7 @@
<module>mybatis-spring-boot-starter</module>
<module>oss-spring-boot-starter</module>
<module>xxl-job-spring-boot-starter</module>
<module>starrocks-spring-boot-starter</module>
</modules>
</project>
\ No newline at end of file
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.schbrain.framework</groupId>
<artifactId>starters</artifactId>
<version>${revision}</version>
</parent>
<artifactId>starrocks-spring-boot-starter</artifactId>
<dependencies>
<dependency>
<groupId>com.schbrain.framework</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
</dependencies>
</project>
package com.schbrain.framework.autoconfigure.starrocks;
import com.schbrain.framework.autoconfigure.starrocks.operation.StarrocksOperationFactory;
import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/**
* @author liaozan
* @since 2023/11/27
*/
@AutoConfiguration
@EnableConfigurationProperties(StarrocksProperties.class)
public class StarrocksAutoConfiguration {
@Bean
public StarrocksOperationFactory starrocksOperationFactory(StarrocksProperties starrocksProperties) {
return new StarrocksOperationFactory(starrocksProperties);
}
}
package com.schbrain.framework.autoconfigure.starrocks.operation;
import cn.hutool.core.lang.Singleton;
import com.schbrain.framework.autoconfigure.starrocks.properties.StarrocksProperties;
/**
* @author liaozan
* @since 2023/11/27
*/
public class StarrocksOperationFactory {
public static final String STREAM_LOAD_TEMPLATE = "http://%s:%s/api/%s/%s/_stream_load";
private static final String CACHE_PREFIX = "starrocks-operation-service-%s";
private final StarrocksProperties properties;
public StarrocksOperationFactory(StarrocksProperties properties) {
this.properties = properties;
}
public <T> StarrocksService<T> getStarrocksService(String tableName, Class<T> entityClass) {
return getStarrocksService(properties.getDatabase(), tableName, entityClass);
}
public <T> StarrocksService<T> getStarrocksService(String database, String tableName, Class<T> entityClass) {
return Singleton.get(String.format(CACHE_PREFIX, tableName), () -> new StarrocksServiceImpl<>(createStreamLoadHandler(database, tableName), entityClass));
}
private StarrocksStreamLoadHandler createStreamLoadHandler(String database, String tableName) {
String streamLoadUrl = String.format(STREAM_LOAD_TEMPLATE, properties.getHost(), properties.getHttpPort(), database, tableName);
return new StarrocksStreamLoadHandler(streamLoadUrl, properties.getUsername(), properties.getPassword());
}
}
package com.schbrain.framework.autoconfigure.starrocks.operation;
import java.util.List;
/**
* @author liaozan
* @since 2023/11/27
*/
public interface StarrocksService<T> {
/**
* 单个保存/更新
*/
void upsert(T entity);
/**
* 单个保存/更新,传入 columns 只会处理响应的 column
*/
void upsert(T entity, List<String> columns);
/**
* 批量保存/更新
*/
void upsertBatch(List<T> entityList);
/**
* 批量保存/更新,传入 columns 只会处理响应的 column
*/
void upsertBatch(List<T> entityList, List<String> columns);
/**
* 删除
*/
void delete(T entity);
/**
* 批量删除
*/
void deleteBatch(List<T> entityList);
}
package com.schbrain.framework.autoconfigure.starrocks.operation;
import com.schbrain.common.util.ValidateUtils;
import java.util.Collections;
import java.util.List;
/**
* @author liaozan
* @since 2023/11/27
*/
public class StarrocksServiceImpl<T> implements StarrocksService<T> {
// TODO for select
@SuppressWarnings({"FieldCanBeLocal", "unused"})
private final Class<T> entityClass;
private final StarrocksStreamLoadHandler handler;
public StarrocksServiceImpl(StarrocksStreamLoadHandler handler, Class<T> entityClass) {
this.handler = handler;
this.entityClass = entityClass;
}
@Override
public void upsert(T entity) {
upsert(entity, Collections.emptyList());
}
@Override
public void upsert(T entity, List<String> columns) {
ValidateUtils.notNull(entity, "entity不能为空");
upsertBatch(List.of(entity), columns);
}
@Override
public void upsertBatch(List<T> entityList) {
upsertBatch(entityList, Collections.emptyList());
}
@Override
public void upsertBatch(List<T> entityList, List<String> columns) {
ValidateUtils.notEmpty(entityList, "entityList不能为空");
handler.upsertBatch(entityList, columns);
}
@Override
public void delete(T entity) {
ValidateUtils.notNull(entity, "entity不能为空");
deleteBatch(List.of(entity));
}
@Override
public void deleteBatch(List<T> entityList) {
ValidateUtils.notNull(entityList, "entityList不能为空");
handler.deleteBatch(entityList);
}
}
package com.schbrain.framework.autoconfigure.starrocks.operation;
import cn.hutool.http.HttpRequest;
import com.schbrain.common.exception.ParamInvalidException;
import com.schbrain.common.util.JacksonUtils;
import com.schbrain.common.util.TraceIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.Optional;
/**
* @author liaozan
* @since 2023/11/27
*/
@Slf4j
public class StarrocksStreamLoadHandler {
private final String streamLoadUrl;
private final String username;
private final String password;
public StarrocksStreamLoadHandler(String streamLoadUrl, String username, String password) {
this.streamLoadUrl = streamLoadUrl;
this.username = username;
this.password = password;
}
public <T> void upsertBatch(List<T> entityList, List<String> columns) {
String content = JacksonUtils.toJsonString(entityList);
String upsertResult = createUpsertRequest(content, columns).execute().body();
log.debug("Starrocks streamLoad upsert result: {}", upsertResult);
checkResponse(upsertResult);
}
public <T> void deleteBatch(List<T> entityList) {
String content = JacksonUtils.toJsonString(entityList);
String deleteResult = createCommonRequest(content).header("columns", "__op='delete'").execute().body();
log.debug("Starrocks streamLoad delete result: {}", deleteResult);
checkResponse(deleteResult);
}
private HttpRequest createUpsertRequest(String content, List<String> columns) {
HttpRequest request = createCommonRequest(content);
if (CollectionUtils.isNotEmpty(columns)) {
request.header("partial_update", Boolean.TRUE.toString());
request.header("columns", String.join(",", columns));
}
return request;
}
private void checkResponse(String result) {
StreamLoadResponse response = JacksonUtils.getObjectFromJson(result, StreamLoadResponse.class);
if (response == null || response.isFailed()) {
throw new ParamInvalidException(Optional.ofNullable(response).map(StreamLoadResponse::getMessage).orElse(result));
}
}
private HttpRequest createCommonRequest(String content) {
return HttpRequest.put(streamLoadUrl)
.header("label", TraceIdUtils.get())
.header("strict_mode", Boolean.TRUE.toString())
.header("Expect", "100-continue")
.header("format", "json")
.header("strip_outer_array", Boolean.TRUE.toString())
.header("ignore_json_size", Boolean.TRUE.toString())
.basicAuth(username, password)
.setFollowRedirects(true)
.body(content);
}
}
package com.schbrain.framework.autoconfigure.starrocks.operation;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* @author liaozan
* @since 2023/11/27
*/
@Data
public class StreamLoadResponse {
private static final String SUCCESS = "Success";
@JsonProperty("Label")
public String label;
@JsonProperty("TxnId")
private Long txnId;
@JsonProperty("Status")
private String status;
@JsonProperty("Message")
private String message;
@JsonProperty("NumberTotalRows")
private Integer numberTotalRows;
@JsonProperty("NumberLoadedRows")
private Integer numberLoadedRows;
@JsonProperty("NumberFilteredRows")
private Integer numberFilteredRows;
@JsonProperty("NumberUnselectedRows")
private Integer numberUnselectedRows;
@JsonProperty("LoadBytes")
private Integer loadBytes;
@JsonProperty("LoadTimeMs")
private Integer loadTimeMs;
@JsonProperty("BeginTxnTimeMs")
private Integer beginTxnTimeMs;
@JsonProperty("StreamLoadPlanTimeMs")
private Integer streamLoadPlanTimeMs;
@JsonProperty("ReadDataTimeMs")
private Integer readDataTimeMs;
@JsonProperty("WriteDataTimeMs")
private Integer writeDataTimeMs;
@JsonProperty("CommitAndPublishTimeMs")
private Integer commitAndPublishTimeMs;
public boolean isSuccess() {
return SUCCESS.equals(status);
}
public boolean isFailed() {
return !isSuccess();
}
}
package com.schbrain.framework.autoconfigure.starrocks.properties;
import com.mysql.cj.jdbc.Driver;
import com.schbrain.common.util.support.ConfigurableProperties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* @author liaozan
* @since 2023/11/27
*/
@Data
@Validated
@ConfigurationProperties(prefix = "starrocks")
public class StarrocksProperties implements ConfigurableProperties {
/**
* 驱动
*/
@NotBlank
private String driverClassName = Driver.class.getName();
/**
* 连接地址
*/
@NotBlank
private String host;
/**
* 数据库名
*/
@NotBlank
private String database;
/**
* 数据库直连端口
*/
@NotNull
private Integer port = 9030;
/**
* http 连接地址
*/
@NotNull
private Integer httpPort = 8030;
/**
* 用户名
*/
@NotBlank
private String username;
/**
* 密码
*/
@NotBlank
private String password;
@Override
public String getNamespace() {
return "starrocks-common";
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment