From c40246efa966c2fbf5fc9b9a45bd71316b160236 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Fri, 23 Aug 2024 17:19:48 +0800
Subject: [PATCH] [bugfix](iceberg) Fix random coredump when writing an
 Iceberg partitioned table for 2.1 (#39808)(#39569) (#39832)

## Proposed changes

Backport of #39808 and #39569 to 2.1.
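The crash comes from state that outlived a single `write()` call: the writer kept a
member `Block _transformed_block` and only ever appended the freshly transformed
partition columns to it, never clearing it between calls. `_get_partition_data()`
then reads columns by index starting at 0, so from the second input block onward it
sees the columns produced for the *first* block — wrong partition values at best,
and a random coredump once those stale columns outlive the block that produced them.
The fix scopes the transformed block to one `write()` call, passes it by pointer
(`nullptr` now means "unpartitioned table"), and replaces the
`std::optional<PartitionData>` return with a plain `PartitionData` guarded by a
`DCHECK` that partition columns exist.

A minimal sketch of the stale-state pattern (illustrative code only, not the actual
Doris writer):

```cpp
#include <iostream>
#include <string>
#include <vector>

// BadWriter mirrors the bug: transformed data cached in a member is only
// ever appended to, so index-based lookups read the first batch forever.
struct BadWriter {
    std::vector<std::string> transformed_; // survives across write() calls

    void write(const std::vector<std::string>& batch) {
        for (const auto& v : batch) {
            transformed_.push_back(v); // appended, never cleared
        }
        // Always reads index 0: stale after the first batch.
        std::cout << "partition key: " << transformed_[0] << "\n";
    }
};

// GoodWriter mirrors the fix: the transformed data lives only as long as
// the call that produced it.
struct GoodWriter {
    void write(const std::vector<std::string>& batch) {
        std::vector<std::string> transformed(batch); // scoped to this call
        std::cout << "partition key: " << transformed[0] << "\n";
    }
};

int main() {
    BadWriter bad;
    bad.write({"p=1"});
    bad.write({"p=2"}); // prints "p=1" again: stale state
    GoodWriter good;
    good.write({"p=1"});
    good.write({"p=2"}); // prints "p=2" as expected
}
```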
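The patch also reworks the writer-creation contract so the "unpartitioned" case is
encoded in the pointer itself rather than in an empty partition path. Condensed into
a toy model (the `Block` stub, the hard-coded partition path, and the function name
are placeholders, not the real Doris signatures):

```cpp
#include <iostream>
#include <string>

struct Block {}; // stand-in for vectorized::Block

// A null transformed_block selects the unpartitioned write path; otherwise
// the partition path is derived from the transformed partition columns
// (here a fixed placeholder stands in for _partition_to_path()).
std::string writer_output_path(const Block* transformed_block,
                               const std::string& output_path) {
    if (transformed_block != nullptr) {
        std::string partition_path = "id2=2450841"; // placeholder value
        return output_path + "/" + partition_path;
    }
    return output_path; // unpartitioned: write to the table root
}

int main() {
    Block transformed;
    std::cout << writer_output_path(&transformed, "s3://bucket/tbl") << "\n";
    std::cout << writer_output_path(nullptr, "s3://bucket/tbl") << "\n";
}
```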
---
 .../writer/iceberg/viceberg_table_writer.cpp  | 50 +++++------
 .../writer/iceberg/viceberg_table_writer.h    |  8 +-
 ...iceberg_overwrite_with_wrong_partition.out | 23 +++++
 ...berg_overwrite_with_wrong_partition.groovy | 84 +++++++++++++++++++
 4 files changed, 132 insertions(+), 33 deletions(-)
 create mode 100644 regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
 create mode 100644 regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy

diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 4b705b0e51..ddd4101c71 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -125,7 +125,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
             auto writer_iter = _partitions_to_writers.find("");
             if (writer_iter == _partitions_to_writers.end()) {
                 try {
-                    writer = _create_partition_writer(output_block, -1);
+                    writer = _create_partition_writer(nullptr, -1);
                 } catch (doris::Exception& e) {
                     return e.to_status();
                 }
@@ -141,7 +141,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
                 }
                 _partitions_to_writers.erase(writer_iter);
                 try {
-                    writer = _create_partition_writer(output_block, -1, &file_name,
+                    writer = _create_partition_writer(nullptr, -1, &file_name,
                                                       file_name_index + 1);
                 } catch (doris::Exception& e) {
                     return e.to_status();
@@ -160,21 +160,21 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
     }
 
     {
+        Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
-        _transformed_block.reserve(_iceberg_partition_columns.size());
+        transformed_block.reserve(_iceberg_partition_columns.size());
         for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-            _transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
+            transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
                     output_block, iceberg_partition_columns.source_idx()));
         }
         for (int i = 0; i < output_block.rows(); ++i) {
             std::optional<PartitionData> partition_data;
             try {
-                partition_data = _get_partition_data(_transformed_block, i);
+                partition_data = _get_partition_data(&transformed_block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
             std::string partition_name;
-            DCHECK(partition_data.has_value());
             try {
                 partition_name = _partition_to_path(partition_data.value());
             } catch (doris::Exception& e) {
@@ -185,7 +185,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
                             const std::string* file_name, int file_name_index,
                             std::shared_ptr<VIcebergPartitionWriter>& writer_ptr) -> Status {
             try {
-                auto writer = _create_partition_writer(output_block, position, file_name,
+                auto writer = _create_partition_writer(&transformed_block, position, file_name,
                                                        file_name_index);
                 RETURN_IF_ERROR(writer->open(_state, _profile));
                 IColumn::Filter filter(output_block.rows(), 0);
@@ -341,30 +341,27 @@ std::vector<std::string> VIcebergTableWriter::_partition_values(
 }
 
 std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_writer(
-        vectorized::Block& block, int position, const std::string* file_name, int file_name_index) {
+        vectorized::Block* transformed_block, int position, const std::string* file_name,
+        int file_name_index) {
     auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
-    std::optional<PartitionData> partition_data;
-    partition_data = _get_partition_data(_transformed_block, position);
-    std::string partition_path;
     std::vector<std::string> partition_values;
-    if (partition_data.has_value()) {
-        partition_path = _partition_to_path(partition_data.value());
-        partition_values = _partition_values(partition_data.value());
-    }
     const std::string& output_path = iceberg_table_sink.output_path;
-
     std::string write_path;
     std::string original_write_path;
     std::string target_path;
-    if (partition_path.empty()) {
-        original_write_path = iceberg_table_sink.original_output_path;
-        target_path = output_path;
-        write_path = output_path;
-    } else {
+
+    if (transformed_block != nullptr) {
+        PartitionData partition_data = _get_partition_data(transformed_block, position);
+        std::string partition_path = _partition_to_path(partition_data);
+        partition_values = _partition_values(partition_data);
         original_write_path =
                 fmt::format("{}/{}", iceberg_table_sink.original_output_path, partition_path);
         target_path = fmt::format("{}/{}", output_path, partition_path);
         write_path = fmt::format("{}/{}", output_path, partition_path);
+    } else {
+        original_write_path = iceberg_table_sink.original_output_path;
+        target_path = output_path;
+        write_path = output_path;
+    }
 
     VIcebergPartitionWriter::WriteInfo write_info = {
@@ -387,18 +384,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition
             iceberg_table_sink.hadoop_config);
 }
 
-std::optional<PartitionData> VIcebergTableWriter::_get_partition_data(
-        vectorized::Block& transformed_block, int position) {
-    if (_iceberg_partition_columns.empty()) {
-        return std::nullopt;
-    }
-
+PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block* transformed_block,
+                                                       int position) {
+    DCHECK(!_iceberg_partition_columns.empty());
     std::vector<std::any> values;
     values.reserve(_iceberg_partition_columns.size());
     int column_idx = 0;
     for (auto& iceberg_partition_column : _iceberg_partition_columns) {
         const vectorized::ColumnWithTypeAndName& partition_column =
-                transformed_block.get_by_position(column_idx);
+                transformed_block->get_by_position(column_idx);
         const TypeDescriptor& result_type =
                 iceberg_partition_column.partition_column_transform().get_result_type();
         auto value = _get_iceberg_partition_value(result_type, partition_column, position);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 35e71d1960..b9435b50cb 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -97,10 +97,10 @@ private:
     std::vector<std::string> _partition_values(const doris::iceberg::StructLike& data);
 
     std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
-            vectorized::Block& block, int position, const std::string* file_name = nullptr,
-            int file_name_index = 0);
+            vectorized::Block* transformed_block, int position,
+            const std::string* file_name = nullptr, int file_name_index = 0);
 
-    std::optional<PartitionData> _get_partition_data(vectorized::Block& block, int position);
+    PartitionData _get_partition_data(vectorized::Block* block, int position);
 
     std::any _get_iceberg_partition_value(const TypeDescriptor& type_desc,
                                           const ColumnWithTypeAndName& partition_column,
@@ -127,8 +127,6 @@ private:
 
     VExprContextSPtrs _write_output_vexpr_ctxs;
 
-    Block _transformed_block;
-
     size_t _row_count = 0;
 
     // profile counters
diff --git a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
new file mode 100644
index 0000000000..b17bf8063c
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !qt01 --
+2450841	2450841
+2450841	2450841
+2450842	2450842
+2450842	2450842
+2450843	2450843
+2450843	2450843
+2450844	2450844
+2450844	2450844
+2450845	2450845
+2450845	2450845
+2450846	2450846
+2450846	2450846
+2450847	2450847
+2450847	2450847
+2450848	2450848
+2450848	2450848
+2450849	2450849
+2450849	2450849
+2450850	2450850
+2450850	2450850
+
diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
new file mode 100644
index 0000000000..760611ab3b
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_overwrite_with_wrong_partition", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String tb1 = "tb_dst";
+    String tb2 = "tb_src";
+
+    try {
+        String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+        String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String catalog_name = "test_iceberg_overwrite_with_wrong_partition"
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='rest',
+                'uri' = 'http://${externalEnvIp}:${rest_port}',
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                "s3.region" = "us-east-1"
+        );"""
+
+        sql """ switch ${catalog_name} """
+        sql """ use multi_catalog """
+
+        sql """ drop table if exists ${tb1} """
+        sql """ drop table if exists ${tb2} """
+
+        sql """
+            create table ${tb1} (
+                id bigint,
+                id2 bigint
+            ) PARTITION BY LIST(id2)() ;
+        """
+        sql """
+            create table ${tb2} (
+                id bigint,
+                id2 bigint
+            );
+        """
+
+        sql """ insert into ${tb2} values (2450841,2450841), (2450842,2450842); """
+        sql """ insert into ${tb2} values (2450843,2450843), (2450844,2450844); """
+        sql """ insert into ${tb2} values (2450845,2450845), (2450846,2450846); """
+        sql """ insert into ${tb2} values (2450847,2450847), (2450848,2450848); """
+        sql """ insert into ${tb2} values (2450849,2450849), (2450850,2450850); """
+        sql """ insert into ${tb2} values (2450841,2450841), (2450842,2450842); """
+        sql """ insert into ${tb2} values (2450843,2450843), (2450844,2450844); """
+        sql """ insert into ${tb2} values (2450845,2450845), (2450846,2450846); """
+        sql """ insert into ${tb2} values (2450847,2450847), (2450848,2450848); """
+        sql """ insert into ${tb2} values (2450849,2450849), (2450850,2450850); """
+
+        sql """ insert overwrite table ${tb1} (id, id2) select id, id2 from ${tb2} where id2 >= 2450841 AND id2 < 2450851; """
+
+        order_qt_qt01 """ select * from ${tb1} """
+
+    } finally {
+        sql """ drop table if exists ${tb1} """
+        sql """ drop table if exists ${tb2} """
+    }
+}