[bugfix](iceberg) Fix the data file path error for 2.1 (#36066)

bp: #35957
Author: wuwenchi
Date: 2024-06-08 21:51:46 +08:00
Committed by: GitHub
Parent: 075481faf1
Commit: 9e972cb0b9
9 changed files with 98 additions and 15 deletions
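
In short: when matching Iceberg position delete files, the BE used to rebuild the data file's full path from the scan range path plus the HDFS fs_name (first reader hunk below). For tables whose location uses an s3:// or s3a:// scheme, that reconstruction yields a string that differs from the file_path recorded inside the delete files, so the deletes never match and merge-on-read updates are read back incorrectly. The fix threads the path exactly as Iceberg recorded it from FE planning (originalPath on IcebergSplit), over thrift (original_file_path), down to the BE's _position_delete_base.

A minimal, self-contained sketch of the matching problem follows; it is illustrative only (all class, variable, and path names are hypothetical, not Doris code):

import java.util.List;
import java.util.Map;

// Sketch only, not Doris code. Iceberg position delete files store
// (file_path, pos) pairs, where file_path is the full path the writer
// recorded for the data file; a reader finds a split's deleted rows by
// comparing its data file path against that exact string.
public class PositionDeleteMatch {

    // Deleted row positions keyed by the data file path recorded in the delete file.
    static final Map<String, List<Long>> DELETES = Map.of(
            "s3a://warehouse/wh/test_db/t/data/00000-0.parquet", List.of(0L));

    static List<Long> deletedRows(String dataFilePath) {
        return DELETES.getOrDefault(dataFilePath, List.of());
    }

    public static void main(String[] args) {
        // Old BE behavior: rebuild the full path from the scan range path plus
        // a catalog-level fs name. The rebuilt string need not equal the
        // recorded one (wrong scheme here), so no delete rows are found.
        String rebuilt = "s3://warehouse" + "/wh/test_db/t/data/00000-0.parquet";
        System.out.println(deletedRows(rebuilt));   // [] -> stale rows survive (the bug)

        // New behavior: the FE forwards the recorded path unchanged.
        String recorded = "s3a://warehouse/wh/test_db/t/data/00000-0.parquet";
        System.out.println(deletedRows(recorded));  // [0] -> delete applied
    }
}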


@@ -180,7 +180,8 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     }
     if (position_delete_files.size() > 0) {
-        RETURN_IF_ERROR(_position_delete_base(position_delete_files));
+        RETURN_IF_ERROR(
+                _position_delete_base(table_desc.original_file_path, position_delete_files));
     }
     if (equality_delete_files.size() > 0) {
         RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
@@ -293,17 +294,7 @@ Status IcebergTableReader::_shrink_block_if_need(Block* block) {
 }
 Status IcebergTableReader::_position_delete_base(
-        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
-    std::string data_file_path = _range.path;
-    // the path in _range is remove the namenode prefix,
-    // and the file_path in delete file is full path, so we should add it back.
-    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
-        std::string fs_name = _params.hdfs_params.fs_name;
-        if (!starts_with(data_file_path, fs_name)) {
-            data_file_path = fs_name + data_file_path;
-        }
-    }
+        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
     std::vector<DeleteRows*> delete_rows_array;
     int64_t num_delete_rows = 0;
     std::vector<DeleteFile*> erase_data;
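
Note that the deleted block above was the only place the fs_name prefix was re-added, and it only ever handled the HDFS case; after this change _position_delete_base performs no path reconstruction at all and trusts the table_desc.original_file_path supplied by the FE.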


@@ -123,7 +123,8 @@ protected:
     void _gen_new_colname_to_value_range();
     static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
-    Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
+    Status _position_delete_base(const std::string data_file_path,
+                                 const std::vector<TIcebergDeleteFileDesc>& delete_files);
     Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
     virtual std::unique_ptr<GenericReader> _create_equality_reader(
             const TFileRangeDesc& delete_desc) = 0;


@@ -30,10 +30,16 @@ services:
       - ./data/output/spark-warehouse:/home/iceberg/warehouse
       - ./data/output/spark-notebooks:/home/iceberg/notebooks/notebooks
       - ./data:/mnt/data
+      - ./spark-init.sql:/mnt/spark-init.sql
     environment:
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      spark-sql -f /mnt/spark-init.sql 2>&1;
+      tail -f /dev/null
+      "
     networks:
       - doris--iceberg
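
With this entrypoint the Spark container seeds the test warehouse at startup: it runs the mounted /mnt/spark-init.sql once through spark-sql, then parks on tail -f /dev/null so the container stays up while the regression suite queries the catalog.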


@@ -0,0 +1,26 @@
+create database if not exists demo.test_db;
+
+drop table if exists demo.test_db.location_s3a_table;
+create table demo.test_db.location_s3a_table (
+    id int,
+    val string
+) using iceberg
+location 's3a://warehouse/wh/test_db/location_s3a_table'
+tblproperties (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3a_table values (1,'a');
+update demo.test_db.location_s3a_table set val='b' where id=1;
+
+drop table if exists demo.test_db.location_s3_table;
+create table demo.test_db.location_s3_table (
+    id int,
+    val string
+) using iceberg
+location 's3://warehouse/wh/test_db/location_s3_table'
+tblproperties (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3_table values (1,'a');
+update demo.test_db.location_s3_table set val='b' where id=1;
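
The fixture creates the same two-column table twice, once at an s3a:// location and once at s3://, both with merge-on-read write modes; each update statement therefore produces a position delete file that references the data file by its full original path, which is precisely the case the old fs_name-prefix logic mishandled.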


@@ -143,6 +143,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
         int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
+        fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
@@ -253,7 +254,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                 new String[0],
                 formatVersion,
                 source.getCatalog().getProperties(),
-                partitionValues);
+                partitionValues,
+                splitTask.file().path().toString());
         if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
             split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
         }
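
The scan node now captures splitTask.file().path(), the data file path exactly as stored in the Iceberg manifest, on each split and forwards it to the BE through the new setOriginalFilePath call when building TIcebergFileDesc.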


@@ -28,13 +28,16 @@ import java.util.Map;
 @Data
 public class IcebergSplit extends FileSplit {

+    private final String originalPath;
+
     // File path will be changed if the file is modified, so there's no need to get modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
                         Integer formatVersion, Map<String, String> config,
-                        List<String> partitionList) {
+                        List<String> partitionList, String originalPath) {
         super(file, start, length, fileLength, hosts, partitionList);
         this.formatVersion = formatVersion;
         this.config = config;
+        this.originalPath = originalPath;
     }

     private Integer formatVersion;
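
Since the class carries Lombok's @Data annotation, the new final field automatically gets the getOriginalPath() accessor that IcebergScanNode calls above; no hand-written getter is needed.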


@@ -305,6 +305,7 @@ struct TIcebergFileDesc {
     4: optional Types.TTupleId delete_table_tuple_id;
     // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
+    6: optional string original_file_path;
 }

 struct TPaimonDeletionFileDesc {
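
Appending original_file_path as an optional field with the next unused id (6) keeps TIcebergFileDesc wire-compatible: an FE or BE built without this change simply leaves the field unset or ignores it rather than failing to deserialize.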


@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !qt1 --
+1	b
+
+-- !qt2 --
+1	b
+
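
The two blocks are the expected results for qt_qt1 and qt_qt2 in the suite below: after the updates in spark-init.sql each table must return the single row 1 b; before the fix, the unapplied position delete would also leave the pre-update row 1 a visible.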


@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_read_with_delete", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+            String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+            String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+            String catalog_name = "test_iceberg_read_with_delete"
+
+            sql """drop catalog if exists ${catalog_name}"""
+            sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                    'type'='iceberg',
+                    'iceberg.catalog.type'='rest',
+                    'uri' = 'http://${externalEnvIp}:${rest_port}',
+                    "s3.access_key" = "admin",
+                    "s3.secret_key" = "password",
+                    "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                    "s3.region" = "us-east-1"
+            );"""
+
+            qt_qt1 """ select * from ${catalog_name}.test_db.location_s3_table order by id """
+            qt_qt2 """ select * from ${catalog_name}.test_db.location_s3a_table order by id """
+
+            sql """drop catalog if exists ${catalog_name}"""
+        } finally {
+        }
+    }
+}