[Optimize][Extension] optimize extension datax doriswriter,Remove import doris via csv in Dataxwriter, only support via json (#7568)

* 1.Remove import doris via csv in Dataxwriter, only support via json;
2.Format Dataxwriter code;
3.Optimize exception handling and reduce multiple output of exception logs;
4.Update the dataxwriter's documentation;

* Delete DorisCsvCodec.java

delete unused file extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java

* 1.remove `format` config key;
2.Optimize serialization code in DorisJsonCodec class
This commit is contained in:
weajun
2022-01-09 13:27:52 +08:00
committed by GitHub
parent ad35067a2a
commit 3a8a85b739
10 changed files with 185 additions and 293 deletions

View File

@ -52,7 +52,7 @@ Because the doriswriter plug-in depends on some modules in the DataX code base,
The help doc can be found in `doriswriter/doc`
2. `init_env.sh`
2. `init-env.sh`
The script mainly performs the following steps:
@ -67,7 +67,7 @@ Because the doriswriter plug-in depends on some modules in the DataX code base,
### How to build
1. Run `init_env.sh`
1. Run `init-env.sh`
2. Modify code of doriswriter in `DataX/doriswriter` if you need.
3. Build doriswriter

View File

@ -52,7 +52,7 @@ doriswriter 插件依赖的 DataX 代码中的一些模块。而这些模块并
doriswriter 插件帮助文档在这里:`doriswriter/doc`
2. `init_env.sh`
2. `init-env.sh`
这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:
@ -67,7 +67,7 @@ doriswriter 插件依赖的 DataX 代码中的一些模块。而这些模块并
### 编译
1. 运行 `init_env.sh`
1. 运行 `init-env.sh`
2. 按需修改 `DataX/doriswriter` 中的代码。
3. 编译 doriswriter:

View File

@ -164,18 +164,6 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
- 必选:否
- 默认值:`\n`
* **format**
- 描述:导入数据的格式, 可以使是json或者csv。
- 必选:否
- 默认值:`json`
* **columnSeparator**
- 描述:当导入的格式是csv时, 字段之间的分隔符。支持多个字节, 例如'\x01\x02'。
- 必选:否
- 默认值:`\t`
* **loadProps**
- 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。

View File

@ -1,22 +1,20 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Column;

View File

@ -1,52 +0,0 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import java.util.ArrayList;
import java.util.List;
// Convert DataX data to csv
public class DorisCsvCodec extends DorisCodec {
private final String columnSeparator;
public DorisCsvCodec(final List<String> fieldNames, String columnSeparator) {
super(fieldNames);
this.columnSeparator = columnSeparator;
}
@Override
public String serialize(final Record row) {
if (null == this.fieldNames) {
return "";
}
List<String> list = new ArrayList<>();
for (int i = 0; i < this.fieldNames.size(); i++) {
Object value = this.convertColumn(row.getColumn(i));
list.add(value != null ? value.toString() : "\\N");
}
return String.join(columnSeparator, list);
}
}

View File

@ -1,60 +1,58 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
package com.alibaba.datax.plugin.writer.doriswriter;
// A wrapper class to hold a batch of loaded rows
public class DorisFlushBatch {
private String lineDelimiter;
private String label;
private long rows = 0;
private StringBuilder data = new StringBuilder();
private String lineDelimiter;
private String label;
private long rows = 0;
private StringBuilder data = new StringBuilder();
public DorisFlushBatch(String lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}
public DorisFlushBatch(String lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}
public void setLabel(String label) {
this.label = label;
}
public void setLabel(String label) {
this.label = label;
}
public String getLabel() {
return label;
}
public String getLabel() {
return label;
}
public long getRows() {
return rows;
}
public long getRows() {
return rows;
}
public void putData(String row) {
if (data.length() > 0) {
data.append(lineDelimiter);
}
data.append(row);
rows++;
}
public void putData(String row) {
if (data.length() > 0) {
data.append(lineDelimiter);
}
data.append(row);
rows++;
}
public StringBuilder getData() {
return data;
}
public StringBuilder getData() {
return data;
}
public long getSize() {
return data.length();
}
public long getSize() {
return data.length();
}
}

View File

@ -1,22 +1,20 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
@ -28,9 +26,11 @@ import java.util.Map;
// Convert DataX data to json
public class DorisJsonCodec extends DorisCodec {
private Map<String, Object> rowMap;
public DorisJsonCodec(final List<String> fieldNames) {
super(fieldNames);
this.rowMap = new HashMap<>(this.fieldNames.size());
}
@Override
@ -38,7 +38,8 @@ public class DorisJsonCodec extends DorisCodec {
if (null == this.fieldNames) {
return "";
}
final Map<String, Object> rowMap = new HashMap<String, Object>(this.fieldNames.size());
rowMap.clear();
int idx = 0;
for (final String fieldName : this.fieldNames) {
rowMap.put(fieldName, this.convertColumn(row.getColumn(idx)));
@ -46,5 +47,4 @@ public class DorisJsonCodec extends DorisCodec {
}
return JSON.toJSONString(rowMap);
}
}
}

View File

@ -1,22 +1,19 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
@ -32,11 +29,9 @@ import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
@ -49,8 +44,6 @@ public class DorisWriter extends Writer {
}
public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Task.class);
private DorisWriterEmitter dorisWriterEmitter;
private Key keys;
private DorisCodec rowCodec;
@ -62,11 +55,7 @@ public class DorisWriter extends Writer {
@Override
public void init() {
this.keys = new Key(super.getPluginJobConf());
if("csv".equalsIgnoreCase(this.keys.getFormat())){
this.rowCodec = new DorisCsvCodec(this.keys.getColumns(),this.keys.getColumnSeparator());
}else{
this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
}
this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
this.dorisWriterEmitter = new DorisWriterEmitter(keys);
}
@ -77,47 +66,43 @@ public class DorisWriter extends Writer {
@Override
public void startWrite(RecordReceiver recordReceiver) {
String lineDelimiter = this.keys.getLineDelimiter();
try {
DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
long batchCount = 0;
long batchByteSize = 0L;
Record record;
// loop to get record from datax
while ((record = recordReceiver.getFromReader()) != null) {
// check column size
if (record.getColumnNumber() != this.keys.getColumns().size()) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("config writer column info error. because the column number of reader is :%s" +
"and the column number of writer is:%s. please check you datax job config json.",
record.getColumnNumber(), this.keys.getColumns().size()));
}
// codec record
final String recordStr = this.rowCodec.serialize(record);
// put into buffer
flushBatch.putData(recordStr);
batchCount += 1;
batchByteSize += recordStr.length();
// trigger buffer
if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
// generate doris stream load label
flush(flushBatch, batchCount, batchByteSize);
// clear buffer
batchCount = 0;
batchByteSize = 0L;
flushBatch = new DorisFlushBatch(lineDelimiter);
}
} // end of while
if (flushBatch.getSize() > 0) {
flush(flushBatch, batchCount, batchByteSize);
DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
long batchCount = 0;
long batchByteSize = 0L;
Record record;
// loop to get record from datax
while ((record = recordReceiver.getFromReader()) != null) {
// check column size
if (record.getColumnNumber() != this.keys.getColumns().size()) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("config writer column info error. because the column number of reader is :%s" +
"and the column number of writer is:%s. please check you datax job config json.",
record.getColumnNumber(), this.keys.getColumns().size()));
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
// codec record
final String recordStr = this.rowCodec.serialize(record);
// put into buffer
flushBatch.putData(recordStr);
batchCount += 1;
batchByteSize += recordStr.length();
// trigger buffer
if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
// generate doris stream load label
flush(flushBatch);
// clear buffer
batchCount = 0;
batchByteSize = 0L;
flushBatch = new DorisFlushBatch(lineDelimiter);
}
} // end of while
if (flushBatch.getSize() > 0) {
flush(flushBatch);
}
}
private void flush(DorisFlushBatch flushBatch, long batchCount, long batchByteSize) throws IOException {
private void flush(DorisFlushBatch flushBatch) {
final String label = getStreamLoadLabel();
flushBatch.setLabel(label);
dorisWriterEmitter.doStreamLoad(flushBatch);
@ -271,4 +256,4 @@ public class DorisWriter extends Writer {
}
}
}
}

