[DataX] doriswriter support csv (#6373)

make doriswriter of DataX support format csv. Format csv is simpler and faster than
format json when the data is simple

add property format: csv/json
add property column_separator: effect when format is csv, for example "\x01" , "^", etc...
This commit is contained in:
wunan1210
2021-08-10 10:14:21 +08:00
committed by GitHub
parent 35c8b6a0bf
commit 929b33ac0a
7 changed files with 205 additions and 58 deletions

View File

@ -160,9 +160,21 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
* **lineDelimiter**
- 描述:每批次数据包含多行,每行为 Json 格式,每行的的分隔符即为 lineDelimiter。
- 描述:每批次数据包含多行,每行为 Json 格式,每行的的分隔符即为 lineDelimiter。支持多个字节, 例如'\x02\x03'。
- 必选:否
- 默认值:`\n`
* **format**
- 描述:导入数据的格式, 可以是json或者csv。
- 必选:否
- 默认值:`json`
* **columnSeparator**
- 描述:当导入的格式是csv时, 字段之间的分隔符。支持多个字节, 例如'\x01\x02'。
- 必选:否
- 默认值:`\t`
* **loadProps**
@ -170,3 +182,8 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
- 必选:否
- 默认值:无
* **connectTimeout**
- 描述:StreamLoad单次请求的超时时间, 单位毫秒(ms)。
- 必选:否
- 默认值:-1

View File

@ -0,0 +1,58 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.List;
import java.util.TimeZone;
/**
 * Base class for converters that turn DataX records into the textual payload
 * pushed to Doris via Stream Load. Concrete subclasses choose the wire format
 * (e.g. json or csv) by implementing {@link #serialize(Record)}.
 */
public abstract class DorisCodec {
    // Time zone applied when rendering DATE/DATETIME columns.
    protected static String timeZone = "GMT+8";
    protected static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);
    // Destination column names in write order; may be null when unconfigured.
    protected final List<String> fieldNames;

    public DorisCodec(final List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    /**
     * Serializes one DataX record into a single line of the target format.
     *
     * @param row the record to encode
     * @return the encoded line
     */
    public abstract String serialize(Record row);

    /**
     * Converts a DataX column into a plain Java value for serialization.
     *
     * @param col the column to convert
     * @return a Long, Double or String depending on the column type, or null
     *         when the raw data is null or the type is BAD/NULL/BYTES
     */
    protected Object convertColumn(final Column col) {
        if (null == col.getRawData()) {
            return null;
        }
        switch (col.getType()) {
            case BOOL:
            case INT:
            case LONG:
                return col.asLong();
            case DOUBLE:
                return col.asDouble();
            case STRING:
                return col.asString();
            case DATE:
                return formatDateColumn((DateColumn) col);
            default:
                // BAD, NULL, BYTES: no representable value for this codec.
                return null;
        }
    }

    // Renders a DATE column according to its sub-type, falling back to the
    // column's own string form for any other sub-type.
    private String formatDateColumn(final DateColumn col) {
        switch (col.getSubType()) {
            case DATE:
                return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
            case DATETIME:
                return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
            default:
                return col.asString();
        }
    }
}

View File

@ -0,0 +1,52 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import java.util.ArrayList;
import java.util.List;
// Convert DataX data to csv
/**
 * Encodes DataX records as separator-joined csv lines for Doris Stream Load.
 */
public class DorisCsvCodec extends DorisCodec {
    // Separator placed between fields; comes from the "columnSeparator" option.
    private final String columnSeparator;

    public DorisCsvCodec(final List<String> fieldNames, String columnSeparator) {
        super(fieldNames);
        this.columnSeparator = columnSeparator;
    }

    /**
     * Renders one record as fields joined by the configured separator.
     * A null field value is emitted as the literal {@code \N}; an
     * unconfigured field-name list yields an empty string.
     */
    @Override
    public String serialize(final Record row) {
        if (null == this.fieldNames) {
            return "";
        }
        final StringBuilder line = new StringBuilder();
        final int columnCount = this.fieldNames.size();
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                line.append(columnSeparator);
            }
            final Object value = this.convertColumn(row.getColumn(i));
            line.append(value == null ? "\\N" : value.toString());
        }
        return line.toString();
    }
}

