[update](hudi) update hudi version to 0.14.1 and compatible with flink hive catalog (#31181)

1. Update the Hudi version from 0.13.1 to 0.14.1.
2. Be compatible with Hudi tables created by the Flink Hive catalog (a sketch of the parameter convention this relies on follows the commit metadata below).
Ashin Gau
2024-02-22 17:42:37 +08:00
committed by yiguolei
parent a5ee1b3c87
commit 260568db17
7 changed files with 34 additions and 21 deletions
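
Background (not part of the diff): when a table is created through Flink's Hive catalog, the Flink connector options are stored in the Hive Metastore table parameters under a "flink." prefix, which is why the checks added below look for "flink.connector" and "flink.table.type" instead of a Hudi input format in the storage descriptor. A minimal sketch of that detection rule; the class and method names here are hypothetical:

import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Table;

// Hypothetical helper mirroring the checks this commit adds to HMSExternalTable and
// HudiScanNode: recognize a Hudi table by the parameters written by Flink's Hive
// catalog, instead of relying only on the storage descriptor's input format.
public final class FlinkHudiTableDetector {
    private FlinkHudiTableDetector() {}

    // True if the HMS table was registered by a Flink Hive catalog with 'connector'='hudi'.
    public static boolean isFlinkCreatedHudiTable(Table remoteTable) {
        Map<String, String> paras = remoteTable.getParameters();
        return paras != null && "hudi".equalsIgnoreCase(paras.get("flink.connector"));
    }

    // True if the Flink-written parameters mark the table as copy-on-write.
    public static boolean isFlinkCopyOnWriteTable(Table remoteTable) {
        Map<String, String> paras = remoteTable.getParameters();
        return paras != null && "COPY_ON_WRITE".equalsIgnoreCase(paras.get("flink.table.type"));
    }
}

These are the same two keys checked in the HMSExternalTable and HudiScanNode hunks below.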


@@ -218,8 +218,11 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         if (remoteTable.getSd() == null) {
             return false;
         }
+        Map<String, String> paras = remoteTable.getParameters();
         String inputFormatName = remoteTable.getSd().getInputFormat();
-        return inputFormatName != null && SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName);
+        // compatible with flink hive catalog
+        return (paras != null && "hudi".equalsIgnoreCase(paras.get("flink.connector")))
+                || (inputFormatName != null && SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName));
     }
 
     public boolean isHoodieCowTable() {


@@ -22,7 +22,6 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -52,8 +51,7 @@ public abstract class HudiPartitionProcessor {
         HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
                 new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
-                tableMetaClient.getBasePathV2().toString(),
-                FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true);
+                tableMetaClient.getBasePathV2().toString(), true);
         return newTableMetadata.getAllPartitionPaths();
     }
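
For reference, a minimal standalone sketch (assumed usage, not code from this commit) of listing partition paths with the Hudi 0.14 API that the hunk above switches to: HoodieTableMetadata.create takes the engine context, the metadata config, the table base path, and a trailing boolean (the reuse flag, as far as I can tell), with no spillable-directory argument. The configuration below is a placeholder:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadata;

// Sketch: list all partition paths of a Hudi table via the 0.14-style
// HoodieTableMetadata.create(engineContext, metadataConfig, basePath, reuse) call.
public class ListHudiPartitions {
    public static List<String> allPartitionPaths(Configuration conf, String basePath) throws Exception {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(conf)
                .setBasePath(basePath)
                .build();
        // Read through the Hudi metadata table when it is available.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .enable(true)
                .build();
        HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
                new HoodieLocalEngineContext(metaClient.getHadoopConf()), metadataConfig,
                metaClient.getBasePathV2().toString(), true /* reuse */);
        return tableMetadata.getAllPartitionPaths();
    }
}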


@@ -47,7 +47,6 @@ import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
@@ -93,8 +92,10 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
         super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
-        isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals(
-                hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"));
+        Map<String, String> paras = hmsTable.getRemoteTable().getParameters();
+        isCowOrRoTable = hmsTable.isHoodieCowTable()
+                || "skip_merge".equals(hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"))
+                || (paras != null && "COPY_ON_WRITE".equalsIgnoreCase(paras.get("flink.table.type")));
         if (isCowOrRoTable) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName());
@@ -239,19 +240,8 @@
         List<String> columnNames = new ArrayList<>();
         List<String> columnTypes = new ArrayList<>();
-        List<FieldSchema> allFields = Lists.newArrayList();
-        allFields.addAll(hmsTable.getRemoteTable().getSd().getCols());
-        allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys());
         for (Schema.Field hudiField : hudiSchema.getFields()) {
-            String columnName = hudiField.name().toLowerCase(Locale.ROOT);
-            // keep hive metastore column in hudi avro schema.
-            Optional<FieldSchema> field = allFields.stream().filter(f -> f.getName().equals(columnName)).findFirst();
-            if (!field.isPresent()) {
-                String errorMsg = String.format("Hudi column %s not exists in hive metastore.", hudiField.name());
-                throw new IllegalArgumentException(errorMsg);
-            }
-            columnNames.add(columnName);
+            columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
             String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema());
             columnTypes.add(columnType);
         }