diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java
new file mode 100644
index 0000000000..49823afabf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.thrift.TExprOpcode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hive.common.util.Murmur3;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class HiveBucketUtil {
+    private static final Logger LOG = LogManager.getLogger(HiveBucketUtil.class);
+
+    private static final Set<PrimitiveType> SUPPORTED_TYPES_FOR_BUCKET_FILTER = ImmutableSet.of(
+            PrimitiveType.BOOLEAN,
+            PrimitiveType.TINYINT,
+            PrimitiveType.SMALLINT,
+            PrimitiveType.INT,
+            PrimitiveType.BIGINT,
+            PrimitiveType.STRING);
+
+    private static PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) throws DdlException {
+        switch (dorisType) {
+            case BOOLEAN:
+                return TypeInfoFactory.booleanTypeInfo;
+            case TINYINT:
+                return TypeInfoFactory.byteTypeInfo;
+            case SMALLINT:
+                return TypeInfoFactory.shortTypeInfo;
+            case INT:
+                return TypeInfoFactory.intTypeInfo;
+            case BIGINT:
+                return TypeInfoFactory.longTypeInfo;
+            case STRING:
+                return TypeInfoFactory.stringTypeInfo;
+            default:
+                throw new DdlException("Unsupported pruning bucket column type: " + dorisType);
+        }
+    }
+
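// Illustrative sketch (not part of the patch; class name is hypothetical, hive-serde assumed on
// the classpath): the TypeInfo returned by convertToHiveColType() is later used to obtain a
// writable ObjectInspector, so a Doris literal can be hashed exactly as Hive hashed the column
// when it wrote the bucket files.
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class HiveTypeMappingExample {
    public static void main(String[] args) {
        // Writable inspector for the Hive INT category, mirroring what the pruning code builds.
        PrimitiveObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
                TypeInfoFactory.intTypeInfo.getPrimitiveCategory());
        System.out.println(intOI.getTypeName()); // prints "int"
    }
}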
+    private static final Pattern BUCKET_WITH_OPTIONAL_ATTEMPT_ID_PATTERN =
+            Pattern.compile("bucket_(\\d+)(_\\d+)?$");
+
+    private static final Iterable<Pattern> BUCKET_PATTERNS = ImmutableList.of(
+            // legacy Presto naming pattern (current version matches Hive)
+            Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"),
+            // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`
+            Pattern.compile("(\\d+)_\\d+.*"),
+            // Hive ACID with optional direct insert attempt id
+            BUCKET_WITH_OPTIONAL_ATTEMPT_ID_PATTERN);
+
+    public static List<InputSplit> getPrunedSplitsByBuckets(
+            List<InputSplit> splits,
+            String tableName,
+            List<Expr> conjuncts,
+            List<String> bucketCols,
+            int numBuckets,
+            Map<String, String> parameters) throws DdlException {
+        Optional<Set<Integer>> prunedBuckets = HiveBucketUtil.getPrunedBuckets(
+                conjuncts, bucketCols, numBuckets, parameters);
+        if (!prunedBuckets.isPresent()) {
+            return splits;
+        }
+        Set<Integer> buckets = prunedBuckets.get();
+        if (buckets.size() == 0) {
+            return Collections.emptyList();
+        }
+        List<InputSplit> result = new LinkedList<>();
+        boolean valid = true;
+        for (InputSplit split : splits) {
+            String fileName = ((FileSplit) split).getPath().getName();
+            OptionalInt bucket = getBucketNumberFromPath(fileName);
+            if (bucket.isPresent()) {
+                int bucketId = bucket.getAsInt();
+                if (bucketId >= numBuckets) {
+                    valid = false;
+                    LOG.debug("Hive table {} is corrupt for file {}(bucketId={}), skip bucket pruning.",
+                            tableName, fileName, bucketId);
+                    break;
+                }
+                if (buckets.contains(bucketId)) {
+                    result.add(split);
+                }
+            } else {
+                valid = false;
+                LOG.debug("File {} is not a bucket file in hive table {}, skip bucket pruning.", fileName, tableName);
+                break;
+            }
+        }
+        if (valid) {
+            LOG.debug("{} / {} input splits in hive table {} after bucket pruning.",
+                    result.size(), splits.size(), tableName);
+            return result;
+        } else {
+            return splits;
+        }
+    }
+
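// Illustrative sketch (not part of the patch; the table and class names are hypothetical):
// with a table CLUSTERED BY (user_id) INTO 32 BUCKETS and bucketing_version=1, the predicate
// `user_id = 100` can only match rows in one bucket, so getPrunedSplitsByBuckets() keeps only
// that bucket's files. The class below reproduces the arithmetic.
public class BucketPruningArithmeticExample {
    public static void main(String[] args) {
        int numBuckets = 32;
        int userId = 100;                                   // predicate: user_id = 100
        int hashV1 = userId;                                // the v1 hash of an INT is the value itself
        int bucketId = (hashV1 & Integer.MAX_VALUE) % numBuckets;
        System.out.println("user_id = 100 -> bucket " + bucketId); // bucket 4, i.e. files named 000004_*
    }
}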
+    public static Optional<Set<Integer>> getPrunedBuckets(
+            List<Expr> conjuncts, List<String> bucketCols, int numBuckets, Map<String, String> parameters)
+            throws DdlException {
+        if (parameters.containsKey("spark.sql.sources.provider")) {
+            // Spark currently does not produce bucketed output that is compatible with Hive bucketing,
+            // so bucket pruning is skipped for tables written by Spark.
+            return Optional.empty();
+        }
+        int bucketVersion = Integer.parseInt(parameters.getOrDefault("bucketing_version", "1"));
+        Optional<Set<Integer>> result = Optional.empty();
+        for (Expr conjunct : conjuncts) {
+            Optional<Set<Integer>> buckets = getPrunedBuckets(conjunct, bucketCols, bucketVersion, numBuckets);
+            if (buckets.isPresent()) {
+                if (!result.isPresent()) {
+                    result = Optional.of(new HashSet<>(buckets.get()));
+                } else {
+                    result.get().retainAll(buckets.get());
+                }
+            }
+        }
+        return result;
+    }
+
+    public static Optional<Set<Integer>> getPrunedBuckets(
+            Expr dorisExpr, List<String> bucketCols, int bucketVersion, int numBuckets) throws DdlException {
+        // TODO(gaoxin): support multiple bucket columns
+        if (dorisExpr == null || bucketCols == null || bucketCols.size() != 1) {
+            return Optional.empty();
+        }
+        String bucketCol = bucketCols.get(0);
+        if (dorisExpr instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) dorisExpr;
+            Optional<Set<Integer>> result = Optional.empty();
+            Optional<Set<Integer>> left = getPrunedBuckets(
+                    compoundPredicate.getChild(0), bucketCols, bucketVersion, numBuckets);
+            Optional<Set<Integer>> right = getPrunedBuckets(
+                    compoundPredicate.getChild(1), bucketCols, bucketVersion, numBuckets);
+            if (left.isPresent()) {
+                result = Optional.of(new HashSet<>(left.get()));
+            }
+            switch (compoundPredicate.getOp()) {
+                case AND: {
+                    if (right.isPresent()) {
+                        if (result.isPresent()) {
+                            result.get().retainAll(right.get());
+                        } else {
+                            result = Optional.of(new HashSet<>(right.get()));
+                        }
+                    }
+                    break;
+                }
+                case OR: {
+                    if (right.isPresent()) {
+                        if (result.isPresent()) {
+                            result.get().addAll(right.get());
+                        }
+                    } else {
+                        result = Optional.empty();
+                    }
+                    break;
+                }
+                default:
+                    result = Optional.empty();
+            }
+            return result;
+        } else if (dorisExpr instanceof BinaryPredicate || dorisExpr instanceof InPredicate) {
+            return pruneBucketsFromPredicate(dorisExpr, bucketCol, bucketVersion, numBuckets);
+        } else {
+            return Optional.empty();
+        }
+    }
+
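// Illustrative sketch (not part of the patch; class name is hypothetical): compound predicates
// combine the per-child bucket sets with plain set algebra, as in the switch above - AND
// intersects, OR unions, and OR falls back to "no pruning" as soon as one side cannot be
// mapped to a bucket set.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CompoundPredicateSetExample {
    public static void main(String[] args) {
        Set<Integer> left = new HashSet<>(Arrays.asList(1));   // e.g. user_id = 1  -> bucket 1
        Set<Integer> right = new HashSet<>(Arrays.asList(2));  // e.g. user_id = 2  -> bucket 2

        Set<Integer> or = new HashSet<>(left);
        or.addAll(right);                                       // OR  -> {1, 2}: scan two buckets

        Set<Integer> and = new HashSet<>(left);
        and.retainAll(right);                                   // AND -> {}: no file needs scanning
        System.out.println("OR -> " + or + ", AND -> " + and);
    }
}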
+    private static Optional<Set<Integer>> getPrunedBucketsFromLiteral(
+            SlotRef slotRef, LiteralExpr literalExpr, String bucketCol, int bucketVersion, int numBuckets)
+            throws DdlException {
+        if (slotRef == null || literalExpr == null) {
+            return Optional.empty();
+        }
+        String colName = slotRef.getColumnName();
+        // check whether colName is bucket column or not
+        if (!bucketCol.equals(colName)) {
+            return Optional.empty();
+        }
+        PrimitiveType dorisPrimitiveType = slotRef.getType().getPrimitiveType();
+        if (!SUPPORTED_TYPES_FOR_BUCKET_FILTER.contains(dorisPrimitiveType)) {
+            return Optional.empty();
+        }
+        Object value = HiveMetaStoreClientHelper.extractDorisLiteral(literalExpr);
+        if (value == null) {
+            return Optional.empty();
+        }
+        PrimitiveObjectInspector constOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+                convertToHiveColType(dorisPrimitiveType).getPrimitiveCategory());
+        PrimitiveObjectInspector origOI =
+                PrimitiveObjectInspectorFactory.getPrimitiveObjectInspectorFromClass(value.getClass());
+        Converter conv = ObjectInspectorConverters.getConverter(origOI, constOI);
+        if (conv == null) {
+            return Optional.empty();
+        }
+        Object[] convCols = new Object[] {conv.convert(value)};
+        int bucketId = getBucketNumber(convCols, new ObjectInspector[] {constOI}, bucketVersion, numBuckets);
+        return Optional.of(ImmutableSet.of(bucketId));
+    }
+
+    private static Optional<Set<Integer>> pruneBucketsFromPredicate(
+            Expr dorisExpr, String bucketCol, int bucketVersion, int numBuckets) throws DdlException {
+        TExprOpcode opcode = dorisExpr.getOpcode();
+        switch (opcode) {
+            case EQ: {
+                // Make sure the col slot is always first
+                SlotRef slotRef = HiveMetaStoreClientHelper.convertDorisExprToSlotRef(dorisExpr.getChild(0));
+                LiteralExpr literalExpr =
+                        HiveMetaStoreClientHelper.convertDorisExprToLiteralExpr(dorisExpr.getChild(1));
+                return getPrunedBucketsFromLiteral(slotRef, literalExpr, bucketCol, bucketVersion, numBuckets);
+            }
+            case FILTER_IN: {
+                SlotRef slotRef = HiveMetaStoreClientHelper.convertDorisExprToSlotRef(dorisExpr.getChild(0));
+                Optional<Set<Integer>> result = Optional.empty();
+                for (int i = 1; i < dorisExpr.getChildren().size(); i++) {
+                    LiteralExpr literalExpr =
+                            HiveMetaStoreClientHelper.convertDorisExprToLiteralExpr(dorisExpr.getChild(i));
+                    Optional<Set<Integer>> childBucket =
+                            getPrunedBucketsFromLiteral(slotRef, literalExpr, bucketCol, bucketVersion, numBuckets);
+                    if (childBucket.isPresent()) {
+                        if (result.isPresent()) {
+                            result.get().addAll(childBucket.get());
+                        } else {
+                            result = Optional.of(new HashSet<>(childBucket.get()));
+                        }
+                    } else {
+                        return Optional.empty();
+                    }
+                }
+                return result;
+            }
+            default:
+                return Optional.empty();
+        }
+    }
+
+    private static int getBucketNumber(
+            Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int bucketVersion, int numBuckets)
+            throws DdlException {
+        int hashCode = bucketVersion == 2 ? getBucketHashCodeV2(bucketFields, bucketFieldInspectors)
+                : getBucketHashCodeV1(bucketFields, bucketFieldInspectors);
+        return (hashCode & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    private static int getBucketHashCodeV1(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors)
+            throws DdlException {
+        int hashCode = 0;
+        for (int i = 0; i < bucketFields.length; i++) {
+            int fieldHash = hashCodeV1(bucketFields[i], bucketFieldInspectors[i]);
+            hashCode = 31 * hashCode + fieldHash;
+        }
+        return hashCode;
+    }
+
+    private static int getBucketHashCodeV2(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors)
+            throws DdlException {
+        int hashCode = 0;
+        ByteBuffer b = ByteBuffer.allocate(8); // To be used with primitive types
+        for (int i = 0; i < bucketFields.length; i++) {
+            int fieldHash = hashCodeV2(bucketFields[i], bucketFieldInspectors[i], b);
+            hashCode = 31 * hashCode + fieldHash;
+        }
+        return hashCode;
+    }
+
+    private static int hashCodeV1(Object o, ObjectInspector objIns) throws DdlException {
+        if (o == null) {
+            return 0;
+        }
+        if (objIns.getCategory() == Category.PRIMITIVE) {
+            PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) objIns);
+            switch (poi.getPrimitiveCategory()) {
+                case BOOLEAN:
+                    return ((BooleanObjectInspector) poi).get(o) ? 1 : 0;
+                case BYTE:
+                    return ((ByteObjectInspector) poi).get(o);
+                case SHORT:
+                    return ((ShortObjectInspector) poi).get(o);
+                case INT:
+                    return ((IntObjectInspector) poi).get(o);
+                case LONG: {
+                    long a = ((LongObjectInspector) poi).get(o);
+                    return (int) ((a >>> 32) ^ a);
+                }
+                case STRING: {
+                    // This hash function returns the same result as String.hashCode() when
+                    // all characters are ASCII, while Text.hashCode() always returns a
+                    // different result.
+                    Text t = ((StringObjectInspector) poi).getPrimitiveWritableObject(o);
+                    int r = 0;
+                    for (int i = 0; i < t.getLength(); i++) {
+                        r = r * 31 + t.getBytes()[i];
+                    }
+                    return r;
+                }
+                default:
+                    throw new DdlException("Unknown type: " + poi.getPrimitiveCategory());
+            }
+        }
+        throw new DdlException("Unknown type: " + objIns.getTypeName());
+    }
+
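// Illustrative sketch (not part of the patch; class name is hypothetical): for pure-ASCII values
// the byte-wise loop in hashCodeV1's STRING case produces the same value as String.hashCode(),
// which is why Doris can recompute the bucket Hive chose for a STRING bucket column.
import java.nio.charset.StandardCharsets;

public class V1StringHashExample {
    public static void main(String[] args) {
        String s = "doris";
        int r = 0;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            r = r * 31 + b;                     // same recurrence as the STRING case above
        }
        System.out.println(r == s.hashCode());  // true for ASCII-only input
    }
}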
+    private static int hashCodeV2(Object o, ObjectInspector objIns, ByteBuffer byteBuffer) throws DdlException {
+        // Reset the bytebuffer
+        byteBuffer.clear();
+        if (objIns.getCategory() == Category.PRIMITIVE) {
+            PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) objIns);
+            switch (poi.getPrimitiveCategory()) {
+                case BOOLEAN:
+                    return (((BooleanObjectInspector) poi).get(o) ? 1 : 0);
+                case BYTE:
+                    return ((ByteObjectInspector) poi).get(o);
+                case SHORT: {
+                    byteBuffer.putShort(((ShortObjectInspector) poi).get(o));
+                    return Murmur3.hash32(byteBuffer.array(), 2, 104729);
+                }
+                case INT: {
+                    byteBuffer.putInt(((IntObjectInspector) poi).get(o));
+                    return Murmur3.hash32(byteBuffer.array(), 4, 104729);
+                }
+                case LONG: {
+                    byteBuffer.putLong(((LongObjectInspector) poi).get(o));
+                    return Murmur3.hash32(byteBuffer.array(), 8, 104729);
+                }
+                case STRING: {
+                    // Hash the UTF-8 bytes of the string with Murmur3, matching Hive's
+                    // behavior when bucketing_version = 2.
+                    Text text = ((StringObjectInspector) poi).getPrimitiveWritableObject(o);
+                    return Murmur3.hash32(text.getBytes(), text.getLength(), 104729);
+                }
+                default:
+                    throw new DdlException("Unknown type: " + poi.getPrimitiveCategory());
+            }
+        }
+        throw new DdlException("Unknown type: " + objIns.getTypeName());
+    }
+
+    private static OptionalInt getBucketNumberFromPath(String name) {
+        for (Pattern pattern : BUCKET_PATTERNS) {
+            Matcher matcher = pattern.matcher(name);
+            if (matcher.matches()) {
+                return OptionalInt.of(Integer.parseInt(matcher.group(1)));
+            }
+        }
+        return OptionalInt.empty();
+    }
+}
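A quick sanity check of the file-name patterns used by getBucketNumberFromPath (an illustrative sketch, not part of the patch; the class name is hypothetical): Hive-written files like 000004_0 and ACID files like bucket_00007 map back to their bucket ids, and any name that does not match disables pruning for the whole table.

import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketFileNameExample {
    private static final Pattern HIVE_NAME = Pattern.compile("(\\d+)_\\d+.*");
    private static final Pattern ACID_NAME = Pattern.compile("bucket_(\\d+)(_\\d+)?$");

    static OptionalInt bucketOf(String fileName) {
        for (Pattern pattern : new Pattern[] {HIVE_NAME, ACID_NAME}) {
            Matcher matcher = pattern.matcher(fileName);
            if (matcher.matches()) {
                return OptionalInt.of(Integer.parseInt(matcher.group(1)));
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        System.out.println(bucketOf("000004_0"));            // OptionalInt[4]
        System.out.println(bucketOf("bucket_00007"));        // OptionalInt[7]
        System.out.println(bucketOf("part-00000.parquet"));  // empty -> pruning is skipped
    }
}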
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 22974ddea2..ea347a4486 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -76,6 +76,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Helper class for HiveMetaStoreClient
@@ -594,7 +595,7 @@ public class HiveMetaStoreClientHelper {
         return compoundExpr;
     }
 
-    private static SlotRef convertDorisExprToSlotRef(Expr expr) {
+    public static SlotRef convertDorisExprToSlotRef(Expr expr) {
         SlotRef slotRef = null;
         if (expr instanceof SlotRef) {
             slotRef = (SlotRef) expr;
@@ -606,7 +607,7 @@ public class HiveMetaStoreClientHelper {
         return slotRef;
     }
 
-    private static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
+    public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
         LiteralExpr literalExpr = null;
         if (expr instanceof LiteralExpr) {
             literalExpr = (LiteralExpr) expr;
@@ -618,7 +619,7 @@ public class HiveMetaStoreClientHelper {
         return literalExpr;
     }
 
-    private static Object extractDorisLiteral(Expr expr) {
+    public static Object extractDorisLiteral(Expr expr) {
         if (!expr.isLiteral()) {
             return null;
         }
@@ -832,7 +833,22 @@ public class HiveMetaStoreClientHelper {
             }
         }
         output.append(")\n");
+        if (remoteTable.getPartitionKeys().size() > 0) {
+            output.append("PARTITIONED BY (\n")
+                    .append(remoteTable.getPartitionKeys().stream().map(
+                            partition -> String.format(" `%s` %s", partition.getName(), partition.getType()))
+                            .collect(Collectors.joining(",\n")))
+                    .append(")\n");
+        }
         StorageDescriptor descriptor = remoteTable.getSd();
+        List<String> bucketCols = descriptor.getBucketCols();
+        if (bucketCols != null && bucketCols.size() > 0) {
+            output.append("CLUSTERED BY (\n")
+                    .append(bucketCols.stream().map(
+                            bucketCol -> " " + bucketCol).collect(Collectors.joining(",\n")))
+                    .append(")\n")
+                    .append(String.format("INTO %d BUCKETS\n", descriptor.getNumBuckets()));
+        }
         if (descriptor.getSerdeInfo().isSetSerializationLib()) {
             output.append("ROW FORMAT SERDE\n")
                     .append(String.format("  '%s'\n", descriptor.getSerdeInfo().getSerializationLib()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index 849511b0bf..22b0436d75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.HiveBucketUtil;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
@@ -109,13 +110,21 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
         Configuration configuration = setConfiguration();
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        List<InputSplit> splits;
         if (!hivePartitions.isEmpty()) {
-            return hivePartitions.parallelStream()
+            splits = hivePartitions.parallelStream()
                     .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
                     .collect(Collectors.toList());
         } else {
-            return getSplitsByPath(inputFormat, configuration, splitsPath);
+            splits = getSplitsByPath(inputFormat, configuration, splitsPath);
         }
+        return HiveBucketUtil.getPrunedSplitsByBuckets(
+                splits,
+                hmsTable.getName(),
+                exprs,
+                getRemoteHiveTable().getSd().getBucketCols(),
+                getRemoteHiveTable().getSd().getNumBuckets(),
+                getRemoteHiveTable().getParameters());
     }
 
     private List<InputSplit> getSplitsByPath(