From 8aa58994845c92ecd4d9fe0a8b82fbb48cc05666 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 15 Sep 2022 08:44:39 +0800 Subject: [PATCH] [fix](load) add scan tuple for stream load scan node only when vectorization is enable (#12578) --- .../doris/load/loadv2/LoadingTaskPlanner.java | 6 +-- .../doris/planner/StreamLoadPlanner.java | 37 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 535ec50eed..06491c7991 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -146,12 +146,12 @@ public class LoadingTaskPlanner { // 1. Broker scan node ScanNode scanNode; if (Config.enable_new_load_scan_node) { - scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "FileScanNode"); + scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "FileScanNode"); ((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, fileStatusesList, filesAdded, strictMode, loadParallelism, userInfo); } else { - scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode", - fileStatusesList, filesAdded); + scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "BrokerScanNode", + fileStatusesList, filesAdded); ((BrokerScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 8b407fdc6d..aa1e43ac5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -125,11 +125,14 @@ public class StreamLoadPlanner { throw new UserException("There is no sequence column in the table " + destTable.getName()); } resetAnalyzer(); - // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info. - // construct tuple descriptor, used for scanNode - scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); // construct tuple descriptor, used for dataSink tupleDesc = descTable.createTupleDescriptor("DstTableTuple"); + TupleDescriptor scanTupleDesc = tupleDesc; + if (Config.enable_vectorized_load) { + // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info. + // construct tuple descriptor, used for scanNode + scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); + } boolean negative = taskInfo.getNegative(); // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { @@ -138,20 +141,22 @@ public class StreamLoadPlanner { slotDesc.setColumn(col); slotDesc.setIsNullable(col.isAllowNull()); - SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); - scanSlotDesc.setIsMaterialized(true); - scanSlotDesc.setColumn(col); - scanSlotDesc.setIsNullable(col.isAllowNull()); - for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { - try { - if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null - && importColumnDesc.getColumnName().equals(col.getName())) { - scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); - break; + if (Config.enable_vectorized_load) { + SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); + scanSlotDesc.setIsMaterialized(true); + scanSlotDesc.setColumn(col); + scanSlotDesc.setIsNullable(col.isAllowNull()); + for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { + try { + if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null + && importColumnDesc.getColumnName().equals(col.getName())) { + scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); + break; + } + } catch (Exception e) { + // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now. + // We just skip this case here. } - } catch (Exception e) { - // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now. - // We just skip this case here. } } if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {