[Fix](multi catalog)Support hive default partition. (#17179)

Hive stores all data without partition columns in a default partition named __HIVE_DEFAULT_PARTITION__.
Doris will fail to get this partition when the partition column type is INT or another type to which
__HIVE_DEFAULT_PARTITION__ cannot be converted.
This PR supports the Hive default partition by setting the column value to NULL for the missing partition columns.
This commit is contained in:
Jibing-Li
2023-02-28 00:08:29 +08:00
committed by GitHub
parent d3a6cab716
commit dd1bd6d8f1
9 changed files with 278 additions and 3 deletions

View File

@ -24,6 +24,7 @@ public class PartitionValue {
public static final PartitionValue MAX_VALUE = new PartitionValue();
private String value;
private boolean isHiveDefaultPartition;
private PartitionValue() {
@ -33,7 +34,15 @@ public class PartitionValue {
this.value = value;
}
/**
 * Creates a partition value, optionally flagged as Hive's default partition
 * sentinel (__HIVE_DEFAULT_PARTITION__), which collects rows whose partition
 * column value is null.
 *
 * @param value the raw partition value string
 * @param isHiveDefaultPartition true if {@code value} is the Hive default partition sentinel
 */
public PartitionValue(String value, boolean isHiveDefaultPartition) {
    this.isHiveDefaultPartition = isHiveDefaultPartition;
    this.value = value;
}
public LiteralExpr getValue(Type type) throws AnalysisException {
if (isHiveDefaultPartition) {
return new StringLiteral(value);
}
if (isMax()) {
return LiteralExpr.createInfinity(type, true);
} else {
@ -52,4 +61,8 @@ public class PartitionValue {
return value;
}
}
/**
 * @return true if this value represents Hive's __HIVE_DEFAULT_PARTITION__
 *         (i.e. the partition holding rows with a null partition column)
 */
public boolean isHiveDefaultPartition() {
    return this.isHiveDefaultPartition;
}
}

View File

@ -26,8 +26,10 @@ import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
@ -151,7 +153,8 @@ public class BrokerUtil {
if (index == -1) {
continue;
}
columns[index] = pair[1];
columns[index] = HiveMetaStoreCache.HIVE_DEFAULT_PARTITION.equals(pair[1])
? FeConstants.null_string : pair[1];
size++;
if (size >= columnsFromPath.size()) {
break;

View File

@ -80,6 +80,7 @@ import java.util.stream.Stream;
public class HiveMetaStoreCache {
private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class);
private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50;
public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
private HMSExternalCatalog catalog;
@ -207,7 +208,7 @@ public class HiveMetaStoreCache {
for (String part : parts) {
String[] kv = part.split("=");
Preconditions.checkState(kv.length == 2, partitionName);
values.add(new PartitionValue(kv[1]));
values.add(new PartitionValue(kv[1], HIVE_DEFAULT_PARTITION.equals(kv[1])));
}
try {
PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types);

View File

@ -79,6 +79,19 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
this.rangeToId = rangeToId;
}
// For hive partition table.
/**
 * Constructor for Hive partition tables. The extra {@code isHive} flag lets the
 * pruner keep (rather than discard) partitions when a partition column may be
 * null, since Hive routes such rows to __HIVE_DEFAULT_PARTITION__.
 *
 * @param idToPartitionItem    partition id -> partition item
 * @param partitionColumns     the table's partition columns
 * @param columnNameToRange    column name -> predicate-derived value range
 * @param uidToPartitionRange  unique id -> partition key range
 * @param rangeToId            partition key range -> unique id
 * @param singleColumnRangeMap range map used for single-column partitions
 * @param isHive               whether this pruner targets a Hive partition table
 */
public ListPartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem,
                             List<Column> partitionColumns,
                             Map<String, ColumnRange> columnNameToRange,
                             Map<UniqueId, Range<PartitionKey>> uidToPartitionRange,
                             Map<Range<PartitionKey>, UniqueId> rangeToId,
                             RangeMap<ColumnBound, UniqueId> singleColumnRangeMap,
                             boolean isHive) {
    super(idToPartitionItem, partitionColumns, columnNameToRange, singleColumnRangeMap, isHive);
    this.rangeToId = rangeToId;
    this.uidToPartitionRange = uidToPartitionRange;
}
public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange(
Map<Long, PartitionItem> idToPartitionItem, Map<Long, List<UniqueId>> idToUniqueIdsMap) {
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = Maps.newHashMap();
@ -147,6 +160,11 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet();
if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) {
// For Hive external table, partition column could be null.
// In which case, the data will be put to a default partition __HIVE_DEFAULT_PARTITION__
if (isHive) {
return FinalFilters.noFilters();
}
return FinalFilters.constantFalseFilters();
} else {
RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();

View File

@ -43,6 +43,8 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
protected final Map<String, ColumnRange> columnNameToRange;
// used for single column partition
protected RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
// Flag to indicate if this pruner is for hive partition or not.
protected boolean isHive = false;
// currently only used for list partition
private Map.Entry<Long, PartitionItem> defaultPartition;
@ -83,6 +85,18 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
.orElse(null);
}
/**
 * Constructor carrying an explicit {@code isHive} flag so subclasses can relax
 * null-handling for Hive tables (null partition values live in
 * __HIVE_DEFAULT_PARTITION__ and must not be pruned away).
 *
 * @param idToPartitionItem    partition id -> partition item
 * @param partitionColumns     the table's partition columns
 * @param columnNameToRange    column name -> predicate-derived value range
 * @param singleColumnRangeMap range map used for single-column partitions
 * @param isHive               whether this pruner targets a Hive partition table
 */
public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
                             List<Column> partitionColumns,
                             Map<String, ColumnRange> columnNameToRange,
                             RangeMap<ColumnBound, UniqueId> singleColumnRangeMap,
                             boolean isHive) {
    this.isHive = isHive;
    this.singleColumnRangeMap = singleColumnRangeMap;
    this.columnNameToRange = columnNameToRange;
    this.partitionColumns = partitionColumns;
    this.idToPartitionItem = idToPartitionItem;
}
@Override
public Collection<Long> prune() throws AnalysisException {
Map<Column, FinalFilters> columnToFilters = Maps.newHashMap();

View File

@ -86,6 +86,11 @@ public class RangePartitionPrunerV2 extends PartitionPrunerV2Base {
Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet();
if (columnRange.hasConjunctiveIsNull()) {
if (!rangeSetOpt.isPresent()) {
// For Hive external table, partition column could be null.
// In which case, the data will be put to a default partition __HIVE_DEFAULT_PARTITION__
if (isHive) {
return FinalFilters.noFilters();
}
// Only has conjunctive `is null` predicate.
return FinalFilters.create(Sets.newHashSet(getMinInfinityRange(column)));
} else {

View File

@ -161,7 +161,8 @@ public class HiveScanProvider extends HMSTableScanProvider {
hmsTable.getPartitionColumns(), columnNameToRange,
hivePartitionValues.getUidToPartitionRange(),
hivePartitionValues.getRangeToId(),
hivePartitionValues.getSingleColumnRangeMap());
hivePartitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.readPartitionNum = filteredPartitionIds.size();
LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms",