support use char like \x01 in flink-doris-sink column & line delimiter (#6937)
* support use char like \x01 in flink-doris-sink column & line delimiter * extend imports * add docs
This commit is contained in:
@ -257,7 +257,7 @@ outputFormat.close();
|
||||
| sink.batch.size | 100 | Maximum number of lines in a single write BE |
|
||||
| sink.max-retries | 1 | Number of retries after writing BE failed |
|
||||
| sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
|
||||
| sink.properties.* | -- | The stream load parameters.eg:sink.properties.column_separator' = ','.<br /> Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'|
|
||||
| sink.properties.* | -- | The stream load parameters.eg:sink.properties.column_separator' = ','. Setting 'sink.properties.escape_delimiters' = 'true' if you want to use a control char as a separator, so that such as '\\x01' will translate to binary 0x01<br /> Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'|
|
||||
|
||||
|
||||
## Doris & Flink Column Type Mapping
|
||||
|
||||
@ -260,7 +260,7 @@ outputFormat.close();
|
||||
| sink.batch.size | 100 | 单次写BE的最大行数 |
|
||||
| sink.max-retries | 1 | 写BE失败之后的重试次数 |
|
||||
| sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
|
||||
| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
|
||||
| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。如果需要特殊字符作为分隔符, 可以加上参数'sink.properties.escape_delimiters' = 'true', '\\x01'会被转换为二进制的0x01<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
|
||||
|
||||
|
||||
|
||||
|
||||
@ -93,4 +93,8 @@ public class RespContent {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public String getErrorURL() {
|
||||
return ErrorURL;
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,11 +38,14 @@ import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.flink.table.data.RowData.createFieldGetter;
|
||||
|
||||
@ -62,9 +65,11 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
|
||||
private static final String FORMAT_KEY = "format";
|
||||
private static final String FORMAT_JSON_VALUE = "json";
|
||||
private static final String NULL_VALUE = "\\N";
|
||||
private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
|
||||
private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
|
||||
|
||||
private final String fieldDelimiter;
|
||||
private final String lineDelimiter;
|
||||
private String fieldDelimiter;
|
||||
private String lineDelimiter;
|
||||
private final String[] fieldNames;
|
||||
private final boolean jsonFormat;
|
||||
private DorisOptions options;
|
||||
@ -88,10 +93,26 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
|
||||
this.options = option;
|
||||
this.readOptions = readOptions;
|
||||
this.executionOptions = executionOptions;
|
||||
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
|
||||
FIELD_DELIMITER_DEFAULT);
|
||||
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
|
||||
LINE_DELIMITER_DEFAULT);
|
||||
|
||||
Properties streamLoadProp=executionOptions.getStreamLoadProp();
|
||||
|
||||
boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
|
||||
if (ifEscape) {
|
||||
this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
|
||||
FIELD_DELIMITER_DEFAULT));
|
||||
this.lineDelimiter = escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY,
|
||||
LINE_DELIMITER_DEFAULT));
|
||||
|
||||
if (streamLoadProp.contains(ESCAPE_DELIMITERS_KEY)) {
|
||||
streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
|
||||
}
|
||||
} else {
|
||||
this.fieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
|
||||
FIELD_DELIMITER_DEFAULT);
|
||||
this.lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
|
||||
LINE_DELIMITER_DEFAULT);
|
||||
}
|
||||
|
||||
this.fieldNames = fieldNames;
|
||||
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
|
||||
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
|
||||
@ -100,6 +121,17 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
|
||||
}
|
||||
}
|
||||
|
||||
private String escapeString( String s) {
|
||||
Pattern p = Pattern.compile("\\\\x(\\d{2})");
|
||||
Matcher m = p.matcher(s);
|
||||
|
||||
StringBuffer buf = new StringBuffer();
|
||||
while (m.find()) {
|
||||
m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1))));
|
||||
}
|
||||
m.appendTail(buf);
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Configuration configuration) {
|
||||
|
||||
@ -94,7 +94,8 @@ public class DorisStreamLoad implements Serializable {
|
||||
try {
|
||||
RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
|
||||
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
|
||||
throw new StreamLoadException("stream load error: " + respContent.getMessage());
|
||||
String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL());
|
||||
throw new StreamLoadException(errMsg);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new StreamLoadException(e);
|
||||
|
||||
Reference in New Issue
Block a user