diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index 3c10f8a4cd..a730f2cd1b 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -464,7 +464,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) { options: Map[String, String], hadoopConf: Configuration, appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { - val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get + val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( sparkSession = sparkSession, dataSchema = dataSchema, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 62b2c35b8c..6da6073a4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -218,8 +218,11 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI if (remoteTable.getSd() == null) { return false; } + Map paras = remoteTable.getParameters(); String inputFormatName = remoteTable.getSd().getInputFormat(); - return inputFormatName != null && SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName); + // compatible with flink hive catalog + return (paras != null && "hudi".equalsIgnoreCase(paras.get("flink.connector"))) + || (inputFormatName != null && SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName)); } public boolean isHoodieCowTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java index 91f9eff259..4baa147704 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; @@ -52,8 +51,7 @@ public abstract class HudiPartitionProcessor { HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create( new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig, - tableMetaClient.getBasePathV2().toString(), - FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true); + tableMetaClient.getBasePathV2().toString(), true); return newTableMetadata.getAllPartitionPaths(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 4f4b1c3e8f..dfbb12e858 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -47,7 +47,6 @@ import com.google.common.collect.Maps; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; @@ -93,8 +92,10 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); - isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals( - hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type")); + Map paras = hmsTable.getRemoteTable().getParameters(); + isCowOrRoTable = hmsTable.isHoodieCowTable() + || "skip_merge".equals(hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type")) + || (paras != null && "COPY_ON_WRITE".equalsIgnoreCase(paras.get("flink.table.type"))); if (isCowOrRoTable) { if (LOG.isDebugEnabled()) { LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName()); @@ -239,19 +240,8 @@ public class HudiScanNode extends HiveScanNode { List columnNames = new ArrayList<>(); List columnTypes = new ArrayList<>(); - List allFields = Lists.newArrayList(); - allFields.addAll(hmsTable.getRemoteTable().getSd().getCols()); - allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys()); - for (Schema.Field hudiField : hudiSchema.getFields()) { - String columnName = hudiField.name().toLowerCase(Locale.ROOT); - // keep hive metastore column in hudi avro schema. - Optional field = allFields.stream().filter(f -> f.getName().equals(columnName)).findFirst(); - if (!field.isPresent()) { - String errorMsg = String.format("Hudi column %s not exists in hive metastore.", hudiField.name()); - throw new IllegalArgumentException(errorMsg); - } - columnNames.add(columnName); + columnNames.add(hudiField.name().toLowerCase(Locale.ROOT)); String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema()); columnTypes.add(columnType); } diff --git a/fe/pom.xml b/fe/pom.xml index 3fd1bfc751..5ebd759f01 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -307,7 +307,7 @@ under the License. 1.11.3 15.0.0 - 0.13.1 + 0.14.1 2.7.4-11 3.0.0-8 diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi.out b/regression-test/data/external_table_p2/hive/test_hive_hudi.out index adff3889cf..9202749e61 100644 --- a/regression-test/data/external_table_p2/hive/test_hive_hudi.out +++ b/regression-test/data/external_table_p2/hive/test_hive_hudi.out @@ -119,6 +119,22 @@ row_4 2021-02-01 4 v_4 20230922203209630 20230922203209630_0_992 994 8b83d7ed-c150-4177-9dbb-d41169e8b9c7-0_0-163-0_20230922203209630.parquet 994 a991 [[991], [330], [991, -9, null]] {"k991":[1, null, 9], "k2":[], "k3":null, "k4":[null, 9]} {"col1": "2012-02-02 06:24:05.000000", "col2": [1000, 991, null]} 20230922203209630 20230922203209630_0_9910 9912 8b83d7ed-c150-4177-9dbb-d41169e8b9c7-0_0-163-0_20230922203209630.parquet 9912 a9909 [[9909], [3303], [9909, 8909, null]] {"k9909":[1, null, 9], "k2":[], "k3":null, "k4":[null, 9]} {"col1": "2012-02-04 02:50:13.000000", "col2": [9918, 9909, null]} +-- !flink_hive_catalog -- +20240221112835666 20240221112835666_0_1 1dced545-862b-4ceb-8b43-d2a568f6616b cc1f45f0-5d1e-4c3e-872e-b4c123d1250b 1695332066204 1dced545-862b-4ceb-8b43-d2a568f6616b rider-E driver-O 93.5 san_francisco +20240221112835666 20240221112835666_0_3 3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04 cc1f45f0-5d1e-4c3e-872e-b4c123d1250b 1695173887231 3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04 rider-I driver-S 41.06 chennai +20240221112835666 20240221112835666_0_4 7a84095f-737f-40bc-b62f-6b69664712d2 cc1f45f0-5d1e-4c3e-872e-b4c123d1250b 1695376420876 7a84095f-737f-40bc-b62f-6b69664712d2 rider-G driver-Q 43.4 sao_paulo +20240221112917786 20240221112917786_0_1 9909a8b1-2d15-4d3d-8ec9-efc48c536a00 cc1f45f0-5d1e-4c3e-872e-b4c123d1250b 1695046462179 9909a8b1-2d15-4d3d-8ec9-efc48c536a00 rider-D driver-L 88.88 san_francisco +20240221112835666 20240221112835666_0_7 e3cf430c-889d-4015-bc98-59bdce1e530c cc1f45f0-5d1e-4c3e-872e-b4c123d1250b 1695516137016 e3cf430c-889d-4015-bc98-59bdce1e530c rider-F driver-P 34.15 sao_paulo +20240221112835666 20240221112835666_0_8 e96c4396-3fad-413a-a942-4cb36106d721 cc1f45f0-5d1e-4c3e-872e-b4c123d1250b 1695091554788 e96c4396-3fad-413a-a942-4cb36106d721 rider-C driver-M 27.7 san_francisco + +-- !flink_hudi_catalog -- +20240221111045165 20240221111045165_0_1 334e26e9-8355-45cc-97c6-c31daf0df330 san_francisco 3efcaa94-3e58-436a-b489-1232731ed088 1695159649087 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 25.0 san_francisco +20240221111000868 20240221111000868_0_5 3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04 chennai 44d55961-1263-4639-bcab-abe4d240b009 1695173887231 3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04 rider-I driver-S 41.06 chennai +20240221111000868 20240221111000868_0_3 9909a8b1-2d15-4d3d-8ec9-efc48c536a00 san_francisco 3efcaa94-3e58-436a-b489-1232731ed088 1695046462179 9909a8b1-2d15-4d3d-8ec9-efc48c536a00 rider-D driver-L 33.9 san_francisco +20240221111000868 20240221111000868_0_6 c8abbe79-8d89-47ea-b4ce-4d224bae5bfa chennai 44d55961-1263-4639-bcab-abe4d240b009 1695115999911 c8abbe79-8d89-47ea-b4ce-4d224bae5bfa rider-J driver-T 17.85 chennai +20240221111000868 20240221111000868_0_8 e3cf430c-889d-4015-bc98-59bdce1e530c sao_paulo c97347e9-033a-4c19-a033-94ac1de9f892 1695516137016 e3cf430c-889d-4015-bc98-59bdce1e530c rider-F driver-P 34.15 sao_paulo +20240221111000868 20240221111000868_0_4 e96c4396-3fad-413a-a942-4cb36106d721 san_francisco 3efcaa94-3e58-436a-b489-1232731ed088 1695091554788 e96c4396-3fad-413a-a942-4cb36106d721 rider-C driver-M 27.7 san_francisco + -- !skip_merge -- 20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000 20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001 diff --git a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy index 8648aa4bc9..d852e604df 100644 --- a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy +++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy @@ -42,6 +42,12 @@ suite("test_hive_hudi", "p2,external,hive,hudi") { // test complex types qt_complex_types """select * from complex_type_rt order by name desc limit 100""" + // hudi table created by flink hive catalog + qt_flink_hive_catalog """select * from hive_ctl_table order by uuid""" + + // hudi table created by flink hudi catalog + qt_flink_hudi_catalog """select * from hudi_ctl_table order by uuid""" + // skip logs sql """drop catalog if exists ${catalog_name};""" sql """