[fix](hudi) use hudi api to split the COW table (#21385)
Fix two bugs:

1. COW and Read Optimized tables used the Hive splitter to split files, but it cannot recognize some Hudi-specific files and fails with:

   ERROR 1105 (HY000): errCode = 2, detailMessage = (172.21.0.101)[CORRUPTION]Invalid magic number in parquet file, bytes read: 3035, file size: 3035, path: /usr/hive/warehouse/hudi.db/test/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight, read magic:

2. A Read Optimized table created by Spark adds an empty partition key even when the table has no partition, so these empty partition keys have to be filtered out in the Hive client:

   | test_ro | CREATE TABLE `test_ro`( `_hoodie_commit_time` string COMMENT '', ... `ts` bigint COMMENT '') PARTITIONED BY ( `` string) ROW FORMAT SERDE
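To illustrate the second fix, here is a minimal standalone sketch (not the actual scan-node code) of how dropping empty partition-key names removes the bogus `` key that Spark writes for an unpartitioned read optimized table; the key list below is hypothetical:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class EmptyPartitionKeyFilterSketch {
        public static void main(String[] args) {
            // Hypothetical input: a Spark-created RO table exposes a single empty
            // partition key ("") even though the table has no partitions.
            List<String> rawKeys = Arrays.asList("");

            // Same idea as the getPathPartitionKeys() change below:
            // drop empty names, then lower-case the rest.
            List<String> keys = rawKeys.stream()
                    .filter(key -> !"".equals(key))
                    .map(String::toLowerCase)
                    .collect(Collectors.toList());

            System.out.println(keys); // prints [] -> no bogus partition column
        }
    }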
@@ -215,8 +215,9 @@ public class HiveScanNode extends FileQueryScanNode {

    @Override
    public List<String> getPathPartitionKeys() {
-        return hmsTable.getRemoteTable().getPartitionKeys()
-                .stream().map(FieldSchema::getName).map(String::toLowerCase).collect(Collectors.toList());
+        return hmsTable.getRemoteTable().getPartitionKeys().stream()
+                .map(FieldSchema::getName).filter(partitionKey -> !"".equals(partitionKey))
+                .map(String::toLowerCase).collect(Collectors.toList());
    }

    @Override
@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -56,7 +55,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -141,15 +139,6 @@ public class HudiScanNode extends HiveScanNode {

    @Override
    public List<Split> getSplits() throws UserException {
-        if (isCowTable) {
-            // skip hidden files start with "."
-            List<Split> cowSplits = super.getSplits().stream()
-                    .filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
-                    .collect(Collectors.toList());
-            noLogsSplitNum = cowSplits.size();
-            return cowSplits;
-        }
-
        HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
        hudiClient.reloadActiveTimeline();
        String basePath = hmsTable.getRemoteTable().getSd().getLocation();
@@ -207,32 +196,42 @@ public class HudiScanNode extends HiveScanNode {
            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
                    timeline, statuses.toArray(new FileStatus[0]));

-            Iterator<FileSlice> hoodieFileSliceIterator = fileSystemView
-                    .getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).iterator();
-            while (hoodieFileSliceIterator.hasNext()) {
-                FileSlice fileSlice = hoodieFileSliceIterator.next();
-                Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
-                String filePath = baseFile.map(BaseFile::getPath).orElse("");
-                long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
-
-                List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString)
-                        .collect(Collectors.toList());
-                if (logs.isEmpty()) {
-                    noLogsSplitNum++;
-                }
-
-                HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
-                        partition.getPartitionValues());
-                split.setTableFormatType(TableFormatType.HUDI);
-                split.setDataFilePath(filePath);
-                split.setHudiDeltaLogs(logs);
-                split.setInputFormat(inputFormat);
-                split.setSerde(serdeLib);
-                split.setBasePath(basePath);
-                split.setHudiColumnNames(columnNames);
-                split.setHudiColumnTypes(columnTypes);
-                split.setInstantTime(queryInstant);
-                splits.add(split);
-            }
+            if (isCowTable) {
+                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                    noLogsSplitNum++;
+                    String filePath = baseFile.getPath();
+                    long fileSize = baseFile.getFileSize();
+                    FileSplit split = new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
+                            partition.getPartitionValues());
+                    splits.add(split);
+                });
+            } else {
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                        .forEach(fileSlice -> {
+                            Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
+                            String filePath = baseFile.map(BaseFile::getPath).orElse("");
+                            long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
+
+                            List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+                                    .map(Path::toString)
+                                    .collect(Collectors.toList());
+                            if (logs.isEmpty()) {
+                                noLogsSplitNum++;
+                            }
+
+                            HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
+                                    new String[0], partition.getPartitionValues());
+                            split.setTableFormatType(TableFormatType.HUDI);
+                            split.setDataFilePath(filePath);
+                            split.setHudiDeltaLogs(logs);
+                            split.setInputFormat(inputFormat);
+                            split.setSerde(serdeLib);
+                            split.setBasePath(basePath);
+                            split.setHudiColumnNames(columnNames);
+                            split.setHudiColumnTypes(columnTypes);
+                            split.setInstantTime(queryInstant);
+                            splits.add(split);
+                        });
+            }
            }
        } catch (Exception e) {
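For reference, a minimal standalone sketch of the Hudi API the COW branch now relies on: build a HoodieTableMetaClient, take the latest completed instant, and ask a HoodieTableFileSystemView for the latest base files, so Hudi metadata files (such as the .deltacommit.inflight file in the error above) never reach the parquet reader. The basePath / Configuration setup and the empty partition path are illustrative assumptions, not the scan node's actual wiring:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
    import org.apache.hudi.common.util.Option;

    public class CowBaseFileListingSketch {
        public static void main(String[] args) {
            String basePath = args[0];   // table location, e.g. the warehouse path above (illustrative)
            String partitionPath = "";   // unpartitioned table

            Configuration conf = new Configuration();
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(conf).setBasePath(basePath).build();

            // Only completed commits; the latest instant bounds the snapshot we read.
            HoodieTimeline timeline = metaClient.getActiveTimeline()
                    .getCommitsTimeline().filterCompletedInstants();
            Option<HoodieInstant> latest = timeline.lastInstant();
            if (!latest.isPresent()) {
                return; // empty table, nothing to split
            }
            String queryInstant = latest.get().getTimestamp();

            // The file-system view only returns valid base files for the chosen instant,
            // skipping hidden .hoodie metadata instead of handing it to the parquet reader.
            HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
            fsView.getLatestBaseFilesBeforeOrOn(partitionPath, queryInstant)
                    .forEach(baseFile -> System.out.println(
                            baseFile.getPath() + " (" + baseFile.getFileSize() + " bytes)"));
        }
    }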