[FlinkConnector] Support time interval for flink connector (#5934)

This commit is contained in:
wudi
2021-06-30 09:27:12 +08:00
committed by GitHub
parent a475d3e3a4
commit 28e7d01ef7
6 changed files with 84 additions and 14 deletions

View File

@ -130,6 +130,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| doris.filter.query | -- | Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering. |
| sink.batch.size | 100 | Maximum number of lines in a single write BE |
| sink.max-retries | 1 | Number of retries after writing BE failed |
| sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
## Doris & Flink Column Type Mapping

View File

@ -134,6 +134,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
| sink.batch.size | 100 | 单次写BE的最大行数 |
| sink.max-retries | 1 | 写BE失败之后的重试次数 |
| sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|

View File

@ -17,7 +17,10 @@
package org.apache.doris.flink.cfg;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.time.Duration;
/**
* JDBC sink batch options.
@ -27,10 +30,13 @@ public class DorisExecutionOptions implements Serializable {
private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;
public DorisExecutionOptions(Integer batchSize, Integer maxRetries) {
public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
}
public Integer getBatchSize() {
@ -41,6 +47,10 @@ public class DorisExecutionOptions implements Serializable {
return maxRetries;
}
public Long getBatchIntervalMs() {
return batchIntervalMs;
}
public static Builder builder() {
return new Builder();
}
@ -51,6 +61,7 @@ public class DorisExecutionOptions implements Serializable {
public static class Builder {
private Integer batchSize;
private Integer maxRetries;
private Long batchIntervalMs;
public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@ -62,8 +73,13 @@ public class DorisExecutionOptions implements Serializable {
return this;
}
public Builder setBatchIntervalMs(Long batchIntervalMs) {
this.batchIntervalMs = batchIntervalMs;
return this;
}
public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize,maxRetries);
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
}
}

View File

@ -24,15 +24,21 @@ import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -51,6 +57,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private final List<String> batch = new ArrayList<>();
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;
public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) {
this.options = option;
this.readOptions = readOptions;
@ -71,10 +81,33 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
options.getUsername(),
options.getPassword());
LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (DorisDynamicOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}
}, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}
}
private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to streamload failed.", flushException);
}
}
@Override
public void writeRecord(RowData row) throws IOException {
public synchronized void writeRecord(RowData row) throws IOException {
checkFlushException();
addBatch(row);
if (executionOptions.getBatchSize() > 0 && batch.size() >= executionOptions.getBatchSize()) {
flush();
@ -91,22 +124,30 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
}
@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (batch.size() > 0) {
try {
flush();
} catch (Exception e) {
LOG.warn("Writing records to doris failed.", e);
throw new RuntimeException("Writing records to doris failed.", e);
}
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
try {
flush();
} catch (Exception e) {
LOG.warn("Writing records to doris failed.", e);
throw new RuntimeException("Writing records to doris failed.", e);
}
}
checkFlushException();
}
public void flush() throws IOException {
public synchronized void flush() throws IOException {
checkFlushException();
if(batch.isEmpty()){
return;
}
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(String.join(lineDelimiter,batch));
@ -129,6 +170,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
}
}
private String getBackend() throws IOException{
try {
//get be url from fe

View File

@ -32,6 +32,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
@ -140,6 +141,13 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");
@Override
public String factoryIdentifier() {
@ -176,6 +184,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor
options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
return options;
}
@ -229,6 +238,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor
final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
return builder.build();
}

View File

@ -134,7 +134,7 @@ public class DorisStreamLoad implements Serializable{
private LoadResponse loadBatch(String value) {
Calendar calendar = Calendar.getInstance();
String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));