[fix](load) add scan tuple for stream load scan node only when vectorization is enabled (#12578)

Author:       Mingyu Chen
Date:         2022-09-15 08:44:39 +08:00
Committed by: GitHub
Parent:       beeb0ef3eb
Commit:       8aa5899484
2 changed files with 24 additions and 19 deletions
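
In short: StreamLoadPlanner previously always created a separate "ScanTuple" next to the destination tuple, and LoadingTaskPlanner passed the destination tuple to its scan node. After this change the scan node always receives the scan-side tuple, and that tuple is only split off from the sink tuple when vectorized load is enabled. A minimal paraphrase of the decision, using only names that appear in the diff below (not additional repository code):

    // Paraphrase of the patched StreamLoadPlanner logic, not extra code from the repo.
    // The sink tuple always describes the destination table; a separate scan tuple is
    // only created when vectorized load is on, so its slots can carry the nullability
    // of the column mapping expressions instead of the table schema.
    tupleDesc = descTable.createTupleDescriptor("DstTableTuple");      // used by the data sink
    TupleDescriptor scanTupleDesc = tupleDesc;                         // non-vectorized: scan shares the sink tuple
    if (Config.enable_vectorized_load) {
        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");  // vectorized: dedicated scan tuple
    }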

LoadingTaskPlanner.java

@@ -146,12 +146,12 @@ public class LoadingTaskPlanner {
         // 1. Broker scan node
         ScanNode scanNode;
         if (Config.enable_new_load_scan_node) {
-            scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "FileScanNode");
+            scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "FileScanNode");
             ((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups,
                     fileStatusesList, filesAdded, strictMode, loadParallelism, userInfo);
         } else {
-            scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
-                    fileStatusesList, filesAdded);
+            scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "BrokerScanNode",
+                    fileStatusesList, filesAdded);
             ((BrokerScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode,
                     loadParallelism, userInfo);
         }
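
Both broker-load scan node flavors now take the scan-side tuple instead of the destination tuple. This hunk does not show how scanTupleDesc is initialized in LoadingTaskPlanner; assuming it mirrors the StreamLoadPlanner change below, it would default to the destination tuple and only be replaced by a dedicated scan tuple when vectorized load is enabled, roughly:

    // Assumed initialization elsewhere in LoadingTaskPlanner (not shown in this hunk):
    // keep the old behavior (scan node reads the destination tuple layout) unless
    // vectorized load asks for a dedicated scan tuple with expression-based nullability.
    TupleDescriptor scanTupleDesc = destTupleDesc;
    if (Config.enable_vectorized_load) {
        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
    }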

StreamLoadPlanner.java

@@ -125,11 +125,14 @@ public class StreamLoadPlanner {
             throw new UserException("There is no sequence column in the table " + destTable.getName());
         }
         resetAnalyzer();
-        // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info.
-        // construct tuple descriptor, used for scanNode
-        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
         // construct tuple descriptor, used for dataSink
         tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
+        TupleDescriptor scanTupleDesc = tupleDesc;
+        if (Config.enable_vectorized_load) {
+            // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info.
+            // construct tuple descriptor, used for scanNode
+            scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+        }
         boolean negative = taskInfo.getNegative();
         // here we should be full schema to fill the descriptor table
         for (Column col : destTable.getFullSchema()) {
@@ -138,20 +141,22 @@ public class StreamLoadPlanner {
             slotDesc.setColumn(col);
             slotDesc.setIsNullable(col.isAllowNull());
-            SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
-            scanSlotDesc.setIsMaterialized(true);
-            scanSlotDesc.setColumn(col);
-            scanSlotDesc.setIsNullable(col.isAllowNull());
-            for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
-                try {
-                    if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
-                            && importColumnDesc.getColumnName().equals(col.getName())) {
-                        scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
-                        break;
+            if (Config.enable_vectorized_load) {
+                SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+                scanSlotDesc.setIsMaterialized(true);
+                scanSlotDesc.setColumn(col);
+                scanSlotDesc.setIsNullable(col.isAllowNull());
+                for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+                    try {
+                        if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
+                                && importColumnDesc.getColumnName().equals(col.getName())) {
+                            scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+                            break;
+                        }
+                    } catch (Exception e) {
+                        // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now.
+                        // We just skip this case here.
                     }
-                } catch (Exception e) {
-                    // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now.
-                    // We just skip this case here.
                 }
             }
             if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {
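
The practical effect of the two StreamLoadPlanner hunks: when vectorized load is enabled, the scan-side slot takes its nullability from the column mapping expression rather than from the table column, so a NOT NULL destination column fed by a nullable expression is still read through a nullable scan slot. A standalone restatement of that rule, with placeholder inputs col and columnExprDescs (not code from the repository):

    // Illustration only: decide the nullability of the scan-side slot for one column.
    // The sink-side slot always follows the table schema; the scan-side slot follows
    // the mapping expression when one exists for this column.
    boolean sinkNullable = col.isAllowNull();
    boolean scanNullable = col.isAllowNull();
    for (ImportColumnDesc importColumnDesc : columnExprDescs) {
        if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
                && importColumnDesc.getColumnName().equals(col.getName())) {
            // e.g. a mapping like v1 = substr(raw_v1, 1, 10) may be nullable even if v1 is NOT NULL
            // (the real code guards getExpr().isNullable() with try/catch because the expr may not be analyzed yet)
            scanNullable = importColumnDesc.getExpr().isNullable();
            break;
        }
    }
    // sinkNullable -> sink-side slot; scanNullable -> scan-side slot (only used when vectorized load is on)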