[fix](multi-catalog)fix mc decimal type parse, fix wrong obj location (#24242)

1. Parse the MaxCompute decimal type correctly using the Arrow vector decoding method.
2. Fix the wrong object storage location when using OSS, OBS, or COSN.

Test cases will be added in another PR.
slothever
2023-09-15 17:44:56 +08:00
committed by GitHub
parent c270a2d89f
commit 4816ca6679
4 changed files with 62 additions and 7 deletions

View File

@@ -19,6 +19,7 @@ package org.apache.doris.maxcompute;
import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
@@ -33,11 +34,11 @@ import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.util.DecimalUtility;
import org.apache.log4j.Logger;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteOrder;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
@@ -141,10 +142,38 @@ public class MaxComputeColumnValue implements ColumnValue {
public BigDecimal getDecimal() {
skippedIfNull();
DecimalVector decimalCol = (DecimalVector) column;
return DecimalUtility.getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
decimalCol.getScale(), DecimalVector.TYPE_WIDTH);
}
/**
 * Copied from Arrow's DecimalUtility.getBigDecimalFromArrowBuf.
 * @param byteBuf the Arrow data buffer that holds the decimal values
 * @param index the element index within the vector
 * @param scale the decimal scale
 * @param byteWidth DecimalVector.TYPE_WIDTH (16 bytes)
 * @return the decoded java.math.BigDecimal
 */
public static BigDecimal getBigDecimalFromArrowBuf(ArrowBuf byteBuf, int index, int scale, int byteWidth) {
byte[] value = new byte[byteWidth];
byte temp;
final long startIndex = (long) index * byteWidth;
byteBuf.getBytes(startIndex, value, 0, byteWidth);
if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
// The decimal is stored in native byte order; BigInteger expects big-endian, so swap bytes on little-endian hosts
int stop = byteWidth / 2;
for (int i = 0, j; i < stop; i++) {
temp = value[i];
j = (byteWidth - 1) - i;
value[i] = value[j];
value[j] = temp;
}
}
BigInteger unscaledValue = new BigInteger(value);
return new BigDecimal(unscaledValue, scale);
}
@Override
public String getString() {
skippedIfNull();

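For context, the copied helper exists because BigInteger expects a big-endian two's-complement byte array, while Arrow's DecimalVector keeps its 16-byte unscaled value in native byte order. Below is a minimal, hypothetical round-trip sketch (not part of this commit; the precision/scale values are arbitrary) that exercises the new method:

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.DecimalVector;
import org.apache.doris.maxcompute.MaxComputeColumnValue;
import java.math.BigDecimal;

public class DecimalRoundTripSketch {
    public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator();
             DecimalVector vector = new DecimalVector("d", allocator, 38, 4)) {
            vector.allocateNew(1);
            vector.set(0, new BigDecimal("123.4567")); // scale 4 matches the vector scale
            vector.setValueCount(1);
            // Decode the raw buffer the same way MaxComputeColumnValue.getDecimal() now does.
            BigDecimal decoded = MaxComputeColumnValue.getBigDecimalFromArrowBuf(
                    vector.getDataBuffer(), 0, vector.getScale(), DecimalVector.TYPE_WIDTH);
            System.out.println(decoded); // expected: 123.4567
        }
    }
}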
View File

@@ -20,6 +20,9 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.commons.lang3.StringUtils;
@@ -69,6 +72,11 @@ public class S3Util {
}
private static boolean isS3EndPoint(String location, Map<String, String> props) {
if (props.containsKey(ObsProperties.ENDPOINT)
|| props.containsKey(OssProperties.ENDPOINT)
|| props.containsKey(CosProperties.ENDPOINT)) {
return false;
}
// broad check to stay compatible with the various s3 property names
return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT))
&& isObjStorage(location);

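The intent of the added guard: when the catalog carries a provider-specific endpoint, the location must keep its native scheme (obs://, oss://, cosn://) instead of being rewritten to s3://. A standalone sketch of that decision order follows; it is simplified (it omits the isObjStorage(location) check) and spells the property keys as plain strings on the assumption that they match the ObsProperties/OssProperties/CosProperties/S3Properties constants:

import java.util.Map;

public class EndpointCheckSketch {
    // Assumed key spellings; the real constants live in the Doris property classes.
    static boolean isS3EndPoint(Map<String, String> props) {
        // A provider-specific endpoint wins: keep obs://, oss:// or cosn:// as-is.
        if (props.containsKey("obs.endpoint")
                || props.containsKey("oss.endpoint")
                || props.containsKey("cos.endpoint")) {
            return false;
        }
        // Only a generic s3 endpoint should trigger the s3:// rewrite.
        return props.containsKey("s3.endpoint");
    }

    public static void main(String[] args) {
        System.out.println(isS3EndPoint(Map.of("obs.endpoint", "obs.example.com"))); // false
        System.out.println(isS3EndPoint(Map.of("s3.endpoint", "s3.example.com")));   // true
    }
}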
View File

@@ -38,6 +38,7 @@ import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.FileSystemFactory;
@@ -405,14 +406,19 @@ public class HiveMetaStoreCache {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
String finalLocation = S3Util.convertToS3IfNecessary(key.location, catalog.getProperties());
String finalLocation = S3Util.convertToS3IfNecessary(key.location,
catalog.getCatalogProperty().getProperties());
// Disable the FileSystem cache, otherwise FileInputFormat.setInputPaths() always creates
// a new FileSystem and stores it in the cache.
try {
Path path = new Path(finalLocation);
URI uri = path.toUri();
if (uri.getScheme() != null) {
updateJobConf("fs." + uri.getScheme() + ".impl.disable.cache", "true");
String scheme = uri.getScheme();
updateJobConf("fs." + scheme + ".impl.disable.cache", "true");
if (!scheme.equals("hdfs")) {
updateJobConf("fs." + scheme + ".impl", PropertyConverter.getHadoopFSImplByScheme(scheme));
}
}
} catch (Exception e) {
LOG.warn("unknown scheme in path: " + finalLocation, e);

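For a non-HDFS scheme, the added updateJobConf calls amount to the following illustrative Hadoop Configuration snippet (the OBS class name here is an assumption; in the patch it comes from PropertyConverter.getHadoopFSImplByScheme):

import org.apache.hadoop.conf.Configuration;

public class FsImplConfSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        String scheme = "obs"; // e.g. parsed from an obs:// table location
        // Always bypass Hadoop's FileSystem cache for this scheme.
        conf.set("fs." + scheme + ".impl.disable.cache", "true");
        // Pin the FileSystem implementation so obs/oss/cosn locations do not fall
        // back to a wrong default; hdfs keeps its built-in implementation.
        if (!"hdfs".equals(scheme)) {
            conf.set("fs." + scheme + ".impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
        }
        System.out.println(conf.get("fs.obs.impl"));
    }
}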
View File

@@ -163,7 +163,7 @@ public class PropertyConverter {
Map<String, String> obsProperties = Maps.newHashMap();
obsProperties.put(OBSConstants.ENDPOINT, props.get(ObsProperties.ENDPOINT));
obsProperties.put(ObsProperties.FS.IMPL_DISABLE_CACHE, "true");
obsProperties.put("fs.obs.impl", OBSFileSystem.class.getName());
obsProperties.put("fs.obs.impl", getHadoopFSImplByScheme("obs"));
if (credential.isWhole()) {
obsProperties.put(OBSConstants.ACCESS_KEY, credential.getAccessKey());
obsProperties.put(OBSConstants.SECRET_KEY, credential.getSecretKey());
@@ -179,6 +179,18 @@
return obsProperties;
}
public static String getHadoopFSImplByScheme(String fsScheme) {
if (fsScheme.equalsIgnoreCase("obs")) {
return OBSFileSystem.class.getName();
} else if (fsScheme.equalsIgnoreCase("oss")) {
return AliyunOSSFileSystem.class.getName();
} else if (fsScheme.equalsIgnoreCase("cosn")) {
return CosNFileSystem.class.getName();
} else {
return S3AFileSystem.class.getName();
}
}
private static Map<String, String> convertToS3EnvProperties(Map<String, String> properties,
CloudCredentialWithEndpoint credential,
boolean isMeta) {
@@ -286,7 +298,7 @@
}
ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ENDPOINT_KEY, endpoint);
ossProperties.put("fs.oss.impl.disable.cache", "true");
ossProperties.put("fs.oss.impl", AliyunOSSFileSystem.class.getName());
ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss"));
boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false"));
if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
// the endpoint is an OSS-HDFS endpoint, or OSS-HDFS is explicitly enabled
@@ -327,7 +339,7 @@
Map<String, String> cosProperties = Maps.newHashMap();
cosProperties.put(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY, props.get(CosProperties.ENDPOINT));
cosProperties.put("fs.cosn.impl.disable.cache", "true");
cosProperties.put("fs.cosn.impl", CosNFileSystem.class.getName());
cosProperties.put("fs.cosn.impl", getHadoopFSImplByScheme("cosn"));
if (credential.isWhole()) {
cosProperties.put(CosNConfigKeys.COSN_SECRET_ID_KEY, credential.getAccessKey());
cosProperties.put(CosNConfigKeys.COSN_SECRET_KEY_KEY, credential.getSecretKey());
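A rough sanity check of the new helper (a hypothetical snippet, not part of this commit; the expected class names are assumed to match the FileSystem classes PropertyConverter already imports):

import org.apache.doris.datasource.property.PropertyConverter;

public class HadoopFsImplMappingSketch {
    public static void main(String[] args) {
        // One helper now feeds fs.obs.impl, fs.oss.impl and fs.cosn.impl, plus the
        // JobConf entries in HiveMetaStoreCache, so the scheme-to-class mapping
        // cannot drift between the two call sites.
        System.out.println(PropertyConverter.getHadoopFSImplByScheme("obs"));  // OBSFileSystem
        System.out.println(PropertyConverter.getHadoopFSImplByScheme("oss"));  // AliyunOSSFileSystem
        System.out.println(PropertyConverter.getHadoopFSImplByScheme("cosn")); // CosNFileSystem
        System.out.println(PropertyConverter.getHadoopFSImplByScheme("s3a"));  // S3AFileSystem (default branch)
    }
}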