View File

@ -19,29 +19,21 @@
*/
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
// Convert DataX data to json
public class DorisJsonCodec {
private static String timeZone = "GMT+8";
private static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);
private final List<String> fieldNames;
public class DorisJsonCodec extends DorisCodec {
public DorisJsonCodec(final List<String> fieldNames) {
this.fieldNames = fieldNames;
super(fieldNames);
}
@Override
public String serialize(final Record row) {
if (null == this.fieldNames) {
return "";
@ -55,40 +47,4 @@ public class DorisJsonCodec {
return JSON.toJSONString(rowMap);
}
/**
* convert datax internal data to string
*
* @param col
* @return
*/
private Object convertColumn(final Column col) {
if (null == col.getRawData()) {
return null;
}
Column.Type type = col.getType();
switch (type) {
case BOOL:
case INT:
case LONG:
return col.asLong();
case DOUBLE:
return col.asDouble();
case STRING:
return col.asString();
case DATE: {
final DateColumn.DateType dateType = ((DateColumn) col).getSubType();
switch (dateType) {
case DATE:
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
case DATETIME:
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
default:
return col.asString();
}
}
default:
// BAD, NULL, BYTES
return null;
}
}
}

View File

@ -53,7 +53,7 @@ public class DorisWriter extends Writer {
private DorisWriterEmitter dorisWriterEmitter;
private Key keys;
private DorisJsonCodec rowCodec;
private DorisCodec rowCodec;
private int batchNum = 0;
public Task() {
@ -62,7 +62,11 @@ public class DorisWriter extends Writer {
@Override
public void init() {
this.keys = new Key(super.getPluginJobConf());
this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
if("csv".equalsIgnoreCase(this.keys.getFormat())){
this.rowCodec = new DorisCsvCodec(this.keys.getColumns(),this.keys.getColumnSeparator());
}else{
this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
}
this.dorisWriterEmitter = new DorisWriterEmitter(keys);
}
@ -84,11 +88,12 @@ public class DorisWriter extends Writer {
if (record.getColumnNumber() != this.keys.getColumns().size()) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("config writer column info error. because the column number of reader is :%s" +
"and the column number of writer is:%s. please check you datax job config json.",
"and the column number of writer is:%s. please check you datax job config json.",
record.getColumnNumber(), this.keys.getColumns().size()));
}
// codec record
final String recordStr = this.rowCodec.serialize(record);
// put into buffer
flushBatch.putData(recordStr);
batchCount += 1;

View File

@ -33,6 +33,7 @@ import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.ProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
@ -64,9 +65,15 @@ public class DorisWriterEmitter {
private int hostPos = 0;
private List<String> targetHosts = Lists.newArrayList();
private RequestConfig requestConfig;
// Builds the emitter: resolves the list of target load hosts from the job
// config and prepares the shared HTTP request configuration.
public DorisWriterEmitter(final Key keys) {
this.keys = keys;
initHostList();
initRequestConfig();
}
// Applies the user-configured connect timeout (connectTimeout, in ms) to all
// Stream Load requests. Default is -1, which leaves the timeout undefined
// (HttpClient then falls back to its own default behavior).
private void initRequestConfig(){
requestConfig=RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
}
// get target host from config
@ -110,6 +117,7 @@ public class DorisWriterEmitter {
/**
* loop to get target host
*
* @return
*/
private String getAvailableHost() {
@ -162,7 +170,7 @@ public class DorisWriterEmitter {
return new HttpGet(uri);
} else {
int status = response.getStatusLine().getStatusCode();
return (HttpUriRequest)(status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri));
return (HttpUriRequest) (status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri));
}
}
});
@ -186,12 +194,21 @@ public class DorisWriterEmitter {
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
httpPut.setHeader("label", flushBatch.getLabel());
httpPut.setHeader("format", "json");
httpPut.setHeader("read_json_by_line", "true");
httpPut.setHeader("fuzzy_parse", "true");
httpPut.setHeader("format", this.keys.getFormat());
httpPut.setHeader("line_delimiter", this.keys.getLineDelimiterDesc());
if ("csv".equalsIgnoreCase(this.keys.getFormat())) {
httpPut.setHeader("column_separator", this.keys.getColumnSeparatorDesc());
} else {
httpPut.setHeader("read_json_by_line", "true");
httpPut.setHeader("fuzzy_parse", "true");
}
// Use ByteArrayEntity instead of StringEntity to handle Chinese correctly
httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes()));
httpPut.setConfig(requestConfig);
try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
final int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {

View File

@ -48,15 +48,25 @@ public class Key implements Serializable {
public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
public static final String LABEL_PREFIX = "labelPrefix";
public static final String LINE_DELIMITER = "lineDelimiter";
public static final String COLUMN_SEPARATOR = "columnSeparator";
public static final String FORMAT = "format";
public static final String CONNECT_TIMEOUT = "connectTimeout";
private final Configuration options;
private final String columnSeparatorDesc;
private final String lineDelimiterDesc;
private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000;
private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_FORMAT = "json";
private static final int DEFAULT_CONNECT_TIMEOUT = -1;
// Captures the job configuration and precomputes the hex-escaped ("\xNN")
// forms of the column separator and line delimiter, which are later sent as
// Stream Load HTTP header values.
public Key(final Configuration options) {
this.options = options;
// Pre-render once: separators may contain multi-byte or unprintable
// characters that would not survive as raw HTTP header text.
this.columnSeparatorDesc = parseHexReadable(this.getColumnSeparator());
this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter());
}
public void doPretreatment() {
@ -124,6 +134,27 @@ public class Key implements Serializable {
return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
}
// Import format: "json" (default) or "csv".
public String getFormat() {
return this.options.getString(FORMAT, DEFAULT_FORMAT);
}
// Field separator used when the format is csv; defaults to "\t".
public String getColumnSeparator() {
return this.options.getString(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR);
}
// Connect timeout in milliseconds for each Stream Load request; defaults to -1.
public int getConnectTimeout() {
return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
}
// Hex-escaped ("\xNN") form of the column separator, computed in the constructor.
public String getColumnSeparatorDesc() {
return columnSeparatorDesc;
}
// Hex-escaped ("\xNN") form of the line delimiter, computed in the constructor.
public String getLineDelimiterDesc() {
return lineDelimiterDesc;
}
private void validateStreamLoadUrl() {
List<String> urlList = this.getBeLoadUrlList();
if (urlList == null) {
@ -137,15 +168,26 @@ public class Key implements Serializable {
if (host.split(":").length < 2) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"Invalid load url format. IF use FE hosts, should be like: fe_host:fe_http_port."
+ " If use BE hosts, should be like: be_host:be_webserver_port");
+ " If use BE hosts, should be like: be_host:be_webserver_port");
}
}
}
/**
 * Renders a separator string as a hex escape sequence (e.g. "\t" becomes
 * "\x09"), one "\xNN" token per byte, suitable for transport in a Stream
 * Load HTTP header.
 *
 * @param s the user-configured separator string
 * @return the hex-escaped representation of the separator's UTF-8 bytes
 */
private String parseHexReadable(String s) {
    // Use an explicit charset: the no-arg getBytes() uses the platform
    // default, which can differ between environments and would silently
    // change the byte sequence sent to Doris.
    byte[] separatorBytes = s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    StringBuilder desc = new StringBuilder();
    for (byte separatorByte : separatorBytes) {
        desc.append(String.format("\\x%02x", separatorByte));
    }
    return desc.toString();
}
private void validateRequired() {
final String[] requiredOptionKeys = new String[] { JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN };
final String[] requiredOptionKeys = new String[]{JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN};
for (final String optionKey : requiredOptionKeys) {
this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
}
}
}