From 28e7d01ef74ac1842e8daf404618314aa60b2363 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Wed, 30 Jun 2021 09:27:12 +0800
Subject: [PATCH] [FlinkConnector] Support time interval for flink connector (#5934)

---
 .../extending-doris/flink-doris-connector.md  |  1 +
 .../extending-doris/flink-doris-connector.md  |  1 +
 .../flink/cfg/DorisExecutionOptions.java      | 20 +++++-
 .../flink/table/DorisDynamicOutputFormat.java | 64 +++++++++++++++----
 .../flink/table/DorisDynamicTableFactory.java | 10 +++
 .../doris/flink/table/DorisStreamLoad.java    |  2 +-
 6 files changed, 84 insertions(+), 14 deletions(-)

diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index dcba028e7c..39bbe41a76 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -130,6 +130,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
 | doris.filter.query | -- | Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering. |
 | sink.batch.size | 100 | Maximum number of lines in a single write BE |
 | sink.max-retries | 1 | Number of retries after writing BE failed |
+| sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second; the supported time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
 
 ## Doris & Flink Column Type Mapping
 
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index 5cebf1a771..22363db04d 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -134,6 +134,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
 | doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
 | sink.batch.size | 100 | 单次写BE的最大行数 |
 | sink.max-retries | 1 | 写BE失败之后的重试次数 |
+| sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将缓存中数据写入BE。默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index ee8b09e4b2..330cbc9be4 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -17,7 +17,10 @@
 
 package org.apache.doris.flink.cfg;
 
+import org.apache.flink.util.Preconditions;
+
 import java.io.Serializable;
+import java.time.Duration;
 
 /**
  * JDBC sink batch options.
@@ -27,10 +30,13 @@ public class DorisExecutionOptions implements Serializable {
 
     private final Integer batchSize;
     private final Integer maxRetries;
+    private final Long batchIntervalMs;
 
-    public DorisExecutionOptions(Integer batchSize, Integer maxRetries) {
+    public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
+        Preconditions.checkArgument(maxRetries >= 0);
         this.batchSize = batchSize;
         this.maxRetries = maxRetries;
+        this.batchIntervalMs = batchIntervalMs;
     }
 
     public Integer getBatchSize() {
@@ -41,6 +47,10 @@ public class DorisExecutionOptions implements Serializable {
         return maxRetries;
     }
 
+    public Long getBatchIntervalMs() {
+        return batchIntervalMs;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -51,6 +61,7 @@ public class DorisExecutionOptions implements Serializable {
     public static class Builder {
         private Integer batchSize;
         private Integer maxRetries;
+        private Long batchIntervalMs;
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
@@ -62,8 +73,13 @@ public class DorisExecutionOptions implements Serializable {
             return this;
         }
 
+        public Builder setBatchIntervalMs(Long batchIntervalMs) {
+            this.batchIntervalMs = batchIntervalMs;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(batchSize,maxRetries);
+            return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
         }
     }
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 44880b555c..4b2f5fea36 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -24,15 +24,21 @@ import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.StringJoiner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -51,6 +57,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
     private final List<String> batch = new ArrayList<>();
     private transient volatile boolean closed = false;
 
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
     public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) {
         this.options = option;
         this.readOptions = readOptions;
@@ -71,10 +81,33 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
                 options.getUsername(),
                 options.getPassword());
         LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());
+
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+            this.scheduler = Executors.newScheduledThreadPool(1,
+                    new ExecutorThreadFactory("doris-streamload-output-format"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (DorisDynamicOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flush();
+                        } catch (Exception e) {
+                            flushException = e;
+                        }
+                    }
+                }
+            }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to streamload failed.", flushException);
+        }
     }
 
     @Override
-    public void writeRecord(RowData row) throws IOException {
+    public synchronized void writeRecord(RowData row) throws IOException {
+        checkFlushException();
+
         addBatch(row);
         if (executionOptions.getBatchSize() > 0 && batch.size() >= executionOptions.getBatchSize()) {
             flush();
@@ -91,22 +124,30 @@
     }
 
     @Override
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         if (!closed) {
             closed = true;
-            if (batch.size() > 0) {
-                try {
-                    flush();
-                } catch (Exception e) {
-                    LOG.warn("Writing records to doris failed.", e);
-                    throw new RuntimeException("Writing records to doris failed.", e);
-                }
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            try {
+                flush();
+            } catch (Exception e) {
+                LOG.warn("Writing records to doris failed.", e);
+                throw new RuntimeException("Writing records to doris failed.", e);
             }
         }
+        checkFlushException();
     }
-
-    public void flush() throws IOException {
+    public synchronized void flush() throws IOException {
+        checkFlushException();
+        if(batch.isEmpty()){
+            return;
+        }
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 dorisStreamLoad.load(String.join(lineDelimiter,batch));
@@ -129,6 +170,7 @@
         }
     }
 
+
     private String getBackend() throws IOException{
         try {
             //get be url from fe
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index aecda37d5e..27b6f976a3 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -140,6 +141,13 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor
             .defaultValue(3)
             .withDescription("the max retry times if writing records to database failed.");
 
+    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
+            .key("sink.batch.interval")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(1))
+            .withDescription("the flush interval millis, over this time, asynchronous threads will flush data. The " +
The " + + "default value is 1s."); + @Override public String factoryIdentifier() { @@ -176,6 +184,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor options.add(SINK_BUFFER_FLUSH_MAX_ROWS); options.add(SINK_MAX_RETRIES); + options.add(SINK_BUFFER_FLUSH_INTERVAL); return options; } @@ -229,6 +238,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES)); + builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); return builder.build(); } diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index fa001a6038..ef16f33c46 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -134,7 +134,7 @@ public class DorisStreamLoad implements Serializable{ private LoadResponse loadBatch(String value) { Calendar calendar = Calendar.getInstance(); - String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s", + String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), UUID.randomUUID().toString().replaceAll("-", ""));