[Enhancement](load) add hidden_columns in stream load param (#11625)

Stream load ignores invisible columns when no "columns" HTTP header is
specified, but in some cases users cannot enumerate every column because
the schema changes frequently.
Add a hidden_columns header to support importing hidden columns. Users can
set hidden_columns to a value such as __DORIS_DELETE_SIGN__ and supply that
column in the stream load data, so the rows it marks are deleted.
For example:
curl -u root -v --location-trusted \
    -H "hidden_columns: __DORIS_DELETE_SIGN__" \
    -H "format: json" \
    -H "strip_outer_array: true" \
    -H "jsonpaths: [\"$.id\",\"$.name\",\"$.__DORIS_DELETE_SIGN__\"]" \
    -T 1.json \
    http://{beip}:{be_port}/api/test/test1/_stream_load
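
As a reference, a hypothetical 1.json matching the jsonpaths above (sample
data assumed here, not from the commit; with strip_outer_array: true the
payload is a JSON array, and a __DORIS_DELETE_SIGN__ value of 1 marks a row
for deletion):

[
    {"id": 1, "name": "foo", "__DORIS_DELETE_SIGN__": 1},
    {"id": 2, "name": "bar", "__DORIS_DELETE_SIGN__": 0}
]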

Co-authored-by: yixiutt <yixiu@selectdb.com>
Author: yixiutt
Committed: 2022-08-19 14:57:11 +08:00 (committed by GitHub)
Parent: 01bd7f224b
Commit: 1b0b5b5f09
11 changed files with 57 additions and 7 deletions


@@ -798,7 +798,7 @@ public class Load {
      */
     public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
             Map<String, Pair<String, List<String>>> columnToHadoopFunction) throws UserException {
-        initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, false, false);
+        initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false, false);
     }
/*
@@ -809,10 +809,11 @@ public class Load {
             Map<String, Pair<String, List<String>>> columnToHadoopFunction,
             Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
             Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
-            TFileFormatType formatType, boolean useVectorizedLoad) throws UserException {
+            TFileFormatType formatType, List<String> hiddenColumns,
+            boolean useVectorizedLoad) throws UserException {
         rewriteColumns(columnDescs);
         initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer,
-                srcTupleDesc, slotDescByName, params, formatType, useVectorizedLoad, true);
+                srcTupleDesc, slotDescByName, params, formatType, hiddenColumns, useVectorizedLoad, true);
     }
/*
@@ -827,7 +828,7 @@ public class Load {
             Map<String, Pair<String, List<String>>> columnToHadoopFunction,
             Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
             Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
-            TFileFormatType formatType, boolean useVectorizedLoad,
+            TFileFormatType formatType, List<String> hiddenColumns, boolean useVectorizedLoad,
             boolean needInitSlotAndAnalyzeExprs) throws UserException {
         // We make a copy of the columnExprs so that our subsequent changes
         // to the columnExprs will not affect the original columnExprs.
@@ -864,6 +865,16 @@ public class Load {
                     LOG.debug("add base column {} to stream load task", column.getName());
                     copiedColumnExprs.add(columnDesc);
                 }
+                if (hiddenColumns != null) {
+                    for (String columnName : hiddenColumns) {
+                        Column column = tbl.getColumn(columnName);
+                        if (column != null && !column.isVisible()) {
+                            ImportColumnDesc columnDesc = new ImportColumnDesc(column.getName());
+                            LOG.debug("add hidden column {} to stream load task", column.getName());
+                            copiedColumnExprs.add(columnDesc);
+                        }
+                    }
+                }
             }
             // generate a map for checking easily
             Map<String, Expr> columnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
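
To make the new branch above concrete, here is a minimal standalone sketch of
the same filtering rule (HiddenColumnFilter and the plain visibility map are
hypothetical stand-ins for the Doris Table/Column types): a requested name is
appended only when the table actually has a column with that name and that
column is invisible.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class HiddenColumnFilter {
    // Hypothetical model of the check above, not Doris source: keep a requested
    // name only if the table has that column and the column is invisible.
    static List<String> select(Map<String, Boolean> columnVisibility, List<String> requested) {
        List<String> kept = new ArrayList<>();
        if (requested == null) {
            return kept; // no hidden_columns header: nothing extra is added
        }
        for (String name : requested) {
            Boolean visible = columnVisibility.get(name);
            if (visible != null && !visible) { // column exists and is invisible
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Boolean> visibility = Map.of("id", true, "__DORIS_DELETE_SIGN__", false);
        System.out.println(select(visibility, Arrays.asList("__DORIS_DELETE_SIGN__", "no_such_col")));
        // Prints: [__DORIS_DELETE_SIGN__]
    }
}

Silently skipping unknown or visible names mirrors the guard in the diff:
only genuine hidden columns ever reach copiedColumnExprs.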


@@ -580,6 +580,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         return "";
     }

+    @Override
+    public List<String> getHiddenColumns() {
+        return null;
+    }
+
     @Override
     public ImportColumnDescs getColumnExprDescs() {
         if (columnDescs == null) {


@@ -286,7 +286,7 @@ public class BrokerScanNode extends LoadScanNode {
         Load.initColumns(targetTable, columnDescs,
                 context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
                 context.srcTupleDescriptor, context.slotDescByName, context.params,
-                formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized());
+                formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized());
     }
 }


@@ -144,7 +144,7 @@ public class StreamLoadScanNode extends LoadScanNode {
         Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */,
                 exprsByName, analyzer, srcTupleDesc, slotDescByName, params,
-                taskInfo.getFormatType(), VectorizedUtil.isVectorized());
+                taskInfo.getFormatType(), taskInfo.getHiddenColumns(), VectorizedUtil.isVectorized());

         // analyze where statement
         initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer);


@@ -88,6 +88,8 @@ public interface LoadTaskInfo {
     String getHeaderType();

+    List<String> getHiddenColumns();
+
     class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
         public boolean isColumnDescsRewrited = false;


@@ -41,6 +41,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.io.StringReader;
+import java.util.Arrays;
+import java.util.List;

 public class StreamLoadTask implements LoadTaskInfo {
@@ -76,6 +78,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private double maxFilterRatio = 0.0;
     private boolean loadToSingleTablet = false;
     private String headerType = "";
+    private List<String> hiddenColumns;

     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
         this.id = id;
@@ -228,6 +231,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return sequenceCol;
     }

+    @Override
+    public List<String> getHiddenColumns() {
+        return hiddenColumns;
+    }
+
     public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
         StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType());
@@ -320,6 +328,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetLoadToSingleTablet()) {
             loadToSingleTablet = request.isLoadToSingleTablet();
         }
+        if (request.isSetHiddenColumns()) {
+            hiddenColumns = Arrays.asList(request.getHiddenColumns().replaceAll("\\s+", "").split(","));
+        }
     }

     // used for stream load
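
As the last hunk shows, the header value is normalized before use: all
whitespace is stripped, then the string is split on commas. A minimal
standalone sketch of that normalization (the example value is assumed;
__DORIS_SEQUENCE_COL__ appears only as an illustrative second name):

import java.util.Arrays;
import java.util.List;

public class HiddenColumnsHeaderDemo {
    public static void main(String[] args) {
        // Mirrors the parsing in fromTStreamLoadPutRequest: remove all
        // whitespace, then split the comma-separated column names.
        String header = " __DORIS_DELETE_SIGN__ , __DORIS_SEQUENCE_COL__ ";
        List<String> hiddenColumns = Arrays.asList(header.replaceAll("\\s+", "").split(","));
        System.out.println(hiddenColumns);
        // Prints: [__DORIS_DELETE_SIGN__, __DORIS_SEQUENCE_COL__]
    }
}

Note that the list returned by Arrays.asList is fixed-size, which is fine
here because the parsed column names are only ever read.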