From 929b33ac0a06df17b03afe7fd7cfee44d9fa47b3 Mon Sep 17 00:00:00 2001 From: wunan1210 Date: Tue, 10 Aug 2021 10:14:21 +0800 Subject: [PATCH] [DataX] doriswriter support csv (#6373) Make the DataX doriswriter support the csv format. The csv format is simpler and faster than json when the data is simple. Add property format: csv/json. Add property columnSeparator: takes effect when format is csv, for example "\x01", "^", etc. --- .../DataX/doriswriter/doc/doriswriter.md | 19 +++++- .../plugin/writer/doriswriter/DorisCodec.java | 58 +++++++++++++++++++ .../writer/doriswriter/DorisCsvCodec.java | 52 +++++++++++++++++ .../writer/doriswriter/DorisJsonCodec.java | 50 +--------------- .../writer/doriswriter/DorisWriter.java | 11 +++- .../doriswriter/DorisWriterEmitter.java | 25 ++++++-- .../datax/plugin/writer/doriswriter/Key.java | 48 ++++++++++++++- 7 files changed, 205 insertions(+), 58 deletions(-) create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java diff --git a/extension/DataX/doriswriter/doc/doriswriter.md b/extension/DataX/doriswriter/doc/doriswriter.md index 7df32f6428..1bf9590b2a 100644 --- a/extension/DataX/doriswriter/doc/doriswriter.md +++ b/extension/DataX/doriswriter/doc/doriswriter.md @@ -160,9 +160,21 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter * **lineDelimiter** - - 描述:每批次数据包含多行,每行为 Json 格式,每行的的分隔符即为 lineDelimiter。 + - 描述:每批次数据包含多行,每行为 Json 格式,每行的分隔符即为 lineDelimiter。支持多个字节, 例如'\x02\x03'。 - 必选:否 - 默认值:`\n` + +* **format** + + - 描述:导入数据的格式, 可以是json或者csv。 + - 必选:否 + - 默认值:`json` + +* **columnSeparator** + + - 描述:当导入的格式是csv时, 字段之间的分隔符。支持多个字节, 例如'\x01\x02'。 + - 必选:否 + - 默认值:`\t` * **loadProps** @@ -170,3 +182,8 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter - 必选:否 - 默认值:无 +* **connectTimeout** + + - 描述:StreamLoad请求建立连接的超时时间, 单位毫秒(ms)。 + - 必选:否 + - 
默认值:-1 diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java new file mode 100644 index 0000000000..37d5008b87 --- /dev/null +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java @@ -0,0 +1,58 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.DateColumn; +import com.alibaba.datax.common.element.Record; +import org.apache.commons.lang3.time.DateFormatUtils; + +import java.util.List; +import java.util.TimeZone; + +public abstract class DorisCodec { + protected static String timeZone = "GMT+8"; + protected static TimeZone timeZoner = TimeZone.getTimeZone(timeZone); + protected final List fieldNames; + + public DorisCodec(final List fieldNames) { + this.fieldNames = fieldNames; + } + + public abstract String serialize(Record row); + + /** + * convert datax internal data to string + * + * @param col + * @return + */ + protected Object convertColumn(final Column col) { + if (null == col.getRawData()) { + return null; + } + Column.Type type = col.getType(); + switch (type) { + case BOOL: + case INT: + case LONG: + return col.asLong(); + case DOUBLE: + return col.asDouble(); + case STRING: + return col.asString(); + case DATE: { + final DateColumn.DateType dateType = ((DateColumn) col).getSubType(); + switch (dateType) { + case DATE: + return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner); + case DATETIME: + return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner); + default: + return col.asString(); + } + } + default: + // BAD, NULL, BYTES + return null; + } + } +} diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java new file mode 100644 index 0000000000..b7c6f76f6c --- /dev/null +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java @@ -0,0 +1,52 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> + */ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; + +import java.util.ArrayList; +import java.util.List; + +// Convert DataX data to csv +public class DorisCsvCodec extends DorisCodec { + + private final String columnSeparator; + + public DorisCsvCodec(final List fieldNames, String columnSeparator) { + super(fieldNames); + this.columnSeparator = columnSeparator; + } + + @Override + public String serialize(final Record row) { + if (null == this.fieldNames) { + return ""; + } + List list = new ArrayList<>(); + + for (int i = 0; i < this.fieldNames.size(); i++) { + Object value = this.convertColumn(row.getColumn(i)); + list.add(value != null ? 
value.toString() : "\\N"); + } + + return String.join(columnSeparator, list); + } + +} \ No newline at end of file diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java index 4db273f3e4..68a2cbceb8 100644 --- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java @@ -19,29 +19,21 @@ */ package com.alibaba.datax.plugin.writer.doriswriter; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.Record; import com.alibaba.fastjson.JSON; -import org.apache.commons.lang3.time.DateFormatUtils; - import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TimeZone; // Convert DataX data to json -public class DorisJsonCodec { - private static String timeZone = "GMT+8"; - private static TimeZone timeZoner = TimeZone.getTimeZone(timeZone); - - private final List fieldNames; +public class DorisJsonCodec extends DorisCodec { public DorisJsonCodec(final List fieldNames) { - this.fieldNames = fieldNames; + super(fieldNames); } + @Override public String serialize(final Record row) { if (null == this.fieldNames) { return ""; @@ -55,40 +47,4 @@ public class DorisJsonCodec { return JSON.toJSONString(rowMap); } - /** - * convert datax internal data to string - * - * @param col - * @return - */ - private Object convertColumn(final Column col) { - if (null == col.getRawData()) { - return null; - } - Column.Type type = col.getType(); - switch (type) { - case BOOL: - case INT: - case LONG: - return col.asLong(); - case DOUBLE: - return col.asDouble(); - case STRING: - return col.asString(); - case DATE: { - final DateColumn.DateType dateType = 
((DateColumn) col).getSubType(); - switch (dateType) { - case DATE: - return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner); - case DATETIME: - return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner); - default: - return col.asString(); - } - } - default: - // BAD, NULL, BYTES - return null; - } - } } \ No newline at end of file diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java index 0f4d6533d3..be96c5bbf8 100644 --- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java @@ -53,7 +53,7 @@ public class DorisWriter extends Writer { private DorisWriterEmitter dorisWriterEmitter; private Key keys; - private DorisJsonCodec rowCodec; + private DorisCodec rowCodec; private int batchNum = 0; public Task() { @@ -62,7 +62,11 @@ public class DorisWriter extends Writer { @Override public void init() { this.keys = new Key(super.getPluginJobConf()); - this.rowCodec = new DorisJsonCodec(this.keys.getColumns()); + if("csv".equalsIgnoreCase(this.keys.getFormat())){ + this.rowCodec = new DorisCsvCodec(this.keys.getColumns(),this.keys.getColumnSeparator()); + }else{ + this.rowCodec = new DorisJsonCodec(this.keys.getColumns()); + } this.dorisWriterEmitter = new DorisWriterEmitter(keys); } @@ -84,11 +88,12 @@ public class DorisWriter extends Writer { if (record.getColumnNumber() != this.keys.getColumns().size()) { throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, String.format("config writer column info error. because the column number of reader is :%s" + - "and the column number of writer is:%s. please check you datax job config json.", + "and the column number of writer is:%s. 
please check you datax job config json.", record.getColumnNumber(), this.keys.getColumns().size())); } // codec record final String recordStr = this.rowCodec.serialize(record); + // put into buffer flushBatch.putData(recordStr); batchCount += 1; diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java index d417332994..6e9e967ea2 100644 --- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java @@ -33,6 +33,7 @@ import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.ProtocolException; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpHead; @@ -64,9 +65,15 @@ public class DorisWriterEmitter { private int hostPos = 0; private List targetHosts = Lists.newArrayList(); + private RequestConfig requestConfig; + public DorisWriterEmitter(final Key keys) { this.keys = keys; initHostList(); + initRequestConfig(); + } + private void initRequestConfig(){ + requestConfig=RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build(); } // get target host from config @@ -110,6 +117,7 @@ public class DorisWriterEmitter { /** * loop to get target host + * * @return */ private String getAvailableHost() { @@ -162,7 +170,7 @@ public class DorisWriterEmitter { return new HttpGet(uri); } else { int status = response.getStatusLine().getStatusCode(); - return (HttpUriRequest)(status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri)); + return (HttpUriRequest) (status == 307 ? 
RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri)); } } }); @@ -186,12 +194,21 @@ public class DorisWriterEmitter { httpPut.setHeader(HttpHeaders.EXPECT, "100-continue"); httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword())); httpPut.setHeader("label", flushBatch.getLabel()); - httpPut.setHeader("format", "json"); - httpPut.setHeader("read_json_by_line", "true"); - httpPut.setHeader("fuzzy_parse", "true"); + httpPut.setHeader("format", this.keys.getFormat()); + httpPut.setHeader("line_delimiter", this.keys.getLineDelimiterDesc()); + + if ("csv".equalsIgnoreCase(this.keys.getFormat())) { + httpPut.setHeader("column_separator", this.keys.getColumnSeparatorDesc()); + } else { + httpPut.setHeader("read_json_by_line", "true"); + httpPut.setHeader("fuzzy_parse", "true"); + } + // Use ByteArrayEntity instead of StringEntity to handle Chinese correctly httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes())); + httpPut.setConfig(requestConfig); + try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) { final int code = resp.getStatusLine().getStatusCode(); if (HttpStatus.SC_OK != code) { diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java index fcc1c15c6e..79b83c1b3a 100644 --- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java @@ -48,15 +48,25 @@ public class Key implements Serializable { public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize"; public static final String LABEL_PREFIX = "labelPrefix"; public static final String LINE_DELIMITER = "lineDelimiter"; + public static final String COLUMN_SEPARATOR = "columnSeparator"; + public static 
final String FORMAT = "format"; + public static final String CONNECT_TIMEOUT = "connectTimeout"; private final Configuration options; - + private final String columnSeparatorDesc; + private final String lineDelimiterDesc; + private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000; private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_"; private static final String DEFAULT_LINE_DELIMITER = "\n"; + private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; + private static final String DEFAULT_FORMAT = "json"; + private static final int DEFAULT_CONNECT_TIMEOUT = -1; public Key(final Configuration options) { this.options = options; + this.columnSeparatorDesc = parseHexReadable(this.getColumnSeparator()); + this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter()); } public void doPretreatment() { @@ -124,6 +134,27 @@ public class Key implements Serializable { return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER); } + public String getFormat() { + return this.options.getString(FORMAT, DEFAULT_FORMAT); + } + + public String getColumnSeparator() { + return this.options.getString(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR); + } + + public int getConnectTimeout() { + return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); + } + + + public String getColumnSeparatorDesc() { + return columnSeparatorDesc; + } + + public String getLineDelimiterDesc() { + return lineDelimiterDesc; + } + private void validateStreamLoadUrl() { List urlList = this.getBeLoadUrlList(); if (urlList == null) { @@ -137,15 +168,26 @@ public class Key implements Serializable { if (host.split(":").length < 2) { throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "Invalid load url format. IF use FE hosts, should be like: fe_host:fe_http_port." 
- + " If use BE hosts, should be like: be_host:be_webserver_port"); + + " If use BE hosts, should be like: be_host:be_webserver_port"); } } } + private String parseHexReadable(String s) { + byte[] separatorBytes = s.getBytes(); + StringBuilder desc = new StringBuilder(); + + for (byte separatorByte : separatorBytes) { + desc.append(String.format("\\x%02x", separatorByte)); + } + return desc.toString(); + } + private void validateRequired() { - final String[] requiredOptionKeys = new String[] { JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN }; + final String[] requiredOptionKeys = new String[]{JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN}; for (final String optionKey : requiredOptionKeys) { this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); } } } +