[Fix](multi-catalog) Fix query hms tbl with compressed data files. (#19387)
If a submitted query reads HMS tables whose data files are compressed (bz2, lzo, lz4, ...), an error like this occurs: ```[INTERNAL_ERROR]Only support csv data in utf8 codec```. This is because `org.apache.doris.planner.external.HiveScanNode` always sets `fileFormatType` to `TFileFormatType.FORMAT_CSV_PLAIN`, regardless of the actual compression algorithm of the data files. This PR fixes the problem.
This commit is contained in:
@ -24,6 +24,7 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TFileCompressType;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -526,6 +527,8 @@ public class Util {
|
||||
return TFileFormatType.FORMAT_CSV_LZ4FRAME;
|
||||
} else if (lowerCasePath.endsWith(".lzo")) {
|
||||
return TFileFormatType.FORMAT_CSV_LZOP;
|
||||
} else if (lowerCasePath.endsWith(".lzo_deflate")) {
|
||||
return TFileFormatType.FORMAT_CSV_LZO;
|
||||
} else if (lowerCasePath.endsWith(".deflate")) {
|
||||
return TFileFormatType.FORMAT_CSV_DEFLATE;
|
||||
} else {
|
||||
@ -533,11 +536,33 @@ public class Util {
|
||||
}
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public static TFileCompressType getFileCompressType(String path) {
|
||||
String lowerCasePath = path.toLowerCase();
|
||||
if (lowerCasePath.endsWith(".gz")) {
|
||||
return TFileCompressType.GZ;
|
||||
} else if (lowerCasePath.endsWith(".bz2")) {
|
||||
return TFileCompressType.BZ2;
|
||||
} else if (lowerCasePath.endsWith(".lz4")) {
|
||||
return TFileCompressType.LZ4FRAME;
|
||||
} else if (lowerCasePath.endsWith(".lzo")) {
|
||||
return TFileCompressType.LZOP;
|
||||
} else if (lowerCasePath.endsWith(".lzo_deflate")) {
|
||||
return TFileCompressType.LZO;
|
||||
} else if (lowerCasePath.endsWith(".deflate")) {
|
||||
return TFileCompressType.DEFLATE;
|
||||
} else {
|
||||
return TFileCompressType.PLAIN;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isCsvFormat(TFileFormatType fileFormatType) {
|
||||
return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2 || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
|
||||
return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_GZ
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZ4FRAME
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZO || fileFormatType == TFileFormatType.FORMAT_CSV_LZOP
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZO
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZOP
|
||||
|| fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN;
|
||||
}
|
||||
|
||||
|
||||
@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.NotImplementedException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.BrokerUtil;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
|
||||
@ -41,6 +42,7 @@ import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TExternalScanRange;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileCompressType;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileScanRange;
|
||||
@ -212,8 +214,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
TFileType locationType = getLocationType();
|
||||
params.setFileType(locationType);
|
||||
TFileFormatType fileFormatType = getFileFormatType();
|
||||
params.setFormatType(getFileFormatType());
|
||||
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) {
|
||||
params.setFormatType(fileFormatType);
|
||||
TFileCompressType fileCompressType = getFileCompressType(inputSplit);
|
||||
params.setCompressType(fileCompressType);
|
||||
if (Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON) {
|
||||
params.setFileAttributes(getFileAttributes());
|
||||
}
|
||||
|
||||
@ -318,6 +322,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
|
||||
return Util.getFileCompressType(fileSplit.getPath().toString());
|
||||
}
|
||||
|
||||
protected TFileAttributes getFileAttributes() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user