[Feature]:Flink-connector supports streamload parameters (#6243)

Flink-connector supports streamload parameters
#6199
This commit is contained in:
wudi
2021-08-09 22:12:46 +08:00
committed by GitHub
parent c8c571af37
commit d9fc1bf3ca
24 changed files with 570 additions and 482 deletions

View File

@ -131,6 +131,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| sink.batch.size | 100 | Maximum number of rows in a single write to BE |
| sink.max-retries | 1 | Number of retries after writing BE failed |
| sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
| sink.properties.* | -- | The stream load parameters, e.g.: 'sink.properties.column_separator' = ','. |
## Doris & Flink Column Type Mapping

View File

@ -135,6 +135,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| sink.batch.size | 100 | 单次写BE的最大行数 |
| sink.max-retries | 1 | 写BE失败之后的重试次数 |
| sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等 |

View File

@ -112,6 +112,7 @@ public class BackendClient {
/**
* Open a scanner for reading Doris data.
*
* @param openParams thrift struct required by the request
* @return scan open result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@ -147,6 +148,7 @@ public class BackendClient {
/**
* get next row batch from Doris BE
*
* @param nextBatchParams thrift struct required by the request
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@ -161,7 +163,7 @@ public class BackendClient {
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
result = client.get_next(nextBatchParams);
result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@ -189,6 +191,7 @@ public class BackendClient {
/**
* close a scanner.
*
* @param closeParams thrift struct required by the request
*/
public void closeScanner(TScanCloseParams closeParams) {

View File

@ -21,7 +21,7 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
/**
* Doris connection options.
* Doris connection options.
*/
public class DorisConnectionOptions implements Serializable {

View File

@ -21,22 +21,29 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.time.Duration;
import java.util.Properties;
/**
* JDBC sink batch options.
*/
public class DorisExecutionOptions implements Serializable {
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;
public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
/**
* Properties for the StreamLoad.
*/
private final Properties streamLoadProp;
public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
this.streamLoadProp = streamLoadProp;
}
public Integer getBatchSize() {
@ -51,6 +58,10 @@ public class DorisExecutionOptions implements Serializable {
return batchIntervalMs;
}
public Properties getStreamLoadProp() {
return streamLoadProp;
}
public static Builder builder() {
return new Builder();
}
@ -62,6 +73,7 @@ public class DorisExecutionOptions implements Serializable {
private Integer batchSize;
private Integer maxRetries;
private Long batchIntervalMs;
private Properties streamLoadProp;
public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@ -78,8 +90,13 @@ public class DorisExecutionOptions implements Serializable {
return this;
}
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}
public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp);
}
}

View File

@ -25,7 +25,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Options for the Doris connector.
*/
public class DorisOptions extends DorisConnectionOptions{
public class DorisOptions extends DorisConnectionOptions {
private static final long serialVersionUID = 1L;

View File

@ -22,7 +22,7 @@ import java.io.Serializable;
/**
* Doris read Options
*/
public class DorisReadOptions implements Serializable {
public class DorisReadOptions implements Serializable {
private static final long serialVersionUID = 1L;
@ -35,7 +35,7 @@ public class DorisReadOptions implements Serializable {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
private Integer deserializeQueueSize;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs,
@ -117,7 +117,7 @@ public class DorisReadOptions implements Serializable {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
private Integer deserializeQueueSize;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
@ -177,7 +177,7 @@ public class DorisReadOptions implements Serializable {
}
public DorisReadOptions build() {
return new DorisReadOptions(readFields,filterQuery,requestTabletSize,requestConnectTimeoutMs,requestReadTimeoutMs,requestQueryTimeoutS,requestRetries,requestBatchSize,execMemLimit,deserializeQueueSize,deserializeArrowAsync);
return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
}
}

View File

@ -38,15 +38,15 @@ import java.util.List;
public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
private DorisDeserializationSchema deserializer;
private DorisOptions options;
private DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
@ -55,14 +55,14 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.dorisPartitions = RestService.findPartitions(options,readOptions,logger);
this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
}
@Override
public void run(SourceContext sourceContext) throws Exception{
for(PartitionDefinition partitions : dorisPartitions){
scalaValueReader = new ScalaValueReader(partitions, options,readOptions);
while (scalaValueReader.hasNext()){
public void run(SourceContext sourceContext) throws Exception {
for (PartitionDefinition partitions : dorisPartitions) {
scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
while (scalaValueReader.hasNext()) {
Object next = scalaValueReader.next();
sourceContext.collect(next);
}

View File

@ -18,10 +18,11 @@ package org.apache.doris.flink.deserialization;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.util.List;
public class SimpleListDeserializationSchema implements DorisDeserializationSchema{
public class SimpleListDeserializationSchema implements DorisDeserializationSchema {
@Override
public TypeInformation getProducedType() {

View File

@ -21,18 +21,22 @@ public class DorisException extends Exception {
public DorisException() {
super();
}
public DorisException(String message) {
super(message);
}
public DorisException(String message, Throwable cause) {
super(message, cause);
}
public DorisException(Throwable cause) {
super(cause);
}
protected DorisException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -17,4 +17,5 @@
package org.apache.doris.flink.exception;
public class ShouldNeverHappenException extends DorisException { }
public class ShouldNeverHappenException extends DorisException {
}

View File

@ -21,15 +21,19 @@ public class StreamLoadException extends Exception {
public StreamLoadException() {
super();
}
public StreamLoadException(String message) {
super(message);
}
public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}
public StreamLoadException(Throwable cause) {
super(cause);
}
protected StreamLoadException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {

View File

@ -103,7 +103,7 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe
similar.retainAll(o.tabletIds);
diffSelf.removeAll(similar);
diffOther.removeAll(similar);
if (diffSelf.size() == 0) {
if (diffSelf.size() == 0) {
return 0;
}
long diff = Collections.min(diffSelf) - Collections.min(diffOther);

View File

@ -88,13 +88,14 @@ public class RestService implements Serializable {
/**
* send request to Doris FE and get response json string.
*
* @param options configuration of request
* @param request {@link HttpRequestBase} real request
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return Doris FE response in json string
* @throws ConnectedFailedException throw when cannot connect to Doris FE
*/
private static String send(DorisOptions options,DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
private static String send(DorisOptions options, DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
ConnectedFailedException {
int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
@ -116,10 +117,10 @@ public class RestService implements Serializable {
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
String response;
if (request instanceof HttpGet){
response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(),logger);
if (request instanceof HttpGet) {
response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger);
} else {
response = getConnectionPost(request, options.getUsername(), options.getPassword(),logger);
response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger);
}
if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
@ -147,14 +148,14 @@ public class RestService implements Serializable {
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}
private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
private static String getConnectionPost(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
InputStream content = ((HttpPost)request).getEntity().getContent();
InputStream content = ((HttpPost) request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
@ -164,21 +165,21 @@ public class RestService implements Serializable {
// flush
out.flush();
// read response
return parseResponse(conn,logger);
return parseResponse(conn, logger);
}
private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException {
URL realUrl = new URL(request);
// open connection
HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);
connection.connect();
return parseResponse(connection,logger);
return parseResponse(connection, logger);
}
private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
private static String parseResponse(HttpURLConnection connection, Logger logger) throws IOException {
if (connection.getResponseCode() != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris {}, http code is {}",
connection.getURL(), connection.getResponseCode());
@ -198,8 +199,9 @@ public class RestService implements Serializable {
/**
* parse table identifier to array.
*
* @param tableIdentifier table identifier string
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return first element is db name, second element is table name
* @throws IllegalArgumentException table identifier is illegal
*/
@ -220,8 +222,9 @@ public class RestService implements Serializable {
/**
* choice a Doris FE node to request.
*
* @param feNodes Doris FE node list, separate be comma
* @param logger slf4j logger
* @param logger slf4j logger
* @return the chosen one Doris FE node
* @throws IllegalArgumentException fe nodes is illegal
*/
@ -239,14 +242,15 @@ public class RestService implements Serializable {
/**
* choice a Doris BE node to request.
*
* @param options configuration of request
* @param logger slf4j logger
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static String randomBackend(DorisOptions options,DorisReadOptions readOptions ,Logger logger) throws DorisException, IOException {
List<BackendRow> backends = getBackends(options,readOptions, logger);
public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
List<BackendRow> backends = getBackends(options, readOptions, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
@ -259,19 +263,20 @@ public class RestService implements Serializable {
/**
* get Doris BE nodes to request.
*
* @param options configuration of request
* @param logger slf4j logger
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
static List<BackendRow> getBackends(DorisOptions options,DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
String feNode = randomEndpoint(feNodes, logger);
String beUrl = "http://" + feNode + BACKENDS;
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, readOptions,httpGet, logger);
logger.info("Backend Info:{}",response);
String response = send(options, readOptions, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendRow> backends = parseBackend(response, logger);
return backends;
}
@ -306,8 +311,9 @@ public class RestService implements Serializable {
/**
* get a valid URI to connect Doris FE.
*
* @param options configuration of request
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return uri string
* @throws IllegalArgumentException throw when configuration is illegal
*/
@ -323,24 +329,26 @@ public class RestService implements Serializable {
/**
* discover Doris table schema from Doris FE.
*
* @param options configuration of request
* @param logger slf4j logger
* @param logger slf4j logger
* @return Doris table schema
* @throws DorisException throw when discover failed
*/
public static Schema getSchema(DorisOptions options,DorisReadOptions readOptions, Logger logger)
public static Schema getSchema(DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
String response = send(options, readOptions,httpGet, logger);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
}
/**
* translate Doris FE response to inner {@link Schema} struct.
*
* @param response Doris FE response
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return inner {@link Schema} struct
* @throws DorisException throw when translate failed
*/
@ -381,14 +389,15 @@ public class RestService implements Serializable {
/**
* find Doris RDD partitions from Doris FE.
*
* @param options configuration of request
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return an list of Doris RDD partitions
* @throws DorisException throw when find partition failed
*/
public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" :readOptions.getReadFields();
String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" : readOptions.getReadFields();
String sql = "select " + readFields +
" from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
@ -397,14 +406,14 @@ public class RestService implements Serializable {
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
String entity = "{\"sql\": \""+ sql +"\"}";
String entity = "{\"sql\": \"" + sql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
String resStr = send(options, readOptions,httpPost, logger);
String resStr = send(options, readOptions, httpPost, logger);
logger.debug("Find partition response is '{}'.", resStr);
QueryPlan queryPlan = getQueryPlan(resStr, logger);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
@ -420,8 +429,9 @@ public class RestService implements Serializable {
/**
* translate Doris FE response string to inner {@link QueryPlan} struct.
*
* @param response Doris FE response string
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return inner {@link QueryPlan} struct
* @throws DorisException throw when translate failed.
*/
@ -461,13 +471,14 @@ public class RestService implements Serializable {
/**
* select which Doris BE to get tablet data.
*
* @param queryPlan {@link QueryPlan} translated from Doris FE response
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return BE to tablets {@link Map}
* @throws DorisException throw when select failed.
*/
@VisibleForTesting
static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
Map<String, List<Long>> be2Tablets = new HashMap<>();
for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
logger.debug("Parse tablet info: '{}'.", part);
@ -512,8 +523,9 @@ public class RestService implements Serializable {
/**
* tablet count limit for one Doris RDD partition
*
* @param readOptions configuration of request
* @param logger {@link Logger}
* @param logger {@link Logger}
* @return tablet count limit
*/
@VisibleForTesting
@ -533,18 +545,19 @@ public class RestService implements Serializable {
/**
* translate BE tablets map to Doris RDD partition.
* @param options configuration of request
* @param be2Tablets BE to tablets {@link Map}
*
* @param options configuration of request
* @param be2Tablets BE to tablets {@link Map}
* @param opaquedQueryPlan Doris BE execute plan getting from Doris FE
* @param database database name of Doris table
* @param table table name of Doris table
* @param logger {@link Logger}
* @param database database name of Doris table
* @param table table name of Doris table
* @param logger {@link Logger}
* @return Doris RDD partition {@link List}
* @throws IllegalArgumentException throw when translate failed
*/
@VisibleForTesting
static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options,DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan, String database, String table, Logger logger)
static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options, DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
List<PartitionDefinition> partitions = new ArrayList<>();

View File

@ -27,10 +27,11 @@ public class SchemaUtils {
/**
* convert Doris return schema to inner schema struct.
*
* @param tscanColumnDescs Doris BE return schema
* @return inner schema struct
*/
public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs ){
public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
Schema schema = new Schema(tscanColumnDescs.size());
tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0)));
return schema;

View File

@ -26,7 +26,8 @@ public class Field {
private int precision;
private int scale;
public Field() { }
public Field() {
}
public Field(String name, String type, String comment, int precision, int scale) {
this.name = name;

View File

@ -63,7 +63,7 @@ public class RowBatch {
this.cols = new ArrayList<>(colCount);
}
public List<Object> getCols() {
public List<Object> getCols() {
return cols;
}
@ -87,13 +87,13 @@ public class RowBatch {
return rowBatch;
}
public RowBatch(TScanBatchResult nextResult, Schema schema){
public RowBatch(TScanBatchResult nextResult, Schema schema) {
this.schema = schema;
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
this.arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(nextResult.getRows()),
rootAllocator
);
);
this.offsetInRowBatch = 0;
}
@ -243,7 +243,7 @@ public class RowBatch {
continue;
}
BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
addValueToRow(rowIndex, DecimalData.fromBigDecimal(value,value.precision(),value.scale()));
addValueToRow(rowIndex, DecimalData.fromBigDecimal(value, value.precision(), value.scale()));
}
break;
case "DATE":

View File

@ -34,6 +34,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -44,17 +45,23 @@ import java.util.concurrent.TimeUnit;
/**
* DorisDynamicOutputFormat
**/
public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
private static final String FIELD_DELIMITER_KEY = "column_separator";
private static final String FIELD_DELIMITER_DEFAULT = "\t";
private static final String LINE_DELIMITER_KEY = "line_delimiter";
private static final String LINE_DELIMITER_DEFAULT = "\n";
private static final String NULL_VALUE = "\\N";
private final String fieldDelimiter;
private final String lineDelimiter;
private DorisOptions options ;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisOptions options;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisStreamLoad dorisStreamLoad;
private final String fieldDelimiter = "\t";
private final String lineDelimiter = "\n";
private final String NULL_VALUE = "\\N";
private final List<String> batch = new ArrayList<>();
private transient volatile boolean closed = false;
@ -62,15 +69,16 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;
public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) {
public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
}
@Override
public void configure(Configuration configuration) {
}
@Override
@ -80,8 +88,9 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword());
LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());
options.getPassword(),
executionOptions.getStreamLoadProp());
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
@ -118,12 +127,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private void addBatch(RowData row) {
StringJoiner value = new StringJoiner(this.fieldDelimiter);
GenericRowData rowData = (GenericRowData) row;
for(int i = 0; i < row.getArity(); ++i) {
for (int i = 0; i < row.getArity(); ++i) {
Object field = rowData.getField(i);
if(field != null){
if (field != null) {
value.add(field.toString());
}else{
value.add(this.NULL_VALUE);
} else {
value.add(NULL_VALUE);
}
}
batch.add(value.toString());
@ -151,12 +160,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
public synchronized void flush() throws IOException {
checkFlushException();
if(batch.isEmpty()){
if (batch.isEmpty()) {
return;
}
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(String.join(lineDelimiter,batch));
dorisStreamLoad.load(String.join(this.lineDelimiter, batch));
batch.clear();
break;
} catch (StreamLoadException e) {
@ -166,7 +175,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
}
try {
dorisStreamLoad.setHostPort(getBackend());
LOG.warn("streamload error,switch be: {}",dorisStreamLoad.getLoadUrlStr(), e);
LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
Thread.sleep(1000 * i);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@ -177,10 +186,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
}
private String getBackend() throws IOException{
private String getBackend() throws IOException {
try {
//get be url from fe
return RestService.randomBackend(options,readOptions, LOG);
return RestService.randomBackend(options, readOptions, LOG);
} catch (IOException | DorisException e) {
LOG.error("get backends info fail");
throw new IOException(e);
@ -202,8 +211,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
*/
public static class Builder {
private DorisOptions.Builder optionsBuilder;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
public Builder() {
this.optionsBuilder = DorisOptions.builder();
@ -241,7 +250,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
public DorisDynamicOutputFormat build() {
return new DorisDynamicOutputFormat(
optionsBuilder.build(),readOptions,executionOptions
optionsBuilder.build(), readOptions, executionOptions
);
}
}

View File

@ -34,6 +34,8 @@ import org.apache.flink.table.utils.TableSchemaUtils;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
@ -52,209 +54,224 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
* <p>Because the table source requires a decoding format, we are discovering the format using the
* provided {@link FactoryUtil} for convenience.
*/
public final class DorisDynamicTableFactory implements DynamicTableSourceFactory , DynamicTableSinkFactory {
public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
// doris options
private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
.key("doris.read.field")
.stringType()
.noDefaultValue()
.withDescription("List of column names in the Doris table, separated by commas");
// doris options
private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
.key("doris.read.field")
.stringType()
.noDefaultValue()
.withDescription("List of column names in the Doris table, separated by commas");
private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
.key("doris.filter.query")
.stringType()
.noDefaultValue()
.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
.key("doris.filter.query")
.stringType()
.noDefaultValue()
.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");
private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
// Internal queue size used when deserializing arrow data asynchronously.
// NOTE(review): the key was previously the accidental copy-paste concatenation
// "doris.request.retriesdoris.deserialize.queue.size"; the documented option
// name "doris.deserialize.queue.size" is used here.
private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
        .key("doris.deserialize.queue.size")
        .intType()
        .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
        .withDescription("Internal queue size for asynchronous arrow deserialization.");
// Internal queue size used when deserializing arrow data asynchronously.
// NOTE(review): the key was previously the accidental copy-paste concatenation
// "doris.request.retriesdoris.deserialize.queue.size"; the documented option
// name "doris.deserialize.queue.size" is used here.
private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
        .key("doris.deserialize.queue.size")
        .intType()
        .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
        .withDescription("Internal queue size for asynchronous arrow deserialization.");
private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
// flink write config options
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
.key("sink.batch.size")
.intType()
.defaultValue(100)
.withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
" of records, will flush data. The default value is 100.");
// flink write config options
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
.key("sink.batch.size")
.intType()
.defaultValue(100)
.withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
" of records, will flush data. The default value is 100.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");
@Override
public String factoryIdentifier() {
return "doris"; // used for matching to `connector = '...'`
}
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(TABLE_IDENTIFIER);
return options;
}
/** Returns the connector identifier matched against {@code 'connector' = 'doris'}. */
@Override
public String factoryIdentifier() {
    return "doris"; // used for matching to `connector = '...'`
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
/**
 * Options that must always be present: the FE HTTP address and the fully
 * qualified table identifier. Shared by both source and sink.
 */
@Override
public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(FENODES);
    options.add(TABLE_IDENTIFIER);
    return options;
}
options.add(DORIS_READ_FIELD);
options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
options.add(DORIS_REQUEST_RETRIES);
options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
return options;
}
options.add(DORIS_READ_FIELD);
options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
options.add(DORIS_REQUEST_RETRIES);
options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig options = helper.getOptions();
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSource(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
physicalSchema);
}
options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
return options;
}
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
/**
 * Creates the Doris table source from the validated catalog-table options.
 *
 * <p>NOTE(review): the previous version declared locals {@code options} and
 * {@code producedDataType} that were never used; they are removed here.
 */
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // Use the provided helper utility to validate all declared options.
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    helper.validate();
    // Derive the physical schema (excluding computed columns) from the catalog table.
    TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    // create and return dynamic table source
    return new DorisDynamicTableSource(
            getDorisOptions(helper.getOptions()),
            getDorisReadOptions(helper.getOptions()),
            physicalSchema);
}
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
return builder.build();
}
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
final DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
return builder.build();
}
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
return builder.build();
}
private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig) {
final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
return builder.build();
}
/**
 * Builds {@link DorisReadOptions} (scan/read tuning) from the validated
 * table options.
 *
 * @param readableConfig validated configuration of the catalog table
 * @return read options forwarded to the Doris source
 */
private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
    final DorisReadOptions.Builder builder = DorisReadOptions.builder();
    builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
            .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
            .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
            .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
            .setReadFields(readableConfig.get(DORIS_READ_FIELD))
            .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
            .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
            .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
            .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
            .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
            .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
    return builder.build();
}
/**
 * Builds {@link DorisExecutionOptions} (sink batching and retry behavior)
 * from the validated table options plus the raw {@code sink.properties.*}
 * entries that are forwarded to stream load.
 *
 * @param readableConfig validated configuration of the catalog table
 * @param streamLoadProp stream-load properties extracted from {@code sink.properties.*}
 * @return execution options for the Doris sink
 */
private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig, Properties streamLoadProp) {
    final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
    builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
    builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
    builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
    builder.setStreamLoadProp(streamLoadProp);
    return builder.build();
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// validate all options
helper.validate();
// create and return dynamic table source
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions())
);
}
/**
 * Extracts every table option whose key starts with {@code sink.properties.}
 * and repackages it, with the prefix stripped, as a stream-load property.
 *
 * @param tableOptions all options declared on the catalog table
 * @return properties forwarded verbatim to the Doris stream load request
 */
private Properties getStreamLoadProp(Map<String, String> tableOptions) {
    final Properties loadProperties = new Properties();
    tableOptions.forEach((optionKey, optionValue) -> {
        if (optionKey.startsWith(STREAM_LOAD_PROP_PREFIX)) {
            loadProperties.put(optionKey.substring(STREAM_LOAD_PROP_PREFIX.length()), optionValue);
        }
    });
    return loadProperties;
}
/**
 * Creates the Doris table sink. {@code sink.properties.*} keys are excluded
 * from option validation because they are free-form stream-load parameters
 * unknown to the factory.
 */
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    // validate all options except the open-ended stream-load prefix
    helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
    Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
    // create and return dynamic table sink
    return new DorisDynamicTableSink(
            getDorisOptions(helper.getOptions()),
            getDorisReadOptions(helper.getOptions()),
            getDorisExecutionOptions(helper.getOptions(), streamLoadProp)
    );
}
}

View File

@ -63,7 +63,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
@Override
public DynamicTableSink copy() {
return new DorisDynamicTableSink(options,readOptions,executionOptions);
return new DorisDynamicTableSink(options, readOptions, executionOptions);
}
@Override

View File

@ -46,56 +46,56 @@ import java.util.List;
* where we instantiate the required {@link SourceFunction} and its {@link DeserializationSchema} for
* runtime. Both instances are parameterized to return internal data structures (i.e. {@link RowData}).
*/
public final class DorisDynamicTableSource implements ScanTableSource ,LookupTableSource {
public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource {
private final DorisOptions options;
private final DorisReadOptions readOptions;
private TableSchema physicalSchema;
private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private TableSchema physicalSchema;
private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions,TableSchema physicalSchema) {
this.options = options;
this.readOptions = readOptions;
this.physicalSchema = physicalSchema;
}
/**
 * @param options connection options (fenodes, credentials, table identifier)
 * @param readOptions scan/read tuning options
 * @param physicalSchema physical schema of the catalog table (computed columns excluded)
 */
public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions, TableSchema physicalSchema) {
    this.options = options;
    this.readOptions = readOptions;
    this.physicalSchema = physicalSchema;
}
@Override
public ChangelogMode getChangelogMode() {
// in our example the format decides about the changelog mode
// but it could also be the source itself
return ChangelogMode.insertOnly();
}
@Override
public ChangelogMode getChangelogMode() {
// in our example the format decides about the changelog mode
// but it could also be the source itself
return ChangelogMode.insertOnly();
}
/**
 * Builds the scan runtime provider: resolves the Doris partitions of the
 * target table via the REST service and wires them into a
 * {@link DorisRowDataInputFormat}.
 */
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    List<PartitionDefinition> dorisPartitions;
    try {
        dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
    } catch (DorisException e) {
        // Preserve the original exception as the cause instead of discarding it.
        throw new RuntimeException("can not fetch partitions", e);
    }
    DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
            .setFenodes(options.getFenodes())
            .setUsername(options.getUsername())
            .setPassword(options.getPassword())
            .setTableIdentifier(options.getTableIdentifier())
            .setPartitions(dorisPartitions)
            .setReadOptions(readOptions);
    return InputFormatProvider.of(builder.build());
}
/**
 * Builds the scan runtime provider: resolves the Doris partitions of the
 * target table via the REST service and wires them into a
 * {@link DorisRowDataInputFormat}.
 */
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    List<PartitionDefinition> dorisPartitions;
    try {
        dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
    } catch (DorisException e) {
        // Preserve the original exception as the cause instead of discarding it.
        throw new RuntimeException("can not fetch partitions", e);
    }
    DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
            .setFenodes(options.getFenodes())
            .setUsername(options.getUsername())
            .setPassword(options.getPassword())
            .setTableIdentifier(options.getTableIdentifier())
            .setPartitions(dorisPartitions)
            .setReadOptions(readOptions);
    return InputFormatProvider.of(builder.build());
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
return null;
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
return null;
}
@Override
public DynamicTableSource copy() {
return new DorisDynamicTableSource(options,readOptions,physicalSchema);
}
@Override
public DynamicTableSource copy() {
return new DorisDynamicTableSource(options, readOptions, physicalSchema);
}
@Override
public String asSummaryString() {
return "Doris Table Source";
}
@Override
public String asSummaryString() {
return "Doris Table Source";
}
}

View File

@ -45,183 +45,183 @@ import java.util.List;
@Internal
public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTableInputSplit> implements ResultTypeQueryable<RowData> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
private DorisOptions options;
private DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private TypeInformation<RowData> rowDataTypeInfo;
private DorisOptions options;
private DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private TypeInformation<RowData> rowDataTypeInfo;
private ScalaValueReader scalaValueReader;
private transient boolean hasNext;
private ScalaValueReader scalaValueReader;
private transient boolean hasNext;
public DorisRowDataInputFormat(DorisOptions options,List<PartitionDefinition> dorisPartitions,DorisReadOptions readOptions) {
this.options = options;
this.dorisPartitions = dorisPartitions;
this.readOptions = readOptions;
}
public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions) {
this.options = options;
this.dorisPartitions = dorisPartitions;
this.readOptions = readOptions;
}
@Override
public void configure(Configuration parameters) {
//do nothing here
}
@Override
public void configure(Configuration parameters) {
//do nothing here
}
@Override
public void openInputFormat() {
//called once per inputFormat (on open)
}
@Override
public void openInputFormat() {
//called once per inputFormat (on open)
}
@Override
public void closeInputFormat() {
//called once per inputFormat (on close)
}
@Override
public void closeInputFormat() {
//called once per inputFormat (on close)
}
/**
* Connects to the source database and executes the query in a <b>parallel
* fashion</b> if
* this {@link InputFormat} is built using a parameterized query (i.e. using
* a {@link PreparedStatement})
* and a proper {@link }, in a <b>non-parallel
* fashion</b> otherwise.
*
* @param inputSplit which is ignored if this InputFormat is executed as a
* non-parallel source,
* a "hook" to the query parameters otherwise (using its
* <i>splitNumber</i>)
* @throws IOException if there's an error during the execution of the query
*/
@Override
public void open(DorisTableInputSplit inputSplit) throws IOException {
scalaValueReader = new ScalaValueReader(inputSplit.partition, options,readOptions);
hasNext = scalaValueReader.hasNext();
}
/**
 * Opens a {@link ScalaValueReader} for the Doris partition carried by the
 * given split and records whether that partition has any rows to read.
 *
 * <p>NOTE(review): the previous Javadoc was copied from a JDBC input format
 * and described {@code PreparedStatement}-based behavior that does not apply
 * to this Doris reader.
 *
 * @param inputSplit the split whose Doris partition should be scanned
 * @throws IOException if there's an error while opening the reader
 */
@Override
public void open(DorisTableInputSplit inputSplit) throws IOException {
    scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions);
    hasNext = scalaValueReader.hasNext();
}
/**
* Closes all resources used.
*
* @throws IOException Indicates that a resource could not be closed.
*/
@Override
public void close() throws IOException {
/**
* Closes all resources used.
*
* @throws IOException Indicates that a resource could not be closed.
*/
@Override
public void close() throws IOException {
}
}
@Override
public TypeInformation<RowData> getProducedType() {
return rowDataTypeInfo;
}
@Override
public TypeInformation<RowData> getProducedType() {
return rowDataTypeInfo;
}
/**
* Checks whether all data has been read.
*
* @return boolean value indication whether all data has been read.
* @throws IOException
*/
@Override
public boolean reachedEnd() throws IOException {
return !hasNext;
}
/**
* Checks whether all data has been read.
*
* @return boolean value indication whether all data has been read.
* @throws IOException
*/
@Override
public boolean reachedEnd() throws IOException {
return !hasNext;
}
/**
* Stores the next resultSet row in a tuple.
*
* @param reuse row to be reused.
* @return row containing next {@link RowData}
* @throws IOException
*/
@Override
public RowData nextRecord(RowData reuse) throws IOException {
if (!hasNext) {
return null;
}
List next = (List)scalaValueReader.next();
GenericRowData genericRowData = new GenericRowData(next.size());
for(int i =0;i<next.size();i++){
genericRowData.setField(i, next.get(i));
}
//update hasNext after we've read the record
hasNext = scalaValueReader.hasNext();
return genericRowData;
}
/**
 * Stores the next result row in a fresh {@link GenericRowData}.
 *
 * @param reuse row to be reused (ignored; a new row is created per record)
 * @return row containing the next {@link RowData}, or {@code null} when
 *         all data has been read
 * @throws IOException
 */
@Override
public RowData nextRecord(RowData reuse) throws IOException {
    if (!hasNext) {
        return null;
    }
    // Use a wildcard instead of a raw List; elements are set as-is below.
    List<?> next = (List<?>) scalaValueReader.next();
    GenericRowData genericRowData = new GenericRowData(next.size());
    for (int i = 0; i < next.size(); i++) {
        genericRowData.setField(i, next.get(i));
    }
    //update hasNext after we've read the record
    hasNext = scalaValueReader.hasNext();
    return genericRowData;
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return cachedStatistics;
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return cachedStatistics;
}
@Override
public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
int splitNum = 0;
for (PartitionDefinition partition : dorisPartitions) {
dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
}
LOG.info("DorisTableInputSplit Num:{}",dorisSplits.size());
return dorisSplits.toArray(new DorisTableInputSplit[0]);
}
/**
 * Creates one input split per Doris partition. {@code minNumSplits} is
 * effectively ignored because parallelism is bounded by the partition count.
 */
@Override
public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
    int splitNum = 0;
    for (PartitionDefinition partition : dorisPartitions) {
        dorisSplits.add(new DorisTableInputSplit(splitNum++, partition));
    }
    LOG.info("DorisTableInputSplit Num:{}", dorisSplits.size());
    return dorisSplits.toArray(new DorisTableInputSplit[0]);
}
@Override
public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}
@Override
public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}
/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*
* @return builder
*/
public static Builder builder() {
return new Builder();
}
/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*
* @return builder
*/
public static Builder builder() {
return new Builder();
}
/**
* Builder for {@link DorisRowDataInputFormat}.
*/
public static class Builder {
private DorisOptions.Builder optionsBuilder;
private List<PartitionDefinition> partitions;
private DorisReadOptions readOptions;
/**
* Builder for {@link DorisRowDataInputFormat}.
*/
public static class Builder {
private DorisOptions.Builder optionsBuilder;
private List<PartitionDefinition> partitions;
private DorisReadOptions readOptions;
public Builder() {
this.optionsBuilder = DorisOptions.builder();
}
public Builder() {
this.optionsBuilder = DorisOptions.builder();
}
public Builder setFenodes(String fenodes) {
this.optionsBuilder.setFenodes(fenodes);
return this;
}
public Builder setFenodes(String fenodes) {
this.optionsBuilder.setFenodes(fenodes);
return this;
}
public Builder setUsername(String username) {
this.optionsBuilder.setUsername(username);
return this;
}
public Builder setUsername(String username) {
this.optionsBuilder.setUsername(username);
return this;
}
public Builder setPassword(String password) {
this.optionsBuilder.setPassword(password);
return this;
}
public Builder setPassword(String password) {
this.optionsBuilder.setPassword(password);
return this;
}
public Builder setTableIdentifier(String tableIdentifier) {
this.optionsBuilder.setTableIdentifier(tableIdentifier);
return this;
}
public Builder setTableIdentifier(String tableIdentifier) {
this.optionsBuilder.setTableIdentifier(tableIdentifier);
return this;
}
public Builder setPartitions(List<PartitionDefinition> partitions) {
this.partitions = partitions;
return this;
}
public Builder setPartitions(List<PartitionDefinition> partitions) {
this.partitions = partitions;
return this;
}
public Builder setReadOptions(DorisReadOptions readOptions) {
this.readOptions = readOptions;
return this;
}
public Builder setReadOptions(DorisReadOptions readOptions) {
this.readOptions = readOptions;
return this;
}
public DorisRowDataInputFormat build() {
return new DorisRowDataInputFormat(
optionsBuilder.build(),partitions,readOptions
);
}
}
/**
 * Builds a {@link DorisRowDataInputFormat} from the collected connection
 * options, partitions and read options.
 */
public DorisRowDataInputFormat build() {
    return new DorisRowDataInputFormat(
            optionsBuilder.build(), partitions, readOptions
    );
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.flink.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.slf4j.Logger;
@ -31,17 +32,21 @@ import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
/**
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable{
public class DorisStreamLoad implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@ -54,8 +59,9 @@ public class DorisStreamLoad implements Serializable{
private String db;
private String tbl;
private String authEncoding;
private Properties streamLoadProp;
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) {
this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
@ -63,6 +69,7 @@ public class DorisStreamLoad implements Serializable{
this.passwd = passwd;
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
this.streamLoadProp = streamLoadProp;
}
public String getLoadUrlStr() {
@ -89,6 +96,9 @@ public class DorisStreamLoad implements Serializable{
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
conn.addRequestProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
@ -104,6 +114,7 @@ public class DorisStreamLoad implements Serializable{
this.respMsg = respMsg;
this.respContent = respContent;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -116,14 +127,14 @@ public class DorisStreamLoad implements Serializable{
public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
LOG.info("Streamload Response:{}",loadResponse);
if(loadResponse.status != 200){
LOG.info("Streamload Response:{}", loadResponse);
if (loadResponse.status != 200) {
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
}else{
} else {
ObjectMapper obj = new ObjectMapper();
try {
RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
throw new StreamLoadException("stream load error: " + respContent.getMessage());
}
} catch (IOException e) {
@ -133,11 +144,13 @@ public class DorisStreamLoad implements Serializable{
}
private LoadResponse loadBatch(String value) {
Calendar calendar = Calendar.getInstance();
String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));
String label = streamLoadProp.getProperty("label");
if (StringUtils.isBlank(label)) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss");
String formatDate = sdf.format(new Date());
label = String.format("flink_connector_%s_%s",formatDate,
UUID.randomUUID().toString().replaceAll("-", ""));
}
HttpURLConnection feConn = null;
HttpURLConnection beConn = null;

View File

@ -22,14 +22,16 @@ import org.apache.flink.core.io.InputSplit;
/**
* DorisTableInputSplit
**/
public class DorisTableInputSplit implements InputSplit, java.io.Serializable{
public class DorisTableInputSplit implements InputSplit, java.io.Serializable {
/** The number of the split. */
/**
* The number of the split.
*/
private final int splitNumber;
protected final PartitionDefinition partition;
public DorisTableInputSplit(int splitNumber,PartitionDefinition partition) {
public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) {
super();
this.splitNumber = splitNumber;
this.partition = partition;