[branch-2.1](auto-partition) Fix auto partition load failure in multi replica (#36586)

this pr
1. picked #35630, which was reverted #36098 before.
2. picked #36344 from master

these two pr fixed existing bug about auto partition load.

---------

Co-authored-by: Kaijie Chen <ckj@apache.org>
This commit is contained in:
zclllyybb
2024-06-20 17:51:18 +08:00
committed by GitHub
parent 6df1a9ab75
commit bd47d5a681
28 changed files with 495 additions and 151 deletions

View File

@ -35,7 +35,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class ListPartitionItem extends PartitionItem {
public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList());
public static final ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList());
private final List<PartitionKey> partitionKeys;
private boolean isDefaultPartition = false;

View File

@ -87,6 +87,13 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
return partitionKey;
}
public static PartitionKey createMaxPartitionKey() {
PartitionKey partitionKey = new PartitionKey();
partitionKey.keys.add(MaxLiteral.MAX_VALUE);
// type not set
return partitionKey;
}
public static PartitionKey createPartitionKey(List<PartitionValue> keys, List<Column> columns)
throws AnalysisException {
PartitionKey partitionKey = new PartitionKey();

View File

@ -30,10 +30,12 @@ import java.util.Optional;
public class RangePartitionItem extends PartitionItem {
private Range<PartitionKey> partitionKeyRange;
public static final Range<PartitionKey> DUMMY_ITEM;
public static final Range<PartitionKey> DUMMY_RANGE;
public static final RangePartitionItem DUMMY_ITEM;
static {
DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey());
DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey());
DUMMY_ITEM = new RangePartitionItem(Range.closed(new PartitionKey(), PartitionKey.createMaxPartitionKey()));
}
public RangePartitionItem(Range<PartitionKey> range) {

View File

@ -1698,12 +1698,12 @@ public class InternalCatalog implements CatalogIf<Database> {
isTempPartition, partitionInfo.getIsMutable(partitionId));
} else if (partitionInfo.getType() == PartitionType.LIST) {
info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty,
RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty,
partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
isTempPartition, partitionInfo.getIsMutable(partitionId));
} else {
info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
RangePartitionItem.DUMMY_ITEM, ListPartitionItem.DUMMY_ITEM, dataProperty,
RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty,
partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
isTempPartition, partitionInfo.getIsMutable(partitionId));
}

View File

@ -339,18 +339,84 @@ public class OlapTableSink extends DataSink {
return distColumns;
}
private PartitionItem createDummyPartitionItem(PartitionType partType) throws UserException {
if (partType == PartitionType.LIST) {
return ListPartitionItem.DUMMY_ITEM;
} else if (partType == PartitionType.RANGE) {
return RangePartitionItem.DUMMY_ITEM;
} else {
throw new UserException("unsupported partition for OlapTable, partition=" + partType);
}
}
private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer,
TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType)
throws UserException {
partitionParam.setEnableAutomaticPartition(true);
// these partitions only use in locations. not find partition.
partitionParam.setPartitionsIsFake(true);
// set columns
for (Column partCol : partitionInfo.getPartitionColumns()) {
partitionParam.addToPartitionColumns(partCol.getName());
}
int partColNum = partitionInfo.getPartitionColumns().size();
TOlapTablePartition fakePartition = new TOlapTablePartition();
fakePartition.setId(0);
// set partition keys
setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum);
for (Long indexId : table.getIndexIdToMeta().keySet()) {
fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L)));
fakePartition.setNumBuckets(1);
}
fakePartition.setIsMutable(true);
DistributionInfo distInfo = table.getDefaultDistributionInfo();
partitionParam.setDistributedColumns(getDistColumns(distInfo));
partitionParam.addToPartitions(fakePartition);
ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
if (exprSource != null && analyzer != null) {
Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext());
tupleDescriptor.setTable(table);
funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
// we must clone the exprs. otherwise analyze will influence the origin exprs.
ArrayList<Expr> exprs = new ArrayList<Expr>();
for (Expr e : exprSource) {
exprs.add(e.clone());
}
for (Expr e : exprs) {
e.reset();
e.analyze(funcAnalyzer);
}
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
}
return partitionParam;
}
public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
throws UserException {
TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
PartitionInfo partitionInfo = table.getPartitionInfo();
boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
PartitionType partType = table.getPartitionInfo().getType();
partitionParam.setDbId(dbId);
partitionParam.setTableId(table.getId());
partitionParam.setVersion(0);
partitionParam.setPartitionType(partType.toThrift());
// create shadow partition for empty auto partition table. only use in this load.
if (enableAutomaticPartition && partitionIds.isEmpty()) {
return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType);
}
PartitionType partType = table.getPartitionInfo().getType();
switch (partType) {
case LIST:
case RANGE: {
PartitionInfo partitionInfo = table.getPartitionInfo();
for (Column partCol : partitionInfo.getPartitionColumns()) {
partitionParam.addToPartitionColumns(partCol.getName());
}
@ -395,7 +461,6 @@ public class OlapTableSink extends DataSink {
}
}
}
boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
// for auto create partition by function expr, there is no any partition firstly,
// But this is required in thrift struct.
if (enableAutomaticPartition && partitionIds.isEmpty()) {
@ -464,7 +529,6 @@ public class OlapTableSink extends DataSink {
throw new UserException("unsupported partition for OlapTable, partition=" + partType);
}
}
partitionParam.setPartitionType(partType.toThrift());
return partitionParam;
}
@ -505,7 +569,46 @@ public class OlapTableSink extends DataSink {
}
}
public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException {
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
final long fakeTabletId = 0;
SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
List<Long> aliveBe = clusterInfo.getAllBackendIds(true);
if (aliveBe.isEmpty()) {
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster");
}
for (int i = 0; i < table.getIndexNumber(); i++) {
// only one fake tablet here
if (singleReplicaLoad) {
Long[] nodes = aliveBe.toArray(new Long[0]);
List<Long> slaveBe = aliveBe;
Random random = new SecureRandom();
int masterNode = random.nextInt(nodes.length);
locationParam.addToTablets(new TTabletLocation(fakeTabletId,
Arrays.asList(nodes[masterNode])));
slaveBe.remove(masterNode);
slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId,
slaveBe));
} else {
locationParam.addToTablets(new TTabletLocation(fakeTabletId,
Arrays.asList(aliveBe.get(0)))); // just one fake location is enough
LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0));
}
}
return Arrays.asList(locationParam, slaveLocationParam);
}
public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException {
if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) {
return createDummyLocation(table);
}
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
// BE id -> path hash

View File

@ -3511,7 +3511,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (!Env.getCurrentEnv().isMaster()) {
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG);
LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG);
return result;
}
@ -3546,10 +3546,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<String> allReqPartNames; // all request partitions
try {
taskLock.lock();
// we dont lock the table. other thread in this txn will be controled by
// taskLock.
// if we have already replaced. dont do it again, but acquire the recorded new
// partition directly.
// we dont lock the table. other thread in this txn will be controled by taskLock.
// if we have already replaced. dont do it again, but acquire the recorded new partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw exception.
@ -3559,8 +3557,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
.filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
// from here we ONLY deal the pending partitions. not include the dealed(by
// others).
// from here we ONLY deal the pending partitions. not include the dealed(by others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@ -3571,8 +3568,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
// now temp partitions are bumped up and use new names. we get their ids and
// record them.
// now temp partitions are bumped up and use new names. we get their ids and record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());