[Bug] Fix bug that load data to wrong temp partitions (#3422)
When loading data without specifying partition, the data should only be loaded to formal partitions, not including temp partitions;
This commit is contained in:
@ -88,7 +88,8 @@ public class InsertStmt extends DdlStmt {
|
||||
|
||||
private final TableName tblName;
|
||||
private final PartitionNames targetPartitionNames;
|
||||
// parsed from targetPartitionNames. empty means no partition specified
|
||||
// parsed from targetPartitionNames.
|
||||
// if targetPartitionNames is not set, add all formal partitions' id of the table into it
|
||||
private List<Long> targetPartitionIds = Lists.newArrayList();
|
||||
private final List<String> targetColumnNames;
|
||||
private QueryStmt queryStmt;
|
||||
@ -334,6 +335,10 @@ public class InsertStmt extends DdlStmt {
|
||||
}
|
||||
targetPartitionIds.add(part.getId());
|
||||
}
|
||||
} else {
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
targetPartitionIds.add(partition.getId());
|
||||
}
|
||||
}
|
||||
// need a descriptor
|
||||
DescriptorTable descTable = analyzer.getDescTbl();
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.NotImplementedException;
|
||||
@ -175,6 +176,12 @@ public class LoadingTaskPlanner {
|
||||
break;
|
||||
}
|
||||
|
||||
if (partitionIds.isEmpty()) {
|
||||
for (Partition partition : table.getPartitions()) {
|
||||
partitionIds.add(partition.getId());
|
||||
}
|
||||
}
|
||||
|
||||
return Lists.newArrayList(partitionIds);
|
||||
}
|
||||
|
||||
|
||||
@ -62,6 +62,7 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Range;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -75,7 +76,7 @@ public class OlapTableSink extends DataSink {
|
||||
// input variables
|
||||
private OlapTable dstTable;
|
||||
private TupleDescriptor tupleDescriptor;
|
||||
// specified partition ids. empty means partition does not specified, so all partitions will be included.
|
||||
// specified partition ids. this list should not be empty and should contains all related partition ids
|
||||
private List<Long> partitionIds;
|
||||
|
||||
// set after init called
|
||||
@ -84,10 +85,8 @@ public class OlapTableSink extends DataSink {
|
||||
public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds) {
|
||||
this.dstTable = dstTable;
|
||||
this.tupleDescriptor = tupleDescriptor;
|
||||
Preconditions.checkState(!CollectionUtils.isEmpty(partitionIds));
|
||||
this.partitionIds = partitionIds;
|
||||
if (this.partitionIds == null) {
|
||||
this.partitionIds = Lists.newArrayList();
|
||||
}
|
||||
}
|
||||
|
||||
public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS) throws AnalysisException {
|
||||
@ -100,11 +99,6 @@ public class OlapTableSink extends DataSink {
|
||||
tDataSink.setType(TDataSinkType.OLAP_TABLE_SINK);
|
||||
tDataSink.setOlap_table_sink(tSink);
|
||||
|
||||
// check partition
|
||||
if (!partitionIds.isEmpty() && dstTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
|
||||
}
|
||||
|
||||
for (Long partitionId : partitionIds) {
|
||||
Partition part = dstTable.getPartition(partitionId);
|
||||
if (part == null) {
|
||||
@ -222,10 +216,9 @@ public class OlapTableSink extends DataSink {
|
||||
|
||||
int partColNum = rangePartitionInfo.getPartitionColumns().size();
|
||||
DistributionInfo selectedDistInfo = null;
|
||||
for (Partition partition : table.getAllPartitions()) {
|
||||
if (!partitionIds.isEmpty() && !partitionIds.contains(partition.getId())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (Long partitionId : partitionIds) {
|
||||
Partition partition = table.getPartition(partitionId);
|
||||
TOlapTablePartition tPartition = new TOlapTablePartition();
|
||||
tPartition.setId(partition.getId());
|
||||
Range<PartitionKey> range = rangePartitionInfo.getRange(partition.getId());
|
||||
@ -293,11 +286,8 @@ public class OlapTableSink extends DataSink {
|
||||
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
|
||||
// BE id -> path hash
|
||||
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
|
||||
for (Partition partition : table.getAllPartitions()) {
|
||||
if (!partitionIds.isEmpty() && !partitionIds.contains(partition.getId())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (Long partitionId : partitionIds) {
|
||||
Partition partition = table.getPartition(partitionId);
|
||||
int quorum = table.getPartitionInfo().getReplicationNum(partition.getId()) / 2 + 1;
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
// we should ensure the replica backend is alive
|
||||
|
||||
@ -172,7 +172,8 @@ public class StreamLoadPlanner {
|
||||
return params;
|
||||
}
|
||||
|
||||
// get all specified partition ids. return an empty list if no partition is specified.
|
||||
// get all specified partition ids.
|
||||
// if no partition specified, return all partitions
|
||||
private List<Long> getAllPartitionIds() throws DdlException {
|
||||
List<Long> partitionIds = Lists.newArrayList();
|
||||
|
||||
@ -185,6 +186,10 @@ public class StreamLoadPlanner {
|
||||
}
|
||||
partitionIds.add(part.getId());
|
||||
}
|
||||
} else {
|
||||
for (Partition partition : destTable.getPartitions()) {
|
||||
partitionIds.add(partition.getId());
|
||||
}
|
||||
}
|
||||
return partitionIds;
|
||||
}
|
||||
|
||||
@ -95,9 +95,11 @@ public class OlapTableSinkTest {
|
||||
dstTable.getId(); result = 1;
|
||||
dstTable.getPartitionInfo(); result = partInfo;
|
||||
dstTable.getPartitions(); result = Lists.newArrayList(partition);
|
||||
dstTable.getPartition(2L);
|
||||
result = partition;
|
||||
}};
|
||||
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList());
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L));
|
||||
sink.init(new TUniqueId(1, 2), 3, 4, 1000);
|
||||
sink.complete();
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
@ -146,7 +148,6 @@ public class OlapTableSinkTest {
|
||||
|
||||
long unknownPartId = 12345L;
|
||||
new Expectations() {{
|
||||
partInfo.getType(); result = PartitionType.RANGE;
|
||||
dstTable.getPartition(unknownPartId); result = null;
|
||||
}};
|
||||
|
||||
@ -156,21 +157,4 @@ public class OlapTableSinkTest {
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testUnpartFail(
|
||||
@Injectable RangePartitionInfo partInfo,
|
||||
@Injectable MaterializedIndex index) throws UserException {
|
||||
TupleDescriptor tuple = getTuple();
|
||||
|
||||
new Expectations() {{
|
||||
partInfo.getType(); result = PartitionType.UNPARTITIONED;
|
||||
}};
|
||||
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(1L));
|
||||
sink.init(new TUniqueId(1, 2), 3, 4, 1000);
|
||||
sink.complete();
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user