From 9e972cb0b97bc4e0b84028a47c8ebd0aedaa0354 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Sat, 8 Jun 2024 21:51:46 +0800
Subject: [PATCH] [bugfix](iceberg)Fix the datafile path error issue for 2.1 (#36066)

bp: #35957
---
 .../vec/exec/format/table/iceberg_reader.cpp  | 15 ++----
 be/src/vec/exec/format/table/iceberg_reader.h |  3 +-
 .../docker-compose/iceberg/iceberg.yaml.tpl   |  6 +++
 .../docker-compose/iceberg/spark-init.sql     | 26 +++++++++++
 .../iceberg/source/IcebergScanNode.java       |  4 +-
 .../iceberg/source/IcebergSplit.java          |  5 +-
 gensrc/thrift/PlanNodes.thrift                |  1 +
 .../test_iceberg_read_with_posdelete.out      |  7 +++
 .../test_iceberg_read_with_posdelete.groovy   | 46 +++++++++++++++++++
 9 files changed, 98 insertions(+), 15 deletions(-)
 create mode 100644 docker/thirdparties/docker-compose/iceberg/spark-init.sql
 create mode 100644 regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
 create mode 100644 regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 730f7e44ae..d321fc016f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -180,7 +180,8 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     }
 
     if (position_delete_files.size() > 0) {
-        RETURN_IF_ERROR(_position_delete_base(position_delete_files));
+        RETURN_IF_ERROR(
+                _position_delete_base(table_desc.original_file_path, position_delete_files));
     }
     if (equality_delete_files.size() > 0) {
         RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
@@ -293,17 +294,7 @@ Status IcebergTableReader::_shrink_block_if_need(Block* block) {
 }
 
 Status IcebergTableReader::_position_delete_base(
-        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
-    std::string data_file_path = _range.path;
-    // the path in _range is remove the namenode prefix,
-    // and the file_path in delete file is full path, so we should add it back.
-    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
-        std::string fs_name = _params.hdfs_params.fs_name;
-        if (!starts_with(data_file_path, fs_name)) {
-            data_file_path = fs_name + data_file_path;
-        }
-    }
-
+        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
     std::vector<DeleteRows*> delete_rows_array;
     int64_t num_delete_rows = 0;
     std::vector<DeleteFile*> erase_data;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index c0992095c8..07fc1baf90 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -123,7 +123,8 @@ protected:
     void _gen_new_colname_to_value_range();
     static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
 
-    Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
+    Status _position_delete_base(const std::string data_file_path,
+                                 const std::vector<TIcebergDeleteFileDesc>& delete_files);
     Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
     virtual std::unique_ptr<GenericReader> _create_equality_reader(
             const TFileRangeDesc& delete_desc) = 0;
diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
index bc217c1dd6..8af2e745c0 100644
--- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
@@ -30,10 +30,16 @@ services:
       - ./data/output/spark-warehouse:/home/iceberg/warehouse
       - ./data/output/spark-notebooks:/home/iceberg/notebooks/notebooks
       - ./data:/mnt/data
+      - ./spark-init.sql:/mnt/spark-init.sql
     environment:
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      spark-sql -f /mnt/spark-init.sql 2>&1;
+      tail -f /dev/null
+      "
     networks:
       - doris--iceberg
diff --git a/docker/thirdparties/docker-compose/iceberg/spark-init.sql b/docker/thirdparties/docker-compose/iceberg/spark-init.sql
new file mode 100644
index 0000000000..d7479a109e
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/spark-init.sql
@@ -0,0 +1,26 @@
+create database if not exists demo.test_db;
+drop table if exists demo.test_db.location_s3a_table;
+create table demo.test_db.location_s3a_table (
+    id int,
+    val string
+) using iceberg
+location 's3a://warehouse/wh/test_db/location_s3a_table'
+tblproperties (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3a_table values (1,'a');
+update demo.test_db.location_s3a_table set val='b' where id=1;
+
+drop table if exists demo.test_db.location_s3_table;
+create table demo.test_db.location_s3_table (
+    id int,
+    val string
+) using iceberg
+location 's3://warehouse/wh/test_db/location_s3_table'
+tblproperties (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3_table values (1,'a');
+update demo.test_db.location_s3_table set val='b' where id=1;
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 486e1242d8..25d28b092f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -143,6 +143,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
         int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
+        fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
@@ -253,7 +254,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                     new String[0],
                     formatVersion,
                     source.getCatalog().getProperties(),
-                    partitionValues);
+                    partitionValues,
+                    splitTask.file().path().toString());
             if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                 split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index b4ea232c00..d867245dbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -28,13 +28,16 @@ import java.util.Map;
 @Data
 public class IcebergSplit extends FileSplit {
 
+    private final String originalPath;
+
     // File path will be changed if the file is modified, so there's no need to get modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength,
                        String[] hosts, Integer formatVersion, Map<String, String> config,
-                       List<String> partitionList) {
+                       List<String> partitionList, String originalPath) {
         super(file, start, length, fileLength, hosts, partitionList);
         this.formatVersion = formatVersion;
         this.config = config;
+        this.originalPath = originalPath;
     }
 
     private Integer formatVersion;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5f34a261c5..3cb04bda33 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -305,6 +305,7 @@ struct TIcebergFileDesc {
     4: optional Types.TTupleId delete_table_tuple_id;
     // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
+    6: optional string original_file_path;
 }
 
 struct TPaimonDeletionFileDesc {
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
new file mode 100644
index 0000000000..6c0db029f1
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !qt1 --
+1	b
+
+-- !qt2 --
+1	b
+
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
new file mode 100644
index 0000000000..139a409121
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_read_with_delete", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+            String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+            String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+            String catalog_name = "test_iceberg_read_with_delete"
+
+            sql """drop catalog if exists ${catalog_name}"""
+            sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='rest',
+                'uri' = 'http://${externalEnvIp}:${rest_port}',
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                "s3.region" = "us-east-1"
+            );"""
+
+            qt_qt1 """ select * from ${catalog_name}.test_db.location_s3_table order by id """
+            qt_qt2 """ select * from ${catalog_name}.test_db.location_s3a_table order by id """
+
+            sql """drop catalog if exists ${catalog_name}"""
+        } finally {
+        }
+    }
+}
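
Note (illustrative, not part of the patch): Iceberg position-delete files identify rows
by the data file's full path exactly as the writer recorded it, so the "s3://" vs
"s3a://" scheme matters. The BE previously rebuilt that path from _range.path plus the
fs_name prefix and could end up with a different string, silently skipping the deletes;
this patch instead carries the recorded path from the FE to the BE in
TIcebergFileDesc.original_file_path. A minimal sketch of the matching step being
protected, using hypothetical names (PositionDelete and rows_to_delete are illustrative,
not the Doris implementation):

    #include <cstdint>
    #include <string>
    #include <vector>

    // One record from an Iceberg position-delete file.
    struct PositionDelete {
        std::string file_path; // full data-file path as the writer stored it
        int64_t pos;           // ordinal of the row to delete in that file
    };

    // Collect the row ordinals that apply to a single data file. The comparison
    // is an exact string match, so any scheme or prefix mismatch ("s3" vs "s3a",
    // or a locally re-added namenode prefix) makes every delete miss.
    std::vector<int64_t> rows_to_delete(const std::vector<PositionDelete>& deletes,
                                        const std::string& original_file_path) {
        std::vector<int64_t> rows;
        for (const auto& d : deletes) {
            if (d.file_path == original_file_path) {
                rows.push_back(d.pos);
            }
        }
        return rows;
    }

This is also why the regression test reads one table located at 's3://...' and one at
's3a://...': both schemes must match the paths recorded in their own delete files.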