From 1b0b5b5f0940f37811fc9bdce8d148766e46f6cb Mon Sep 17 00:00:00 2001 From: yixiutt <102007456+yixiutt@users.noreply.github.com> Date: Fri, 19 Aug 2022 14:57:11 +0800 Subject: [PATCH] [Enhancement](load) add hidden_columns in stream load param (#11625) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream load will ignore invisible columns if no http header columns specified, but in some case user cannot get all columns if columns changed frequently。 Add a hidden_columns header to support hidden columns import。User can set hidden_columns such as __DORIS_DELETE_SIGN__ and add this column in stream load data so we can delete this line. For example: curl -u root -v --location-trusted -H "hidden_columns: __DORIS_DELETE_SIGN__" -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.id\", \"$.name\",\"$.__DORIS_DELETE_SIGN__\"]" -T 1.json http://{beip}:{be_port}/api/test/test1/_stream_load Co-authored-by: yixiutt --- be/src/http/action/stream_load.cpp | 4 ++++ be/src/http/http_common.h | 1 + .../Load/STREAM-LOAD.md | 9 +++++++++ .../Load/STREAM-LOAD.md | 8 +++++++- .../main/java/org/apache/doris/load/Load.java | 19 +++++++++++++++---- .../load/routineload/RoutineLoadJob.java | 5 +++++ .../apache/doris/planner/BrokerScanNode.java | 2 +- .../doris/planner/StreamLoadScanNode.java | 2 +- .../org/apache/doris/task/LoadTaskInfo.java | 2 ++ .../org/apache/doris/task/StreamLoadTask.java | 11 +++++++++++ gensrc/thrift/FrontendService.thrift | 1 + 11 files changed, 57 insertions(+), 7 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 0586c44cc7..8e318b9a92 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -555,6 +555,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_max_filter_ratio(ctx->max_filter_ratio); } + if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { + request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS)); + } + #ifndef BE_TEST // plan this load TNetworkAddress master_addr = _exec_env->master_info()->network_address; diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 4a0a042511..1ae254b062 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -50,6 +50,7 @@ static const std::string HTTP_SEQUENCE_COL = "sequence_col"; static const std::string HTTP_COMPRESS_TYPE = "compress_type"; static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; +static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index a5081e517f..1d2c4a05a1 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -136,6 +136,15 @@ Parameter introduction: 21. send_batch_parallelism: Integer, used to set the parallelism of sending batch data. If the value of parallelism exceeds `max_send_batch_parallelism_per_job` in the BE configuration, the BE as a coordination point will use the value of `max_send_batch_parallelism_per_job`. +22. hidden_columns: Specify hidden column when no `columns` in Headers,multi hidden column shoud be +separated by commas. + + ``` + hidden_columns: __DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__ + The system will use the order specified by user. in case above, data should be ended + with __DORIS_SEQUENCE_COL__. + ``` + RETURN VALUES After the import is complete, the related content of this import will be returned in Json format. Currently includes the following fields Status: Import the last status. diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index a3874b5f02..0a374fb8ed 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -133,7 +133,13 @@ curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_h 20. read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。 21. send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 - + +22. hidden_columns: 用于指定导入数据中包含的隐藏列,在Header中不包含columns时生效,多个hidden column用逗号分割。 + ``` + hidden_columns: __DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__ + 系统会使用用户指定的数据导入数据。在上述用例中,导入数据中最后一列数据为__DORIS_SEQUENCE_COL__。 + ``` + RETURN VALUES 导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段 Status: 导入最后的状态。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 2fdf604369..846d8aa78f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -798,7 +798,7 @@ public class Load { */ public static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction) throws UserException { - initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, false, false); + initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false, false); } /* @@ -809,10 +809,11 @@ public class Load { Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, TBrokerScanRangeParams params, - TFileFormatType formatType, boolean useVectorizedLoad) throws UserException { + TFileFormatType formatType, List hiddenColumns, + boolean useVectorizedLoad) throws UserException { rewriteColumns(columnDescs); initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, - srcTupleDesc, slotDescByName, params, formatType, useVectorizedLoad, true); + srcTupleDesc, slotDescByName, params, formatType, hiddenColumns, useVectorizedLoad, true); } /* @@ -827,7 +828,7 @@ public class Load { Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, TBrokerScanRangeParams params, - TFileFormatType formatType, boolean useVectorizedLoad, + TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad, boolean needInitSlotAndAnalyzeExprs) throws UserException { // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. @@ -864,6 +865,16 @@ public class Load { LOG.debug("add base column {} to stream load task", column.getName()); copiedColumnExprs.add(columnDesc); } + if (hiddenColumns != null) { + for (String columnName : hiddenColumns) { + Column column = tbl.getColumn(columnName); + if (column != null && !column.isVisible()) { + ImportColumnDesc columnDesc = new ImportColumnDesc(column.getName()); + LOG.debug("add hidden column {} to stream load task", column.getName()); + copiedColumnExprs.add(columnDesc); + } + } + } } // generate a map for checking easily Map columnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index f3af0debca..78a0a58015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -580,6 +580,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return ""; } + @Override + public List getHiddenColumns() { + return null; + } + @Override public ImportColumnDescs getColumnExprDescs() { if (columnDescs == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 3df83bad57..b67e1e91bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -286,7 +286,7 @@ public class BrokerScanNode extends LoadScanNode { Load.initColumns(targetTable, columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, context.params, - formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized()); + formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index f5e5aae77b..84d388fa34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -144,7 +144,7 @@ public class StreamLoadScanNode extends LoadScanNode { Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, exprsByName, analyzer, srcTupleDesc, slotDescByName, params, - taskInfo.getFormatType(), VectorizedUtil.isVectorized()); + taskInfo.getFormatType(), taskInfo.getHiddenColumns(), VectorizedUtil.isVectorized()); // analyze where statement initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index cc28a5b15f..223bf0598b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -88,6 +88,8 @@ public interface LoadTaskInfo { String getHeaderType(); + List getHiddenColumns(); + class ImportColumnDescs { public List descs = Lists.newArrayList(); public boolean isColumnDescsRewrited = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 55cc6b17aa..569776f291 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -41,6 +41,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.StringReader; +import java.util.Arrays; +import java.util.List; public class StreamLoadTask implements LoadTaskInfo { @@ -76,6 +78,7 @@ public class StreamLoadTask implements LoadTaskInfo { private double maxFilterRatio = 0.0; private boolean loadToSingleTablet = false; private String headerType = ""; + private List hiddenColumns; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -228,6 +231,11 @@ public class StreamLoadTask implements LoadTaskInfo { return sequenceCol; } + @Override + public List getHiddenColumns() { + return hiddenColumns; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType()); @@ -320,6 +328,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetLoadToSingleTablet()) { loadToSingleTablet = request.isLoadToSingleTablet(); } + if (request.isSetHiddenColumns()) { + hiddenColumns = Arrays.asList(request.getHiddenColumns().replaceAll("\\s+", "").split(",")); + } } // used for stream load diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5765e87da1..ebb479140a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -540,6 +540,7 @@ struct TStreamLoadPutRequest { 36: optional double max_filter_ratio 37: optional bool load_to_single_tablet 38: optional string header_type + 39: optional string hidden_columns } struct TStreamLoadPutResult {