[FOLLOWUP](load) fix nullable and add regression (#12375)

* [FOLLOWUP](load) fix nullable and add regression
This commit is contained in:
Gabriel
2022-09-08 00:05:04 +08:00
committed by GitHub
parent 86e347f3bb
commit a536030979
5 changed files with 91 additions and 1 deletions

View File

@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
@ -108,17 +109,37 @@ public class LoadingTaskPlanner {
throws UserException {
// Generate tuple descriptor
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
// use full schema to fill the descriptor table
for (Column col : table.getFullSchema()) {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
scanSlotDesc.setIsMaterialized(true);
scanSlotDesc.setColumn(col);
scanSlotDesc.setIsNullable(col.isAllowNull());
if (fileGroups.size() > 0) {
for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) {
try {
if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
&& importColumnDesc.getColumnName().equals(col.getName())) {
scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
break;
}
} catch (Exception e) {
// An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed
// now. We just skip this case here.
}
}
}
}
// Generate plan trees
// 1. Broker scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo);
scanNode.init(analyzer);