View File

@ -1,22 +1,19 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
@ -26,7 +23,6 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
@ -72,8 +68,9 @@ public class DorisWriterEmitter {
initHostList();
initRequestConfig();
}
private void initRequestConfig(){
requestConfig=RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
private void initRequestConfig() {
requestConfig = RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
}
// get target host from config
@ -94,24 +91,29 @@ public class DorisWriterEmitter {
/**
* execute doris stream load
*/
public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
public void doStreamLoad(final DorisFlushBatch flushData) {
long start = System.currentTimeMillis();
final String host = this.getAvailableHost();
if (null == host) {
throw new IOException("None of the load url can be connected.");
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the load url can be connected.");
}
final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
// do http put request and get response
final Map<String, Object> loadResult = this.doHttpPut(loadUrl, flushData);
final Map<String, Object> loadResult;
try {
loadResult = this.doHttpPut(loadUrl, flushData);
} catch (IOException e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
long cost = System.currentTimeMillis() - start;
LOG.info("StreamLoad response: " + JSON.toJSONString(loadResult) + ", cost(ms): " + cost);
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to doris: unknown result status.");
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Unable to flush data to doris: unknown result status.");
}
if (loadResult.get(keyStatus).equals("Fail")) {
throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
}
}
@ -194,15 +196,10 @@ public class DorisWriterEmitter {
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
httpPut.setHeader("label", flushBatch.getLabel());
httpPut.setHeader("format", this.keys.getFormat());
httpPut.setHeader("format", "json");
httpPut.setHeader("line_delimiter", this.keys.getLineDelimiterDesc());
if ("csv".equalsIgnoreCase(this.keys.getFormat())) {
httpPut.setHeader("column_separator", this.keys.getColumnSeparatorDesc());
} else {
httpPut.setHeader("read_json_by_line", "true");
httpPut.setHeader("fuzzy_parse", "true");
}
httpPut.setHeader("read_json_by_line", "true");
httpPut.setHeader("fuzzy_parse", "true");
// Use ByteArrayEntity instead of StringEntity to handle Chinese correctly
httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes()));

View File

@ -1,22 +1,20 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
*/
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.exception.DataXException;
@ -48,24 +46,18 @@ public class Key implements Serializable {
public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
public static final String LABEL_PREFIX = "labelPrefix";
public static final String LINE_DELIMITER = "lineDelimiter";
public static final String COLUMN_SEPARATOR = "columnSeparator";
public static final String FORMAT = "format";
public static final String CONNECT_TIMEOUT = "connectTimeout";
private final Configuration options;
private final String columnSeparatorDesc;
private final String lineDelimiterDesc;
private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000;
private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_FORMAT = "json";
private static final int DEFAULT_CONNECT_TIMEOUT = -1;
public Key(final Configuration options) {
this.options = options;
this.columnSeparatorDesc = parseHexReadable(this.getColumnSeparator());
this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter());
}
@ -134,23 +126,10 @@ public class Key implements Serializable {
return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
}
public String getFormat() {
return this.options.getString(FORMAT, DEFAULT_FORMAT);
}
public String getColumnSeparator() {
return this.options.getString(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR);
}
public int getConnectTimeout() {
return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
}
public String getColumnSeparatorDesc() {
return columnSeparatorDesc;
}
public String getLineDelimiterDesc() {
return lineDelimiterDesc;
}
@ -190,4 +169,3 @@ public class Key implements Serializable {
}
}
}