From 4816ca6679700ab34bca3b183078dcf8df28b2f0 Mon Sep 17 00:00:00 2001
From: slothever <18522955+wsjz@users.noreply.github.com>
Date: Fri, 15 Sep 2023 17:44:56 +0800
Subject: [PATCH] [fix](multi-catalog)fix mc decimal type parse, fix wrong obj
 location (#24242)

1. Parse the MaxCompute decimal type correctly via the Arrow vector method.
2. Fix wrong object locations when oss, obs, or cosn endpoints are used.

Test cases will be added in another PR.
---
 .../maxcompute/MaxComputeColumnValue.java     | 33 +++++++++++++++++--
 .../org/apache/doris/common/util/S3Util.java  |  8 +++++
 .../datasource/hive/HiveMetaStoreCache.java   | 10 ++++--
 .../property/PropertyConverter.java           | 18 ++++++++--
 4 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index 57b67bacf4..6581016384 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -19,6 +19,7 @@ package org.apache.doris.maxcompute;
 
 import org.apache.doris.common.jni.vec.ColumnValue;
 
+import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
@@ -33,11 +34,11 @@ import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.util.DecimalUtility;
 import org.apache.log4j.Logger;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteOrder;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.List;
@@ -141,10 +142,38 @@ public class MaxComputeColumnValue implements ColumnValue {
     public BigDecimal getDecimal() {
         skippedIfNull();
         DecimalVector decimalCol = (DecimalVector) column;
-        return DecimalUtility.getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
+        return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
                 decimalCol.getScale(), DecimalVector.TYPE_WIDTH);
     }
 
+    /**
+     * Copied from Arrow's DecimalUtility.getBigDecimalFromArrowBuf.
+     * @param byteBuf byteBuf
+     * @param index index
+     * @param scale scale
+     * @param byteWidth DecimalVector TYPE_WIDTH
+     * @return java BigDecimal
+     */
+    public static BigDecimal getBigDecimalFromArrowBuf(ArrowBuf byteBuf, int index, int scale, int byteWidth) {
+        byte[] value = new byte[byteWidth];
+        byte temp;
+        final long startIndex = (long) index * byteWidth;
+
+        byteBuf.getBytes(startIndex, value, 0, byteWidth);
+        if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+            // Decimals are stored in native byte order; if that is little-endian, swap bytes for BigDecimal.
+            int stop = byteWidth / 2;
+            for (int i = 0, j; i < stop; i++) {
+                temp = value[i];
+                j = (byteWidth - 1) - i;
+                value[i] = value[j];
+                value[j] = temp;
+            }
+        }
+        BigInteger unscaledValue = new BigInteger(value);
+        return new BigDecimal(unscaledValue, scale);
+    }
+
     @Override
     public String getString() {
         skippedIfNull();
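The core of the change above is the endianness handling: Arrow writes the 128-bit decimal buffer in native byte order, while java.math.BigInteger only accepts big-endian two's-complement bytes. The following is a minimal standalone sketch (not part of the patch) of the same byte-swap decoding, assuming a 16-byte little-endian unscaled value; the class and method names are illustrative only.

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalSwapSketch {
    // Reverse the little-endian bytes into the big-endian layout that
    // BigInteger expects, then attach the scale. This is the same loop the
    // patch copies from Arrow's DecimalUtility.
    static BigDecimal decodeLittleEndian(byte[] value, int scale) {
        for (int i = 0, j = value.length - 1; i < j; i++, j--) {
            byte temp = value[i];
            value[i] = value[j];
            value[j] = temp;
        }
        return new BigDecimal(new BigInteger(value), scale);
    }

    public static void main(String[] args) {
        // Encode the unscaled value 123456 as 16 little-endian bytes;
        // the high bytes stay zero for a positive number.
        byte[] le = new byte[16];
        byte[] be = BigInteger.valueOf(123456).toByteArray();
        for (int i = 0; i < be.length; i++) {
            le[i] = be[be.length - 1 - i];
        }
        System.out.println(decodeLittleEndian(le, 3)); // prints 123.456
    }
}

Negative values work as well because BigInteger interprets the swapped array as two's complement, matching the sign-extended 0xFF padding Arrow stores in the high bytes.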
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index b944df70d7..adf86cf73d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -20,6 +20,9 @@ package org.apache.doris.common.util;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.credentials.CloudCredential;
+import org.apache.doris.datasource.property.constants.CosProperties;
+import org.apache.doris.datasource.property.constants.ObsProperties;
+import org.apache.doris.datasource.property.constants.OssProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
 
 import org.apache.commons.lang3.StringUtils;
@@ -69,6 +72,11 @@ public class S3Util {
     }
 
     private static boolean isS3EndPoint(String location, Map<String, String> props) {
+        if (props.containsKey(ObsProperties.ENDPOINT)
+                || props.containsKey(OssProperties.ENDPOINT)
+                || props.containsKey(CosProperties.ENDPOINT)) {
+            return false;
+        }
         // wide check range for the compatibility of s3 properties
         return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT))
                 && isObjStorage(location);
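The intent of the new guard is precedence: when a catalog is configured with a provider-specific endpoint, the location must not be rewritten to an s3 scheme. A hypothetical standalone sketch of that rule follows; the literal property keys and the contains check stand in for the real ObsProperties/OssProperties/CosProperties constants and isObjStorage, and are assumptions for illustration.

import java.util.List;
import java.util.Map;

public class EndpointPrecedenceSketch {
    // Assumed stand-ins for ObsProperties.ENDPOINT, OssProperties.ENDPOINT
    // and CosProperties.ENDPOINT.
    static final List<String> PROVIDER_ENDPOINT_KEYS =
            List.of("obs.endpoint", "oss.endpoint", "cos.endpoint");

    static boolean isS3EndPoint(String location, Map<String, String> props) {
        // A provider-specific endpoint wins: never treat the location as s3.
        if (PROVIDER_ENDPOINT_KEYS.stream().anyMatch(props::containsKey)) {
            return false;
        }
        // Simplified stand-in for the s3 endpoint check plus isObjStorage.
        return props.containsKey("s3.endpoint") && location.contains("://");
    }

    public static void main(String[] args) {
        Map<String, String> ossCatalog = Map.of("oss.endpoint", "oss-cn-beijing.aliyuncs.com");
        Map<String, String> s3Catalog = Map.of("s3.endpoint", "s3.us-east-1.amazonaws.com");
        System.out.println(isS3EndPoint("oss://bucket/tbl", ossCatalog)); // false: keep oss
        System.out.println(isS3EndPoint("oss://bucket/tbl", s3Catalog));  // true: convert to s3
    }
}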
+ scheme + ".impl", PropertyConverter.getHadoopFSImplByScheme(scheme)); + } } } catch (Exception e) { LOG.warn("unknown scheme in path: " + finalLocation, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java index 754a885f4d..1edc5461f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java @@ -163,7 +163,7 @@ public class PropertyConverter { Map obsProperties = Maps.newHashMap(); obsProperties.put(OBSConstants.ENDPOINT, props.get(ObsProperties.ENDPOINT)); obsProperties.put(ObsProperties.FS.IMPL_DISABLE_CACHE, "true"); - obsProperties.put("fs.obs.impl", OBSFileSystem.class.getName()); + obsProperties.put("fs.obs.impl", getHadoopFSImplByScheme("obs")); if (credential.isWhole()) { obsProperties.put(OBSConstants.ACCESS_KEY, credential.getAccessKey()); obsProperties.put(OBSConstants.SECRET_KEY, credential.getSecretKey()); @@ -179,6 +179,18 @@ public class PropertyConverter { return obsProperties; } + public static String getHadoopFSImplByScheme(String fsScheme) { + if (fsScheme.equalsIgnoreCase("obs")) { + return OBSFileSystem.class.getName(); + } else if (fsScheme.equalsIgnoreCase("oss")) { + return AliyunOSSFileSystem.class.getName(); + } else if (fsScheme.equalsIgnoreCase("cosn")) { + return CosNFileSystem.class.getName(); + } else { + return S3AFileSystem.class.getName(); + } + } + private static Map convertToS3EnvProperties(Map properties, CloudCredentialWithEndpoint credential, boolean isMeta) { @@ -286,7 +298,7 @@ public class PropertyConverter { } ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ENDPOINT_KEY, endpoint); ossProperties.put("fs.oss.impl.disable.cache", "true"); - ossProperties.put("fs.oss.impl", AliyunOSSFileSystem.class.getName()); + ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss")); boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false")); if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) { // use endpoint or enable hdfs @@ -327,7 +339,7 @@ public class PropertyConverter { Map cosProperties = Maps.newHashMap(); cosProperties.put(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY, props.get(CosProperties.ENDPOINT)); cosProperties.put("fs.cosn.impl.disable.cache", "true"); - cosProperties.put("fs.cosn.impl", CosNFileSystem.class.getName()); + cosProperties.put("fs.cosn.impl", getHadoopFSImplByScheme("cosn")); if (credential.isWhole()) { cosProperties.put(CosNConfigKeys.COSN_SECRET_ID_KEY, credential.getAccessKey()); cosProperties.put(CosNConfigKeys.COSN_SECRET_KEY_KEY, credential.getSecretKey());