[fix](hudi) compatible with Hudi Spark configuration and support skip merge (#24067)
Fix three bugs:

1. A Hudi file slice may contain only log files and no base file, so `new Path(filePath)` was called with an empty path and threw an error.
2. Hive stores column names in lower case only, so column names must be matched case-insensitively.
3. Be compatible with the [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options), so users can add `hoodie.datasource.merge.type=skip_merge` to the catalog properties to skip merging log files on read.
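For bug 3, the whole change hinges on one condition in `HudiScanNode`; the sketch below restates it outside the diff for readability (`hmsTable` is the scan node's Hive metastore table handle, as in the code that follows):

```java
// Read on the COW/read-optimized path either because the table really is a
// COW table, or because the user opted out of merging log files by setting
// the Spark datasource option hoodie.datasource.merge.type=skip_merge in the
// catalog properties.
boolean isCowOrRoTable = hmsTable.isHoodieCowTable()
        || "skip_merge".equals(hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"));
```

When the condition is false, the table is treated as a merge-on-read table and read through JNI in the BE, as the log messages in the diff below indicate.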
```diff
@@ -79,7 +79,7 @@ public class HudiScanNode extends HiveScanNode {
 
     private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
 
-    private final boolean isCowTable;
+    private final boolean isCowOrRoTable;
 
     private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
@@ -91,9 +91,10 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
         super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
-        isCowTable = hmsTable.isHoodieCowTable();
-        if (isCowTable) {
-            LOG.debug("Hudi table {} can read as cow table", hmsTable.getName());
+        isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals(
+                hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"));
+        if (isCowOrRoTable) {
+            LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName());
         } else {
             LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName());
         }
@@ -101,7 +102,7 @@ public class HudiScanNode extends HiveScanNode {
 
     @Override
     public TFileFormatType getFileFormatType() throws UserException {
-        if (isCowTable) {
+        if (isCowOrRoTable) {
             return super.getFileFormatType();
         } else {
             // Use jni to read hudi table in BE
@@ -124,7 +125,7 @@ public class HudiScanNode extends HiveScanNode {
 
     @Override
     protected Map<String, String> getLocationProperties() throws UserException {
-        if (isCowTable) {
+        if (isCowOrRoTable) {
             return super.getLocationProperties();
         } else {
             // HudiJniScanner uses hadoop client to read data.
@@ -291,7 +292,7 @@ public class HudiScanNode extends HiveScanNode {
         HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
                 timeline, statuses.toArray(new FileStatus[0]));
 
-        if (isCowTable) {
+        if (isCowOrRoTable) {
             fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
                 noLogsSplitNum.incrementAndGet();
                 String filePath = baseFile.getPath();
@@ -312,7 +313,9 @@ public class HudiScanNode extends HiveScanNode {
                     noLogsSplitNum.incrementAndGet();
                 }
 
-                HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
+                // no base file, use log file to parse file type
+                String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
+                HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
                         new String[0], partition.getPartitionValues());
                 split.setTableFormatType(TableFormatType.HUDI);
                 split.setDataFilePath(filePath);
```
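The case-insensitive column matching from bug 2 is not visible in the hunks above. A minimal sketch of the idea, assuming a plain list of schema column names (the class and helper name are hypothetical, not taken from this commit):

```java
import java.util.List;
import java.util.Optional;

final class ColumnNameUtil {
    // Hive stores column names in lower case only, so resolve a requested
    // column against the schema by case-insensitive comparison rather than
    // exact equality.
    static Optional<String> findColumnIgnoreCase(List<String> hiveColumns, String name) {
        return hiveColumns.stream()
                .filter(col -> col.equalsIgnoreCase(name))
                .findFirst();
    }
}
```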