Fix hadoop load failed when enable batch delete in unique table (#6996)
This commit is contained in:
@ -83,6 +83,7 @@ import org.apache.doris.common.util.MetaLockUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.FailMsg.CancelType;
|
||||
import org.apache.doris.load.LoadJob.JobState;
|
||||
import org.apache.doris.load.loadv2.LoadTask;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
@ -581,6 +582,11 @@ public class Load {
|
||||
// set default timeout
|
||||
job.setTimeoutSecond(Config.hadoop_load_default_timeout_second);
|
||||
}
|
||||
for (DataDescription dataDescription : dataDescriptions) {
|
||||
if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND) {
|
||||
throw new DdlException("MERGE OR DELETE is not supported in hadoop load.");
|
||||
}
|
||||
}
|
||||
} else if (etlJobType == EtlJobType.BROKER) {
|
||||
if (job.getTimeoutSecond() == 0) {
|
||||
// set default timeout
|
||||
@ -758,6 +764,23 @@ public class Load {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
} else if (!column.isVisible()) {
|
||||
/*
|
||||
* For batch delete table add hidden column __DORIS_DELETE_SIGN__ to columns
|
||||
* eg:
|
||||
* (A, B, C)
|
||||
* ->
|
||||
* (A, B, C) SET (__DORIS_DELETE_SIGN__ = 0)
|
||||
*/
|
||||
columnToHadoopFunction.put(column.getName(), Pair.create("default_value", Lists.newArrayList(column.getDefaultValue())));
|
||||
ImportColumnDesc importColumnDesc = null;
|
||||
try {
|
||||
importColumnDesc = new ImportColumnDesc(column.getName(),
|
||||
new FunctionCallExpr("default_value", Arrays.asList(column.getDefaultValueExpr())));
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
parsedColumnExprList.add(importColumnDesc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -169,7 +169,7 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
|
||||
|
||||
private Map<String, EtlColumn> createEtlColumns(OlapTable table) {
|
||||
Map<String, EtlColumn> etlColumns = Maps.newHashMap();
|
||||
for (Column column : table.getBaseSchema()) {
|
||||
for (Column column : table.getBaseSchema(true)) {
|
||||
etlColumns.put(column.getName(), new EtlColumn(column));
|
||||
}
|
||||
return etlColumns;
|
||||
@ -225,7 +225,6 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
|
||||
}
|
||||
columnRefs.add(dppColumn);
|
||||
}
|
||||
|
||||
// distribution infos
|
||||
DistributionInfo distributionInfo = partition.getDistributionInfo();
|
||||
List<String> distributionColumnRefs = Lists.newArrayList();
|
||||
@ -266,7 +265,6 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
|
||||
LOG.warn("unknown distribution type. type: {}", distributionInfo.getType().name());
|
||||
throw new LoadException("unknown distribution type. type: " + distributionInfo.getType().name());
|
||||
}
|
||||
|
||||
etlIndex.setPidKeyCount(keySize);
|
||||
etlIndex.setColumnRefs(columnRefs);
|
||||
etlIndices.put(String.valueOf(indexId), etlIndex);
|
||||
|
||||
Reference in New Issue
Block a user